search_engine_backend/
indexer.rs

1use anyhow::{Context, Result};
2use chrono::{Duration as ChronoDuration, Utc};
3use reqwest::Client;
4use scraper::{Html, Selector};
5use sha2::{Digest, Sha256};
6use std::collections::HashSet;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::sleep;
11use tracing::{debug, error, info, warn};
12use url::Url;
13use uuid::Uuid;
14
15use crate::search::SearchDocument;
16use crate::storage::{CrawlQueue, CrawlStatus, WebPage};
17use crate::{Config, SearchService, StorageService};
18
19pub struct IndexerService {
20    config: Arc<Config>,
21    storage_service: Arc<StorageService>,
22    search_service: Arc<SearchService>,
23    client: Client,
24    force_process_flag: Arc<AtomicBool>,
25}
26
27impl IndexerService {
28    /// Generate a deterministic ID from a URL to prevent duplicate crawl items
29    fn url_to_id(url: &str) -> String {
30        let mut hasher = Sha256::new();
31        hasher.update(url.as_bytes());
32        let hash = hasher.finalize();
33        format!("{:x}", hash)
34    }
35
36    pub async fn new(
37        config: Arc<Config>,
38        storage_service: Arc<StorageService>,
39        search_service: Arc<SearchService>,
40    ) -> Result<Self> {
41        let client = Client::builder()
42            .user_agent(&config.application.user_agent)
43            .timeout(Duration::from_secs(30))
44            .build()
45            .context("Failed to create HTTP client for indexer")?;
46
47        Ok(Self {
48            config,
49            storage_service,
50            search_service,
51            client,
52            force_process_flag: Arc::new(AtomicBool::new(false)),
53        })
54    }
55
56    /// Queue domains for crawling
57    pub async fn queue_domains(&self, domains: &[String]) -> Result<usize> {
58        self.queue_domains_with_check(domains, true).await
59    }
60
61    /// Queue domains for crawling with optional last-indexed time checking
62    /// If check_last_indexed is true, domains that were indexed recently will be skipped
63    pub async fn queue_domains_with_check(
64        &self,
65        domains: &[String],
66        check_last_indexed: bool,
67    ) -> Result<usize> {
68        let mut queued_count = 0;
69
70        for domain in domains {
71            let url = if domain.starts_with("http") {
72                domain.clone()
73            } else {
74                format!("https://{}", domain)
75            };
76
77            let parsed_url = Url::parse(&url).with_context(|| format!("Invalid URL: {}", url))?;
78
79            let domain_name = parsed_url
80                .host_str()
81                .ok_or_else(|| anyhow::anyhow!("No host in URL: {}", url))?
82                .to_string();
83
84            // Check if domain is allowed
85            if !self.config.is_domain_allowed(&domain_name) {
86                warn!("Domain not in allowed list, skipping: {}", domain_name);
87                continue;
88            }
89
90            // Check if domain was recently indexed (if check is enabled)
91            if check_last_indexed {
92                if let Ok(Some(last_indexed)) = self
93                    .storage_service
94                    .get_domain_last_indexed(&domain_name)
95                    .await
96                {
97                    let threshold = Utc::now()
98                        - ChronoDuration::days(
99                            self.config.application.periodic_index_interval_days as i64,
100                        );
101                    if last_indexed > threshold {
102                        crate::log_and_capture!(info,
103                            "📋 SKIPPING DOMAIN: {} was recently indexed on {} (within {} day interval), skipping indexing",
104                            domain_name,
105                            last_indexed.format("%Y-%m-%d %H:%M:%S UTC"),
106                            self.config.application.periodic_index_interval_days
107                        );
108                        continue;
109                    } else {
110                        crate::log_and_capture!(info,
111                            "🔄 INDEXING DOMAIN: {} was last indexed on {} (older than {} day interval), will re-index",
112                            domain_name,
113                            last_indexed.format("%Y-%m-%d %H:%M:%S UTC"),
114                            self.config.application.periodic_index_interval_days
115                        );
116                    }
117                } else {
118                    info!(
119                        "🆕 INDEXING DOMAIN: {} has never been indexed, adding to queue",
120                        domain_name
121                    );
122                }
123            } else {
124                info!(
125                    "➡️ INDEXING DOMAIN: {} added to queue (forced indexing, no recency check)",
126                    domain_name
127                );
128            }
129
130            let crawl_item = CrawlQueue {
131                id: Uuid::new_v4().to_string(),
132                url: url.clone(),
133                domain: domain_name,
134                depth: 0,
135                status: CrawlStatus::Pending,
136                created_at: Utc::now(),
137                processed_at: None,
138                error_message: None,
139                retry_count: 0,
140            };
141
142            match self.storage_service.queue_crawl(&crawl_item).await {
143                Ok(_) => {
144                    info!(
145                        "✅ QUEUED: {} successfully added to crawl queue (depth: {})",
146                        url, crawl_item.depth
147                    );
148                    queued_count += 1;
149                }
150                Err(e) => {
151                    warn!("❌ QUEUE FAILED: Unable to queue domain {} - {}", url, e);
152                }
153            }
154        }
155
156        info!(
157            "📊 DOMAIN QUEUEING SUMMARY: Processed {} domains, queued {} for indexing",
158            domains.len(),
159            queued_count
160        );
161        Ok(queued_count)
162    }
163
164    /// Start the periodic indexing service that checks for stale domains and re-indexes them
165    /// This runs in a loop, checking every 6 hours for domains that need re-indexing
166    pub async fn start_periodic_indexing(&self) -> Result<()> {
167        info!(
168            "Starting periodic indexing service with interval of {} days",
169            self.config.application.periodic_index_interval_days
170        );
171
172        // Run periodically - check every 6 hours for domains that need re-indexing
173        let check_interval = Duration::from_secs(6 * 60 * 60); // 6 hours
174
175        loop {
176            sleep(check_interval).await;
177
178            info!("Running periodic domain indexing check");
179
180            match self.check_and_queue_stale_domains().await {
181                Ok(count) => {
182                    if count > 0 {
183                        info!("Queued {} domains for periodic re-indexing", count);
184                    } else {
185                        debug!("No domains need re-indexing at this time");
186                    }
187                }
188                Err(e) => {
189                    error!("Failed to check and queue stale domains: {}", e);
190                }
191            }
192        }
193    }
194
195    /// Start the periodic duplicate removal service
196    /// This runs in a loop, checking for and removing duplicates at configured intervals
197    pub async fn start_periodic_duplicate_removal(&self) -> Result<()> {
198        info!(
199            "Starting periodic duplicate removal service with interval of {} hours",
200            self.config.application.duplicate_removal_interval_hours
201        );
202
203        // Run periodically based on configuration
204        let check_interval =
205            Duration::from_secs(self.config.application.duplicate_removal_interval_hours * 60 * 60);
206
207        loop {
208            sleep(check_interval).await;
209
210            info!("Running periodic duplicate removal");
211
212            match self.storage_service.remove_duplicates().await {
213                Ok(count) => {
214                    if count > 0 {
215                        info!("Removed {} duplicate entries", count);
216                    } else {
217                        debug!("No duplicates found during this run");
218                    }
219                }
220                Err(e) => {
221                    error!("Failed to remove duplicates: {}", e);
222                }
223            }
224        }
225    }
226
227    /// Trigger immediate queue processing by setting the force flag
228    pub fn trigger_force_process_queue(&self) -> Result<()> {
229        self.force_process_flag.store(true, Ordering::Relaxed);
230        info!("🚀 Force queue processing triggered");
231        Ok(())
232    }
233
234    async fn check_and_queue_stale_domains(&self) -> Result<usize> {
235        // Get all allowed domains and queue those that haven't been indexed recently
236        let allowed_domains = &self.config.application.allowed_domains;
237
238        // Use queue_domains_with_check which will automatically check last indexed time
239        self.queue_domains_with_check(allowed_domains, true).await
240    }
241
242    pub async fn process_crawl_queue(&self) -> Result<()> {
243        info!("🚀 Starting crawl queue processing service");
244
245        let mut last_status_log = std::time::Instant::now();
246        let status_log_interval = Duration::from_secs(30); // Log status every 30 seconds
247        let mut items_processed_count = 0;
248        let mut total_processing_time = Duration::from_secs(0);
249
250        loop {
251            // Log periodic status if enough time has passed
252            if last_status_log.elapsed() >= status_log_interval {
253                self.log_crawl_queue_status(items_processed_count, total_processing_time)
254                    .await;
255                last_status_log = std::time::Instant::now();
256            }
257
258            let pending_items = self
259                .storage_service
260                .get_pending_crawl_items(self.config.application.max_concurrent_requests)
261                .await?;
262
263            if pending_items.is_empty() {
264                debug!("📭 No pending crawl items, entering idle mode");
265
266                // Wait for either timeout or force signal
267                let mut sleep_duration = Duration::from_secs(10);
268                loop {
269                    // Check if force processing was triggered
270                    if self.force_process_flag.load(Ordering::Relaxed) {
271                        self.force_process_flag.store(false, Ordering::Relaxed);
272                        info!("⚡ Force queue processing signal received - checking for items immediately");
273                        break;
274                    }
275
276                    // Sleep for 1 second at a time to be responsive to force signals
277                    let chunk_duration = Duration::from_secs(1);
278                    sleep(chunk_duration).await;
279
280                    if sleep_duration <= chunk_duration {
281                        break;
282                    }
283                    sleep_duration -= chunk_duration;
284                }
285                continue;
286            }
287
288            let batch_start = std::time::Instant::now();
289
290            // Log detailed information about the batch
291            let root_domains = pending_items.iter().filter(|item| item.depth == 0).count();
292            let discovered_links = pending_items.iter().filter(|item| item.depth > 0).count();
293
294            info!(
295                "⚙️ Processing batch of {} crawl items: {} root domains, {} discovered links",
296                pending_items.len(),
297                root_domains,
298                discovered_links
299            );
300
301            if discovered_links > 0 {
302                info!("🎯 PROCESSING DISCOVERED LINKS: The queue processor is now processing {} previously discovered links - this should resolve the missing URLs issue!", discovered_links);
303            }
304
305            // Process items concurrently but with a limit
306            let semaphore = Arc::new(tokio::sync::Semaphore::new(
307                self.config.application.max_concurrent_requests,
308            ));
309            let mut tasks = Vec::new();
310
311            for item in pending_items {
312                let semaphore = semaphore.clone();
313                let storage_service = self.storage_service.clone();
314                let search_service = self.search_service.clone();
315                let config = self.config.clone();
316                let client = self.client.clone();
317
318                let task = tokio::spawn(async move {
319                    let _permit = semaphore.acquire().await.unwrap();
320                    Self::process_crawl_item(item, storage_service, search_service, config, client)
321                        .await
322                });
323
324                tasks.push(task);
325            }
326
327            // Wait for all tasks to complete
328            let mut batch_success_count = 0;
329            let mut batch_failure_count = 0;
330            for task in tasks {
331                match task.await {
332                    Ok(_) => batch_success_count += 1,
333                    Err(e) => {
334                        error!("Crawl task failed: {}", e);
335                        batch_failure_count += 1;
336                    }
337                }
338            }
339
340            let batch_duration = batch_start.elapsed();
341            items_processed_count += batch_success_count + batch_failure_count;
342            total_processing_time += batch_duration;
343
344            info!(
345                "✅ Batch completed: {} successful, {} failed, took {:?} (total processed: {})",
346                batch_success_count, batch_failure_count, batch_duration, items_processed_count
347            );
348
349            // Add delay between batches
350            sleep(Duration::from_millis(
351                self.config.application.crawl_delay_ms,
352            ))
353            .await;
354        }
355    }
356
357    /// Log detailed crawl queue status for monitoring
358    async fn log_crawl_queue_status(&self, items_processed: usize, total_time: Duration) {
359        match self.storage_service.get_crawl_queue_stats().await {
360            Ok((pending, processing, completed, failed)) => {
361                let avg_time_per_item = if items_processed > 0 {
362                    total_time.as_millis() as f64 / items_processed as f64
363                } else {
364                    0.0
365                };
366
367                info!(
368                    "📊 CRAWL QUEUE STATUS: Pending: {}, Processing: {}, Completed: {}, Failed: {} | Processed: {} items | Avg time/item: {:.1}ms",
369                    pending, processing, completed, failed, items_processed, avg_time_per_item
370                );
371
372                // Log warning if queue is backing up
373                if pending > self.config.application.max_concurrent_requests * 3 {
374                    warn!(
375                        "⚠️ QUEUE BACKLOG: {} pending items detected, consider scaling up processing",
376                        pending
377                    );
378                }
379
380                // Log info about processing efficiency
381                if failed > 0 && completed > 0 {
382                    let failure_rate = (failed as f64 / (completed + failed) as f64) * 100.0;
383                    if failure_rate > 20.0 {
384                        warn!(
385                            "⚠️ HIGH FAILURE RATE: {:.1}% of crawl attempts are failing",
386                            failure_rate
387                        );
388                    } else {
389                        debug!("✅ Crawl success rate: {:.1}%", 100.0 - failure_rate);
390                    }
391                }
392            }
393            Err(e) => {
394                warn!("Failed to get crawl queue statistics: {}", e);
395            }
396        }
397    }
398
399    async fn process_crawl_item(
400        item: CrawlQueue,
401        storage_service: Arc<StorageService>,
402        search_service: Arc<SearchService>,
403        config: Arc<Config>,
404        client: Client,
405    ) -> Result<()> {
406        let start_time = std::time::Instant::now();
407        info!(
408            "🔍 PROCESSING: Starting to crawl {} (domain: {}, depth: {})",
409            item.url, item.domain, item.depth
410        );
411
412        // Update status to processing
413        if let Err(e) = storage_service
414            .update_crawl_status(&item.id, &item.domain, CrawlStatus::Processing, None)
415            .await
416        {
417            warn!("Failed to update crawl status to processing: {}", e);
418        }
419
420        match Self::crawl_and_index_page(&item, &storage_service, &search_service, &config, &client)
421            .await
422        {
423            Ok(_) => {
424                let processing_time = start_time.elapsed();
425                crate::log_and_capture!(
426                    info,
427                    "✅ INDEXED: Successfully processed and indexed {} in {:?}",
428                    item.url,
429                    processing_time
430                );
431                if let Err(e) = storage_service
432                    .update_crawl_status(&item.id, &item.domain, CrawlStatus::Completed, None)
433                    .await
434                {
435                    warn!("Failed to update crawl status to completed: {}", e);
436                }
437            }
438            Err(e) => {
439                let processing_time = start_time.elapsed();
440                error!(
441                    "❌ INDEX FAILED: Failed to process {} after {:?} - {}",
442                    item.url, processing_time, e
443                );
444                if let Err(update_err) = storage_service
445                    .update_crawl_status(
446                        &item.id,
447                        &item.domain,
448                        CrawlStatus::Failed,
449                        Some(e.to_string()),
450                    )
451                    .await
452                {
453                    warn!("Failed to update crawl status to failed: {}", update_err);
454                }
455            }
456        }
457
458        Ok(())
459    }
460
461    async fn crawl_and_index_page(
462        item: &CrawlQueue,
463        storage_service: &Arc<StorageService>,
464        search_service: &Arc<SearchService>,
465        config: &Arc<Config>,
466        client: &Client,
467    ) -> Result<()> {
468        debug!("Crawling URL: {}", item.url);
469
470        // Check robots.txt (simplified check)
471        if !Self::is_allowed_by_robots(&item.url, &config.application.user_agent, client).await? {
472            info!(
473                "🚫 ROBOTS.TXT BLOCKED: {} is disallowed by robots.txt, skipping",
474                item.url
475            );
476            return Ok(());
477        }
478
479        // Fetch the page
480        let response = client
481            .get(&item.url)
482            .send()
483            .await
484            .context("Failed to fetch page")?;
485
486        let status_code = response.status().as_u16();
487        let content_type = response
488            .headers()
489            .get("content-type")
490            .and_then(|v| v.to_str().ok())
491            .map(|s| s.to_string());
492
493        if !response.status().is_success() {
494            return Err(anyhow::anyhow!("HTTP error: {}", status_code));
495        }
496
497        let content = response
498            .text()
499            .await
500            .context("Failed to read response body")?;
501        let content_length = content.len();
502
503        debug!(
504            "📥 FETCHED: {} - {} bytes, status: {}",
505            item.url, content_length, status_code
506        );
507
508        // Only process HTML content
509        if let Some(ref ct) = content_type {
510            if !ct.contains("text/html") {
511                info!(
512                    "📄 CONTENT TYPE SKIPPED: {} has content type '{}', only HTML is indexed",
513                    item.url, ct
514                );
515                return Ok(());
516            }
517        }
518
519        // Parse HTML and extract all data synchronously
520        debug!(
521            "🔍 PARSING: Extracting content from {} ({} bytes)",
522            item.url,
523            content.len()
524        );
525        let (title, text_content, snippet, discovered_links) = {
526            let document = Html::parse_document(&content);
527
528            // Extract title
529            let title_selector = Selector::parse("title").unwrap();
530            let title = document
531                .select(&title_selector)
532                .next()
533                .map(|el| el.text().collect::<String>())
534                .unwrap_or_else(|| "Untitled".to_string());
535
536            // Extract text content
537            let text_content = Self::extract_text_content(&document);
538            debug!(
539                "📝 CONTENT EXTRACTED: {} - title: '{}', content: {} chars",
540                item.url,
541                title,
542                text_content.len()
543            );
544
545            // Generate snippet
546            let snippet = Self::generate_snippet(&text_content, 200);
547
548            // Extract links for further crawling (if within depth limit)
549            let discovered_links = if item.depth < config.application.max_crawl_depth {
550                debug!(
551                    "🔗 LINK DISCOVERY: Extracting links from {} (current depth: {}, max: {})",
552                    item.url, item.depth, config.application.max_crawl_depth
553                );
554                let base_url_parsed = Url::parse(&item.url)?;
555                let link_selector = Selector::parse("a[href]").unwrap();
556                let mut discovered_urls = HashSet::new();
557
558                // Collect all URLs first to avoid holding references across await
559                let mut hrefs = Vec::new();
560                for link in document.select(&link_selector) {
561                    if let Some(href) = link.value().attr("href") {
562                        hrefs.push(href.to_string());
563                    }
564                }
565
566                // Process the collected URLs
567                for href in hrefs {
568                    if let Ok(absolute_url) = base_url_parsed.join(&href) {
569                        let url_str = absolute_url.to_string();
570
571                        // Only crawl URLs from allowed domains
572                        if let Some(host) = absolute_url.host_str() {
573                            if config.is_domain_allowed(host) && !discovered_urls.contains(&url_str)
574                            {
575                                discovered_urls.insert(url_str.clone());
576                            }
577                        }
578                    }
579                }
580
581                discovered_urls.into_iter().collect::<Vec<_>>()
582            } else {
583                debug!(
584                    "🔗 LINK DISCOVERY SKIPPED: {} at maximum depth ({}/{}), not extracting links",
585                    item.url, item.depth, config.application.max_crawl_depth
586                );
587                Vec::new()
588            };
589
590            (title, text_content, snippet, discovered_links)
591        }; // document is dropped here
592
593        // Create webpage record
594        let webpage = WebPage {
595            id: Uuid::new_v4().to_string(),
596            url: item.url.clone(),
597            title: title.clone(),
598            content: text_content.clone(),
599            snippet: snippet.clone(),
600            domain: item.domain.clone(),
601            indexed_at: Utc::now(),
602            last_crawled: Utc::now(),
603            status_code,
604            content_type,
605            content_length: Some(content_length),
606        };
607
608        // Store in Cosmos DB
609        debug!("💾 STORING: Saving webpage {} to database", item.url);
610        storage_service
611            .store_webpage(&webpage)
612            .await
613            .context("Failed to store webpage")?;
614
615        // Index in Azure Cognitive Search
616        debug!("🔍 INDEXING: Adding {} to search index", item.url);
617        let search_doc = SearchDocument {
618            id: webpage.id.clone(),
619            title: webpage.title.clone(),
620            url: webpage.url.clone(),
621            content: webpage.content.clone(),
622            snippet: webpage.snippet.clone(),
623            domain: webpage.domain.clone(),
624            indexed_at: webpage.indexed_at,
625            last_crawled: webpage.last_crawled,
626        };
627
628        search_service
629            .index_document(&search_doc)
630            .await
631            .context("Failed to index document")?;
632
633        // Queue discovered links for further crawling
634        let num_discovered = discovered_links.len();
635
636        info!(
637            "🎯 INDEXED SUCCESSFULLY: {} - title: '{}', content: {} chars, links discovered: {}",
638            item.url,
639            title,
640            text_content.len(),
641            num_discovered
642        );
643
644        if num_discovered > 0 {
645            info!(
646                "🔗 QUEUEING LINKS: Adding {} discovered links to crawl queue from {}",
647                num_discovered, item.url
648            );
649        }
650        let mut successfully_queued = 0;
651        for url_str in discovered_links {
652            // Extract domain from the discovered URL instead of using parent's domain
653            let discovered_domain = if let Ok(parsed_url) = Url::parse(&url_str) {
654                parsed_url.host_str().unwrap_or(&item.domain).to_string()
655            } else {
656                item.domain.clone() // Fallback to parent domain if URL parsing fails
657            };
658
659            let crawl_item = CrawlQueue {
660                id: Self::url_to_id(&url_str),
661                url: url_str.clone(),
662                domain: discovered_domain,
663                depth: item.depth + 1,
664                status: CrawlStatus::Pending,
665                created_at: Utc::now(),
666                processed_at: None,
667                error_message: None,
668                retry_count: 0,
669            };
670
671            if let Err(e) = storage_service.queue_crawl(&crawl_item).await {
672                let error_str = e.to_string();
673                // Check if this is a conflict error (HTTP 409) indicating the URL already exists
674                if error_str.contains("409") || error_str.to_lowercase().contains("conflict") {
675                    debug!("URL already queued (skipping duplicate): {}", url_str);
676                } else {
677                    warn!("Failed to queue discovered link {}: {}", url_str, e);
678                }
679            } else {
680                info!(
681                    "✅ QUEUED LINK: {} (depth: {}) discovered from {}",
682                    url_str, crawl_item.depth, item.url
683                );
684                successfully_queued += 1;
685            }
686        }
687
688        if num_discovered > 0 {
689            info!(
690                "🔗 LINK DISCOVERY COMPLETE: Successfully queued {} of {} discovered links from {} (total indexed content: {} chars)",
691                successfully_queued, num_discovered, item.url, text_content.len()
692            );
693        }
694
695        Ok(())
696    }
697
698    fn extract_text_content(document: &Html) -> String {
699        // Remove script and style elements
700        let body_selector = Selector::parse("body").unwrap();
701
702        let body = document.select(&body_selector).next();
703        if let Some(body_el) = body {
704            let mut text_parts = Vec::new();
705            Self::extract_text_recursive(body_el, &mut text_parts);
706            text_parts.join(" ")
707        } else {
708            // Fallback to all text
709            document.root_element().text().collect::<Vec<_>>().join(" ")
710        }
711    }
712
713    fn extract_text_recursive(element: scraper::ElementRef, text_parts: &mut Vec<String>) {
714        for child in element.children() {
715            if let Some(text) = child.value().as_text() {
716                let text = text.trim();
717                if !text.is_empty() {
718                    text_parts.push(text.to_string());
719                }
720            } else if let Some(child_element) = scraper::ElementRef::wrap(child) {
721                // Skip script and style elements
722                if !matches!(
723                    child_element.value().name(),
724                    "script" | "style" | "nav" | "footer" | "aside"
725                ) {
726                    Self::extract_text_recursive(child_element, text_parts);
727                }
728            }
729        }
730    }
731
732    fn generate_snippet(content: &str, max_length: usize) -> String {
733        if content.len() <= max_length {
734            return content.to_string();
735        }
736
737        // Find a good breaking point near the limit
738        let truncated = &content[..max_length];
739        if let Some(last_space) = truncated.rfind(' ') {
740            format!("{}...", &content[..last_space])
741        } else {
742            format!("{}...", truncated)
743        }
744    }
745
746    async fn is_allowed_by_robots(url: &str, _user_agent: &str, client: &Client) -> Result<bool> {
747        let parsed_url = Url::parse(url)?;
748        let robots_url = format!(
749            "{}://{}/robots.txt",
750            parsed_url.scheme(),
751            parsed_url.host_str().unwrap_or_default()
752        );
753
754        // Simple robots.txt check - in production you'd want more sophisticated parsing
755        match client.get(&robots_url).send().await {
756            Ok(response) if response.status().is_success() => {
757                let robots_content = response.text().await.unwrap_or_default();
758                // Very basic check - look for "Disallow: /" for our user agent
759                if robots_content.contains("User-agent: *")
760                    && robots_content.contains("Disallow: /")
761                {
762                    debug!("Robots.txt found with restrictions for {}", url);
763                    return Ok(false);
764                }
765            }
766            _ => {
767                // If we can't fetch robots.txt, assume it's allowed
768                debug!("Could not fetch robots.txt for {}, assuming allowed", url);
769            }
770        }
771
772        Ok(true)
773    }
774}
775
776#[cfg(test)]
777mod tests {
778    use super::*;
779    use crate::config::{ApplicationConfig, AzureConfig};
780    use chrono::{Duration as ChronoDuration, Utc};
781
782    fn create_test_config() -> Arc<Config> {
783        Arc::new(Config {
784            environment: "test".to_string(),
785            azure: AzureConfig {
786                search_service_name: "test".to_string(),
787                search_api_key: "test".to_string(),
788                search_api_version: "2023-11-01".to_string(),
789                search_index_name: "test".to_string(),
790                cosmos_endpoint: "test".to_string(),
791                cosmos_key: "test".to_string(),
792                cosmos_database_name: "test".to_string(),
793                cosmos_container_name: "test".to_string(),
794            },
795            application: ApplicationConfig {
796                max_crawl_depth: 5,
797                crawl_delay_ms: 1000,
798                max_concurrent_requests: 10,
799                user_agent: "test".to_string(),
800                allowed_domains: vec!["example.com".to_string(), "test.org".to_string()],
801                periodic_index_interval_days: 7,
802                duplicate_removal_interval_hours: 24,
803                admin_api_key: "test-admin-key".to_string(),
804            },
805        })
806    }
807
808    #[tokio::test]
809    async fn test_queue_domains_with_check_skips_recently_indexed() {
810        // This test would require mocking the storage service
811        // For now, we'll test the configuration and basic logic
812        let config = create_test_config();
813
814        // Test that the periodic interval is correctly configured
815        assert_eq!(config.application.periodic_index_interval_days, 7);
816
817        // Test allowed domains logic
818        assert!(config.is_domain_allowed("example.com"));
819        assert!(config.is_domain_allowed("test.org"));
820        assert!(!config.is_domain_allowed("not-allowed.com"));
821    }
822
823    #[test]
824    fn test_domain_filtering_logic() {
825        let config = create_test_config();
826
827        // Test with various domain formats
828        let domains = vec![
829            "example.com".to_string(),
830            "https://test.org".to_string(),
831            "not-allowed.com".to_string(),
832        ];
833
834        let mut allowed_count = 0;
835        for domain in &domains {
836            let url = if domain.starts_with("http") {
837                domain.clone()
838            } else {
839                format!("https://{}", domain)
840            };
841
842            if let Ok(parsed_url) = Url::parse(&url) {
843                if let Some(host) = parsed_url.host_str() {
844                    if config.is_domain_allowed(host) {
845                        allowed_count += 1;
846                    }
847                }
848            }
849        }
850
851        assert_eq!(allowed_count, 2); // example.com and test.org should be allowed
852    }
853
854    #[test]
855    fn test_periodic_indexing_time_logic() {
856        let config = create_test_config();
857        let interval_days = config.application.periodic_index_interval_days as i64;
858
859        // Simulate current time
860        let now = Utc::now();
861
862        // Test recent indexing (should be skipped)
863        let recent_time = now - ChronoDuration::days(interval_days - 1);
864        let threshold = now - ChronoDuration::days(interval_days);
865        assert!(
866            recent_time > threshold,
867            "Recent indexing should be after threshold"
868        );
869
870        // Test old indexing (should be re-indexed)
871        let old_time = now - ChronoDuration::days(interval_days + 1);
872        assert!(
873            old_time <= threshold,
874            "Old indexing should be before or at threshold"
875        );
876
877        // Test exactly at threshold
878        let threshold_time = now - ChronoDuration::days(interval_days);
879        assert!(
880            threshold_time <= threshold,
881            "Threshold time should trigger re-indexing"
882        );
883    }
884
885    #[test]
886    fn test_trigger_force_process_queue() {
887        let _config = create_test_config();
888
889        // Create a mock IndexerService with just the force processing capability
890        let force_process_flag = Arc::new(AtomicBool::new(false));
891
892        // Simulate the trigger
893        force_process_flag.store(true, Ordering::Relaxed);
894        assert!(
895            force_process_flag.load(Ordering::Relaxed),
896            "Force flag should be set after trigger"
897        );
898
899        // Simulate processing the flag (like the queue processor would do)
900        force_process_flag.store(false, Ordering::Relaxed);
901        assert!(
902            !force_process_flag.load(Ordering::Relaxed),
903            "Force flag should be cleared after processing"
904        );
905    }
906
907    #[test]
908    fn test_domain_extraction_from_discovered_urls() {
909        use url::Url;
910
911        // Test domain extraction for different URL types
912        let test_cases = vec![
913            ("https://example.com/page1", "example.com"),
914            ("https://test.org/docs/intro", "test.org"),
915            (
916                "https://subdomain.example.com/path",
917                "subdomain.example.com",
918            ),
919            ("http://example.com:8080/api", "example.com"),
920        ];
921
922        for (url_str, expected_domain) in test_cases {
923            let parsed_url = Url::parse(url_str).expect("Should parse valid URL");
924            let extracted_domain = parsed_url.host_str().expect("Should have host");
925            assert_eq!(
926                extracted_domain, expected_domain,
927                "Domain extraction failed for URL: {}",
928                url_str
929            );
930        }
931    }
932
933    #[test]
934    fn test_crawl_queue_domain_assignment() {
935        use crate::storage::{CrawlQueue, CrawlStatus};
936        use chrono::Utc;
937        use uuid::Uuid;
938
939        // Simulate the corrected logic for domain assignment
940        let parent_domain = "example.com";
941        let discovered_urls = [
942            "https://example.com/page2".to_string(),
943            "https://test.org/external".to_string(),
944            "https://subdomain.example.com/sub".to_string(),
945        ];
946
947        let expected_domains = ["example.com", "test.org", "subdomain.example.com"];
948
949        for (url_str, expected_domain) in discovered_urls.iter().zip(expected_domains.iter()) {
950            // Extract domain from the discovered URL instead of using parent's domain
951            let discovered_domain = if let Ok(parsed_url) = Url::parse(url_str) {
952                parsed_url.host_str().unwrap_or(parent_domain).to_string()
953            } else {
954                parent_domain.to_string() // Fallback to parent domain if URL parsing fails
955            };
956
957            let crawl_item = CrawlQueue {
958                id: Uuid::new_v4().to_string(),
959                url: url_str.clone(),
960                domain: discovered_domain.clone(),
961                depth: 1,
962                status: CrawlStatus::Pending,
963                created_at: Utc::now(),
964                processed_at: None,
965                error_message: None,
966                retry_count: 0,
967            };
968
969            assert_eq!(
970                crawl_item.domain, *expected_domain,
971                "Domain should be extracted from discovered URL: {}",
972                url_str
973            );
974            assert_eq!(crawl_item.url, *url_str, "URL should match discovered URL");
975        }
976    }
977
978    #[test]
979    fn test_domain_extraction_includes_subdomains_not_just_tld() {
980        use url::Url;
981
982        // Test that we extract the full domain including subdomains, not just TLD
983        let test_cases = vec![
984            ("https://api.example.com/v1/users", "api.example.com"), // Should include subdomain
985            (
986                "https://blog.subdomain.example.com/post",
987                "blog.subdomain.example.com",
988            ), // Should include full subdomain chain
989            (
990                "https://cdn.assets.company.org/images",
991                "cdn.assets.company.org",
992            ), // Should include all subdomains
993            ("https://example.com/page", "example.com"), // Root domain without subdomain
994            ("https://www.example.co.uk/page", "www.example.co.uk"), // Should include www and country TLD
995            ("http://localhost:8080/test", "localhost"),             // Should work with localhost
996        ];
997
998        for (url_str, expected_domain) in test_cases {
999            let parsed_url = Url::parse(url_str).expect("Should parse valid URL");
1000            let extracted_domain = parsed_url.host_str().expect("Should have host");
1001            assert_eq!(
1002                extracted_domain, expected_domain,
1003                "Domain extraction should include full subdomain path for URL: {}",
1004                url_str
1005            );
1006
1007            // Verify we're not just extracting TLD
1008            if expected_domain.contains('.') {
1009                let tld_only = expected_domain.split('.').next_back().unwrap();
1010                assert_ne!(
1011                    extracted_domain, tld_only,
1012                    "Should extract full domain, not just TLD '{}' for URL: {}",
1013                    tld_only, url_str
1014                );
1015            }
1016        }
1017    }
1018
1019    #[test]
1020    fn test_url_to_id_generates_deterministic_ids() {
1021        // Test that the same URL generates the same ID consistently
1022        let url1 = "https://example.com/page";
1023        let url2 = "https://example.com/page";
1024        let url3 = "https://example.com/different";
1025
1026        let id1 = IndexerService::url_to_id(url1);
1027        let id2 = IndexerService::url_to_id(url2);
1028        let id3 = IndexerService::url_to_id(url3);
1029
1030        assert_eq!(id1, id2, "Same URL should generate same ID");
1031        assert_ne!(id1, id3, "Different URLs should generate different IDs");
1032
1033        // IDs should be valid hex strings
1034        assert!(id1.chars().all(|c| c.is_ascii_hexdigit()));
1035        assert!(id3.chars().all(|c| c.is_ascii_hexdigit()));
1036
1037        // IDs should be 64 characters (SHA256 hex)
1038        assert_eq!(id1.len(), 64);
1039        assert_eq!(id3.len(), 64);
1040    }
1041
1042    #[test]
1043    fn test_url_deduplication_in_crawl_queue() {
1044        use crate::storage::{CrawlQueue, CrawlStatus};
1045        use chrono::Utc;
1046
1047        // Test that the same URL gets the same ID in CrawlQueue items
1048        let url = "https://example.com/test-page";
1049        let domain = "example.com";
1050
1051        let crawl_item1 = CrawlQueue {
1052            id: IndexerService::url_to_id(url),
1053            url: url.to_string(),
1054            domain: domain.to_string(),
1055            depth: 1,
1056            status: CrawlStatus::Pending,
1057            created_at: Utc::now(),
1058            processed_at: None,
1059            error_message: None,
1060            retry_count: 0,
1061        };
1062
1063        let crawl_item2 = CrawlQueue {
1064            id: IndexerService::url_to_id(url),
1065            url: url.to_string(),
1066            domain: domain.to_string(),
1067            depth: 2, // Different depth
1068            status: CrawlStatus::Pending,
1069            created_at: Utc::now(),
1070            processed_at: None,
1071            error_message: None,
1072            retry_count: 0,
1073        };
1074
1075        // Both items should have the same ID since they have the same URL
1076        assert_eq!(
1077            crawl_item1.id, crawl_item2.id,
1078            "Same URL should produce same ID for deduplication"
1079        );
1080        assert_eq!(crawl_item1.url, crawl_item2.url, "URLs should match");
1081    }
1082
1083    #[test]
1084    fn test_link_discovery_queues_discovered_urls() {
1085        use crate::storage::{CrawlQueue, CrawlStatus};
1086        use chrono::Utc;
1087
1088        // Test that discovered links from a page get properly queued with correct metadata
1089        let base_url = "https://clojure.org";
1090        let discovered_link = "https://clojure.org/guides/getting_started";
1091
1092        // Simulate creating a crawl queue item for a discovered link
1093        let base_item = CrawlQueue {
1094            id: IndexerService::url_to_id(base_url),
1095            url: base_url.to_string(),
1096            domain: "clojure.org".to_string(),
1097            depth: 0, // Root domain
1098            status: CrawlStatus::Completed,
1099            created_at: Utc::now(),
1100            processed_at: Some(Utc::now()),
1101            error_message: None,
1102            retry_count: 0,
1103        };
1104
1105        // This would be created when processing the base_item and discovering links
1106        let discovered_item = CrawlQueue {
1107            id: IndexerService::url_to_id(discovered_link),
1108            url: discovered_link.to_string(),
1109            domain: "clojure.org".to_string(),
1110            depth: base_item.depth + 1,   // Should be depth 1
1111            status: CrawlStatus::Pending, // Should be pending for processing
1112            created_at: Utc::now(),
1113            processed_at: None, // Not processed yet
1114            error_message: None,
1115            retry_count: 0,
1116        };
1117
1118        // Validate the discovered item has the expected properties
1119        assert_eq!(discovered_item.url, discovered_link);
1120        assert_eq!(discovered_item.domain, "clojure.org");
1121        assert_eq!(discovered_item.depth, 1);
1122        assert_eq!(discovered_item.status as u8, CrawlStatus::Pending as u8);
1123        assert!(discovered_item.processed_at.is_none());
1124
1125        // Verify the URLs are different but from the same domain
1126        assert_ne!(base_item.url, discovered_item.url);
1127        assert_eq!(base_item.domain, discovered_item.domain);
1128        assert!(discovered_item.url.starts_with(&base_item.url));
1129    }
1130}