use anyhow::{Context, Result};
use chrono::{Duration as ChronoDuration, Utc};
use reqwest::Client;
use scraper::{Html, Selector};
use sha2::{Digest, Sha256};
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use url::Url;
use uuid::Uuid;

use crate::search::SearchDocument;
use crate::storage::{CrawlQueue, CrawlStatus, WebPage};
use crate::{Config, SearchService, StorageService};

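/// Background indexing service: crawls pages from the allowed domains, persists
/// them through the storage service, and feeds them into the search index.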
pub struct IndexerService {
    config: Arc<Config>,
    storage_service: Arc<StorageService>,
    search_service: Arc<SearchService>,
    client: Client,
    force_process_flag: Arc<AtomicBool>,
}

impl IndexerService {
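    /// Derives a deterministic document ID from a URL (hex-encoded SHA-256),
    /// so queueing the same URL twice produces the same queue entry ID.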
    fn url_to_id(url: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(url.as_bytes());
        let hash = hasher.finalize();
        format!("{:x}", hash)
    }

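    /// Builds the indexer with a shared HTTP client configured with the
    /// application user agent and a 30-second request timeout.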
    pub async fn new(
        config: Arc<Config>,
        storage_service: Arc<StorageService>,
        search_service: Arc<SearchService>,
    ) -> Result<Self> {
        let client = Client::builder()
            .user_agent(&config.application.user_agent)
            .timeout(Duration::from_secs(30))
            .build()
            .context("Failed to create HTTP client for indexer")?;

        Ok(Self {
            config,
            storage_service,
            search_service,
            client,
            force_process_flag: Arc::new(AtomicBool::new(false)),
        })
    }

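    /// Queues the given domains for crawling, skipping any that were indexed
    /// within the configured re-index interval.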
    pub async fn queue_domains(&self, domains: &[String]) -> Result<usize> {
        self.queue_domains_with_check(domains, true).await
    }

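    /// Queues domains for crawling. When `check_last_indexed` is true, domains
    /// indexed within `periodic_index_interval_days` are skipped. Returns the
    /// number of domains actually queued.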
    pub async fn queue_domains_with_check(
        &self,
        domains: &[String],
        check_last_indexed: bool,
    ) -> Result<usize> {
        let mut queued_count = 0;

        for domain in domains {
            let url = if domain.starts_with("http") {
                domain.clone()
            } else {
                format!("https://{}", domain)
            };

            let parsed_url = Url::parse(&url).with_context(|| format!("Invalid URL: {}", url))?;

            let domain_name = parsed_url
                .host_str()
                .ok_or_else(|| anyhow::anyhow!("No host in URL: {}", url))?
                .to_string();

            if !self.config.is_domain_allowed(&domain_name) {
                warn!("Domain not in allowed list, skipping: {}", domain_name);
                continue;
            }

            if check_last_indexed {
                if let Ok(Some(last_indexed)) = self
                    .storage_service
                    .get_domain_last_indexed(&domain_name)
                    .await
                {
                    let threshold = Utc::now()
                        - ChronoDuration::days(
                            self.config.application.periodic_index_interval_days as i64,
                        );
                    if last_indexed > threshold {
                        crate::log_and_capture!(info,
                            "📋 SKIPPING DOMAIN: {} was recently indexed on {} (within {} day interval), skipping indexing",
                            domain_name,
                            last_indexed.format("%Y-%m-%d %H:%M:%S UTC"),
                            self.config.application.periodic_index_interval_days
                        );
                        continue;
                    } else {
                        crate::log_and_capture!(info,
                            "🔄 INDEXING DOMAIN: {} was last indexed on {} (older than {} day interval), will re-index",
                            domain_name,
                            last_indexed.format("%Y-%m-%d %H:%M:%S UTC"),
                            self.config.application.periodic_index_interval_days
                        );
                    }
                } else {
                    info!(
                        "🆕 INDEXING DOMAIN: {} has never been indexed, adding to queue",
                        domain_name
                    );
                }
            } else {
                info!(
                    "➡️ INDEXING DOMAIN: {} added to queue (forced indexing, no recency check)",
                    domain_name
                );
            }

            let crawl_item = CrawlQueue {
                id: Uuid::new_v4().to_string(),
                url: url.clone(),
                domain: domain_name,
                depth: 0,
                status: CrawlStatus::Pending,
                created_at: Utc::now(),
                processed_at: None,
                error_message: None,
                retry_count: 0,
            };

            match self.storage_service.queue_crawl(&crawl_item).await {
                Ok(_) => {
                    info!(
                        "✅ QUEUED: {} successfully added to crawl queue (depth: {})",
                        url, crawl_item.depth
                    );
                    queued_count += 1;
                }
                Err(e) => {
                    warn!("❌ QUEUE FAILED: Unable to queue domain {} - {}", url, e);
                }
            }
        }

        info!(
            "📊 DOMAIN QUEUEING SUMMARY: Processed {} domains, queued {} for indexing",
            domains.len(),
            queued_count
        );
        Ok(queued_count)
    }

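    /// Long-running task: every six hours, re-queues allowed domains whose last
    /// index run is older than the configured interval. Does not return under
    /// normal operation.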
    pub async fn start_periodic_indexing(&self) -> Result<()> {
        info!(
            "Starting periodic indexing service with interval of {} days",
            self.config.application.periodic_index_interval_days
        );

        let check_interval = Duration::from_secs(6 * 60 * 60);

        loop {
            sleep(check_interval).await;

            info!("Running periodic domain indexing check");

            match self.check_and_queue_stale_domains().await {
                Ok(count) => {
                    if count > 0 {
                        info!("Queued {} domains for periodic re-indexing", count);
                    } else {
                        debug!("No domains need re-indexing at this time");
                    }
                }
                Err(e) => {
                    error!("Failed to check and queue stale domains: {}", e);
                }
            }
        }
    }

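    /// Long-running task: periodically asks the storage service to remove
    /// duplicate entries, at the configured hourly interval.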
    pub async fn start_periodic_duplicate_removal(&self) -> Result<()> {
        info!(
            "Starting periodic duplicate removal service with interval of {} hours",
            self.config.application.duplicate_removal_interval_hours
        );

        let check_interval =
            Duration::from_secs(self.config.application.duplicate_removal_interval_hours * 60 * 60);

        loop {
            sleep(check_interval).await;

            info!("Running periodic duplicate removal");

            match self.storage_service.remove_duplicates().await {
                Ok(count) => {
                    if count > 0 {
                        info!("Removed {} duplicate entries", count);
                    } else {
                        debug!("No duplicates found during this run");
                    }
                }
                Err(e) => {
                    error!("Failed to remove duplicates: {}", e);
                }
            }
        }
    }

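    /// Signals the queue processor to skip its idle back-off and check for
    /// pending crawl items immediately.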
    pub fn trigger_force_process_queue(&self) -> Result<()> {
        self.force_process_flag.store(true, Ordering::Relaxed);
        info!("🚀 Force queue processing triggered");
        Ok(())
    }

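    /// Re-queues every allowed domain, relying on the recency check inside
    /// `queue_domains_with_check` to skip domains indexed recently.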
    async fn check_and_queue_stale_domains(&self) -> Result<usize> {
        let allowed_domains = &self.config.application.allowed_domains;

        self.queue_domains_with_check(allowed_domains, true).await
    }

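    /// Main crawl loop: pulls batches of pending items, processes them
    /// concurrently (bounded by `max_concurrent_requests`), and logs queue
    /// statistics. Idles with a short back-off when the queue is empty, waking
    /// early if a force-process signal is received.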
    pub async fn process_crawl_queue(&self) -> Result<()> {
        info!("🚀 Starting crawl queue processing service");

        let mut last_status_log = std::time::Instant::now();
        let status_log_interval = Duration::from_secs(30);
        let mut items_processed_count = 0;
        let mut total_processing_time = Duration::from_secs(0);

        loop {
            if last_status_log.elapsed() >= status_log_interval {
                self.log_crawl_queue_status(items_processed_count, total_processing_time)
                    .await;
                last_status_log = std::time::Instant::now();
            }

            let pending_items = self
                .storage_service
                .get_pending_crawl_items(self.config.application.max_concurrent_requests)
                .await?;

            if pending_items.is_empty() {
                debug!("📭 No pending crawl items, entering idle mode");

                let mut sleep_duration = Duration::from_secs(10);
                loop {
                    if self.force_process_flag.load(Ordering::Relaxed) {
                        self.force_process_flag.store(false, Ordering::Relaxed);
                        info!("⚡ Force queue processing signal received - checking for items immediately");
                        break;
                    }

                    let chunk_duration = Duration::from_secs(1);
                    sleep(chunk_duration).await;

                    if sleep_duration <= chunk_duration {
                        break;
                    }
                    sleep_duration -= chunk_duration;
                }
                continue;
            }

            let batch_start = std::time::Instant::now();

            let root_domains = pending_items.iter().filter(|item| item.depth == 0).count();
            let discovered_links = pending_items.iter().filter(|item| item.depth > 0).count();

            info!(
                "⚙️ Processing batch of {} crawl items: {} root domains, {} discovered links",
                pending_items.len(),
                root_domains,
                discovered_links
            );

            if discovered_links > 0 {
                info!("🎯 PROCESSING DISCOVERED LINKS: The queue processor is now processing {} previously discovered links - this should resolve the missing URLs issue!", discovered_links);
            }

            let semaphore = Arc::new(tokio::sync::Semaphore::new(
                self.config.application.max_concurrent_requests,
            ));
            let mut tasks = Vec::new();

            for item in pending_items {
                let semaphore = semaphore.clone();
                let storage_service = self.storage_service.clone();
                let search_service = self.search_service.clone();
                let config = self.config.clone();
                let client = self.client.clone();

                let task = tokio::spawn(async move {
                    let _permit = semaphore.acquire().await.unwrap();
                    Self::process_crawl_item(item, storage_service, search_service, config, client)
                        .await
                });

                tasks.push(task);
            }

            let mut batch_success_count = 0;
            let mut batch_failure_count = 0;
            for task in tasks {
                match task.await {
                    Ok(_) => batch_success_count += 1,
                    Err(e) => {
                        error!("Crawl task failed: {}", e);
                        batch_failure_count += 1;
                    }
                }
            }

            let batch_duration = batch_start.elapsed();
            items_processed_count += batch_success_count + batch_failure_count;
            total_processing_time += batch_duration;

            info!(
                "✅ Batch completed: {} successful, {} failed, took {:?} (total processed: {})",
                batch_success_count, batch_failure_count, batch_duration, items_processed_count
            );

            sleep(Duration::from_millis(
                self.config.application.crawl_delay_ms,
            ))
            .await;
        }
    }

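    /// Logs queue statistics (pending/processing/completed/failed), the average
    /// processing time per item, and warnings for backlogs or high failure rates.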
    async fn log_crawl_queue_status(&self, items_processed: usize, total_time: Duration) {
        match self.storage_service.get_crawl_queue_stats().await {
            Ok((pending, processing, completed, failed)) => {
                let avg_time_per_item = if items_processed > 0 {
                    total_time.as_millis() as f64 / items_processed as f64
                } else {
                    0.0
                };

                info!(
                    "📊 CRAWL QUEUE STATUS: Pending: {}, Processing: {}, Completed: {}, Failed: {} | Processed: {} items | Avg time/item: {:.1}ms",
                    pending, processing, completed, failed, items_processed, avg_time_per_item
                );

                if pending > self.config.application.max_concurrent_requests * 3 {
                    warn!(
                        "⚠️ QUEUE BACKLOG: {} pending items detected, consider scaling up processing",
                        pending
                    );
                }

                if failed > 0 && completed > 0 {
                    let failure_rate = (failed as f64 / (completed + failed) as f64) * 100.0;
                    if failure_rate > 20.0 {
                        warn!(
                            "⚠️ HIGH FAILURE RATE: {:.1}% of crawl attempts are failing",
                            failure_rate
                        );
                    } else {
                        debug!("✅ Crawl success rate: {:.1}%", 100.0 - failure_rate);
                    }
                }
            }
            Err(e) => {
                warn!("Failed to get crawl queue statistics: {}", e);
            }
        }
    }

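    /// Processes a single queue entry: marks it as processing, crawls and
    /// indexes the page, then records the final Completed or Failed status.
    /// Crawl errors are recorded on the item rather than propagated.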
    async fn process_crawl_item(
        item: CrawlQueue,
        storage_service: Arc<StorageService>,
        search_service: Arc<SearchService>,
        config: Arc<Config>,
        client: Client,
    ) -> Result<()> {
        let start_time = std::time::Instant::now();
        info!(
            "🔍 PROCESSING: Starting to crawl {} (domain: {}, depth: {})",
            item.url, item.domain, item.depth
        );

        if let Err(e) = storage_service
            .update_crawl_status(&item.id, &item.domain, CrawlStatus::Processing, None)
            .await
        {
            warn!("Failed to update crawl status to processing: {}", e);
        }

        match Self::crawl_and_index_page(&item, &storage_service, &search_service, &config, &client)
            .await
        {
            Ok(_) => {
                let processing_time = start_time.elapsed();
                crate::log_and_capture!(
                    info,
                    "✅ INDEXED: Successfully processed and indexed {} in {:?}",
                    item.url,
                    processing_time
                );
                if let Err(e) = storage_service
                    .update_crawl_status(&item.id, &item.domain, CrawlStatus::Completed, None)
                    .await
                {
                    warn!("Failed to update crawl status to completed: {}", e);
                }
            }
            Err(e) => {
                let processing_time = start_time.elapsed();
                error!(
                    "❌ INDEX FAILED: Failed to process {} after {:?} - {}",
                    item.url, processing_time, e
                );
                if let Err(update_err) = storage_service
                    .update_crawl_status(
                        &item.id,
                        &item.domain,
                        CrawlStatus::Failed,
                        Some(e.to_string()),
                    )
                    .await
                {
                    warn!("Failed to update crawl status to failed: {}", update_err);
                }
            }
        }

        Ok(())
    }

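    /// Fetches a page, extracts its title, text, and snippet, stores it,
    /// indexes it for search, and queues any discovered links on allowed
    /// domains up to the configured maximum crawl depth. Pages blocked by
    /// robots.txt or with non-HTML content types are skipped.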
    async fn crawl_and_index_page(
        item: &CrawlQueue,
        storage_service: &Arc<StorageService>,
        search_service: &Arc<SearchService>,
        config: &Arc<Config>,
        client: &Client,
    ) -> Result<()> {
        debug!("Crawling URL: {}", item.url);

        if !Self::is_allowed_by_robots(&item.url, &config.application.user_agent, client).await? {
            info!(
                "🚫 ROBOTS.TXT BLOCKED: {} is disallowed by robots.txt, skipping",
                item.url
            );
            return Ok(());
        }

        let response = client
            .get(&item.url)
            .send()
            .await
            .context("Failed to fetch page")?;

        let status_code = response.status().as_u16();
        let content_type = response
            .headers()
            .get("content-type")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        if !response.status().is_success() {
            return Err(anyhow::anyhow!("HTTP error: {}", status_code));
        }

        let content = response
            .text()
            .await
            .context("Failed to read response body")?;
        let content_length = content.len();

        debug!(
            "📥 FETCHED: {} - {} bytes, status: {}",
            item.url, content_length, status_code
        );

        if let Some(ref ct) = content_type {
            if !ct.contains("text/html") {
                info!(
                    "📄 CONTENT TYPE SKIPPED: {} has content type '{}', only HTML is indexed",
                    item.url, ct
                );
                return Ok(());
            }
        }

        debug!(
            "🔍 PARSING: Extracting content from {} ({} bytes)",
            item.url,
            content.len()
        );
        let (title, text_content, snippet, discovered_links) = {
            let document = Html::parse_document(&content);

            let title_selector = Selector::parse("title").unwrap();
            let title = document
                .select(&title_selector)
                .next()
                .map(|el| el.text().collect::<String>())
                .unwrap_or_else(|| "Untitled".to_string());

            let text_content = Self::extract_text_content(&document);
            debug!(
                "📝 CONTENT EXTRACTED: {} - title: '{}', content: {} chars",
                item.url,
                title,
                text_content.len()
            );

            let snippet = Self::generate_snippet(&text_content, 200);

            let discovered_links = if item.depth < config.application.max_crawl_depth {
                debug!(
                    "🔗 LINK DISCOVERY: Extracting links from {} (current depth: {}, max: {})",
                    item.url, item.depth, config.application.max_crawl_depth
                );
                let base_url_parsed = Url::parse(&item.url)?;
                let link_selector = Selector::parse("a[href]").unwrap();
                let mut discovered_urls = HashSet::new();

                let mut hrefs = Vec::new();
                for link in document.select(&link_selector) {
                    if let Some(href) = link.value().attr("href") {
                        hrefs.push(href.to_string());
                    }
                }

                for href in hrefs {
                    if let Ok(absolute_url) = base_url_parsed.join(&href) {
                        let url_str = absolute_url.to_string();

                        if let Some(host) = absolute_url.host_str() {
                            if config.is_domain_allowed(host) && !discovered_urls.contains(&url_str)
                            {
                                discovered_urls.insert(url_str.clone());
                            }
                        }
                    }
                }

                discovered_urls.into_iter().collect::<Vec<_>>()
            } else {
                debug!(
                    "🔗 LINK DISCOVERY SKIPPED: {} at maximum depth ({}/{}), not extracting links",
                    item.url, item.depth, config.application.max_crawl_depth
                );
                Vec::new()
            };

            (title, text_content, snippet, discovered_links)
        };

        let webpage = WebPage {
            id: Uuid::new_v4().to_string(),
            url: item.url.clone(),
            title: title.clone(),
            content: text_content.clone(),
            snippet: snippet.clone(),
            domain: item.domain.clone(),
            indexed_at: Utc::now(),
            last_crawled: Utc::now(),
            status_code,
            content_type,
            content_length: Some(content_length),
        };

        debug!("💾 STORING: Saving webpage {} to database", item.url);
        storage_service
            .store_webpage(&webpage)
            .await
            .context("Failed to store webpage")?;

        debug!("🔍 INDEXING: Adding {} to search index", item.url);
        let search_doc = SearchDocument {
            id: webpage.id.clone(),
            title: webpage.title.clone(),
            url: webpage.url.clone(),
            content: webpage.content.clone(),
            snippet: webpage.snippet.clone(),
            domain: webpage.domain.clone(),
            indexed_at: webpage.indexed_at,
            last_crawled: webpage.last_crawled,
        };

        search_service
            .index_document(&search_doc)
            .await
            .context("Failed to index document")?;

        let num_discovered = discovered_links.len();

        info!(
            "🎯 INDEXED SUCCESSFULLY: {} - title: '{}', content: {} chars, links discovered: {}",
            item.url,
            title,
            text_content.len(),
            num_discovered
        );

        if num_discovered > 0 {
            info!(
                "🔗 QUEUEING LINKS: Adding {} discovered links to crawl queue from {}",
                num_discovered, item.url
            );
        }
        let mut successfully_queued = 0;
        for url_str in discovered_links {
            let discovered_domain = if let Ok(parsed_url) = Url::parse(&url_str) {
                parsed_url.host_str().unwrap_or(&item.domain).to_string()
            } else {
                item.domain.clone()
            };

            let crawl_item = CrawlQueue {
                id: Self::url_to_id(&url_str),
                url: url_str.clone(),
                domain: discovered_domain,
                depth: item.depth + 1,
                status: CrawlStatus::Pending,
                created_at: Utc::now(),
                processed_at: None,
                error_message: None,
                retry_count: 0,
            };

            if let Err(e) = storage_service.queue_crawl(&crawl_item).await {
                let error_str = e.to_string();
                if error_str.contains("409") || error_str.to_lowercase().contains("conflict") {
                    debug!("URL already queued (skipping duplicate): {}", url_str);
                } else {
                    warn!("Failed to queue discovered link {}: {}", url_str, e);
                }
            } else {
                info!(
                    "✅ QUEUED LINK: {} (depth: {}) discovered from {}",
                    url_str, crawl_item.depth, item.url
                );
                successfully_queued += 1;
            }
        }

        if num_discovered > 0 {
            info!(
                "🔗 LINK DISCOVERY COMPLETE: Successfully queued {} of {} discovered links from {} (total indexed content: {} chars)",
                successfully_queued, num_discovered, item.url, text_content.len()
            );
        }

        Ok(())
    }

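    /// Extracts visible text from the document body (or from the whole document
    /// if no body is present).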
    fn extract_text_content(document: &Html) -> String {
        let body_selector = Selector::parse("body").unwrap();

        let body = document.select(&body_selector).next();
        if let Some(body_el) = body {
            let mut text_parts = Vec::new();
            Self::extract_text_recursive(body_el, &mut text_parts);
            text_parts.join(" ")
        } else {
            document.root_element().text().collect::<Vec<_>>().join(" ")
        }
    }

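    /// Walks the element tree depth-first, collecting trimmed text nodes and
    /// skipping `script`, `style`, `nav`, `footer`, and `aside` subtrees.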
    fn extract_text_recursive(element: scraper::ElementRef, text_parts: &mut Vec<String>) {
        for child in element.children() {
            if let Some(text) = child.value().as_text() {
                let text = text.trim();
                if !text.is_empty() {
                    text_parts.push(text.to_string());
                }
            } else if let Some(child_element) = scraper::ElementRef::wrap(child) {
                if !matches!(
                    child_element.value().name(),
                    "script" | "style" | "nav" | "footer" | "aside"
                ) {
                    Self::extract_text_recursive(child_element, text_parts);
                }
            }
        }
    }

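    /// Truncates content to roughly `max_length` bytes for use as a search
    /// snippet, preferring to cut at the last space and appending "...".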
    fn generate_snippet(content: &str, max_length: usize) -> String {
        if content.len() <= max_length {
            return content.to_string();
        }

        // Back up to a valid char boundary so slicing never panics on
        // multi-byte UTF-8 content.
        let mut cut = max_length;
        while !content.is_char_boundary(cut) {
            cut -= 1;
        }
        let truncated = &content[..cut];
        if let Some(last_space) = truncated.rfind(' ') {
            format!("{}...", &content[..last_space])
        } else {
            format!("{}...", truncated)
        }
    }

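    /// Fetches robots.txt for the URL's host and applies a coarse check: if the
    /// file contains both "User-agent: *" and "Disallow: /", the URL is treated
    /// as disallowed. Any fetch failure is treated as allowed.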
    async fn is_allowed_by_robots(url: &str, _user_agent: &str, client: &Client) -> Result<bool> {
        let parsed_url = Url::parse(url)?;
        let robots_url = format!(
            "{}://{}/robots.txt",
            parsed_url.scheme(),
            parsed_url.host_str().unwrap_or_default()
        );

        match client.get(&robots_url).send().await {
            Ok(response) if response.status().is_success() => {
                let robots_content = response.text().await.unwrap_or_default();
                if robots_content.contains("User-agent: *")
                    && robots_content.contains("Disallow: /")
                {
                    debug!("Robots.txt found with restrictions for {}", url);
                    return Ok(false);
                }
            }
            _ => {
                debug!("Could not fetch robots.txt for {}, assuming allowed", url);
            }
        }

        Ok(true)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ApplicationConfig, AzureConfig};
    use chrono::{Duration as ChronoDuration, Utc};

    fn create_test_config() -> Arc<Config> {
        Arc::new(Config {
            environment: "test".to_string(),
            azure: AzureConfig {
                search_service_name: "test".to_string(),
                search_api_key: "test".to_string(),
                search_api_version: "2023-11-01".to_string(),
                search_index_name: "test".to_string(),
                cosmos_endpoint: "test".to_string(),
                cosmos_key: "test".to_string(),
                cosmos_database_name: "test".to_string(),
                cosmos_container_name: "test".to_string(),
            },
            application: ApplicationConfig {
                max_crawl_depth: 5,
                crawl_delay_ms: 1000,
                max_concurrent_requests: 10,
                user_agent: "test".to_string(),
                allowed_domains: vec!["example.com".to_string(), "test.org".to_string()],
                periodic_index_interval_days: 7,
                duplicate_removal_interval_hours: 24,
                admin_api_key: "test-admin-key".to_string(),
            },
        })
    }

    #[tokio::test]
    async fn test_queue_domains_with_check_skips_recently_indexed() {
        let config = create_test_config();

        assert_eq!(config.application.periodic_index_interval_days, 7);

        assert!(config.is_domain_allowed("example.com"));
        assert!(config.is_domain_allowed("test.org"));
        assert!(!config.is_domain_allowed("not-allowed.com"));
    }

    #[test]
    fn test_domain_filtering_logic() {
        let config = create_test_config();

        let domains = vec![
            "example.com".to_string(),
            "https://test.org".to_string(),
            "not-allowed.com".to_string(),
        ];

        let mut allowed_count = 0;
        for domain in &domains {
            let url = if domain.starts_with("http") {
                domain.clone()
            } else {
                format!("https://{}", domain)
            };

            if let Ok(parsed_url) = Url::parse(&url) {
                if let Some(host) = parsed_url.host_str() {
                    if config.is_domain_allowed(host) {
                        allowed_count += 1;
                    }
                }
            }
        }

        assert_eq!(allowed_count, 2);
    }

    #[test]
    fn test_periodic_indexing_time_logic() {
        let config = create_test_config();
        let interval_days = config.application.periodic_index_interval_days as i64;

        let now = Utc::now();

        let recent_time = now - ChronoDuration::days(interval_days - 1);
        let threshold = now - ChronoDuration::days(interval_days);
        assert!(
            recent_time > threshold,
            "Recent indexing should be after threshold"
        );

        let old_time = now - ChronoDuration::days(interval_days + 1);
        assert!(
            old_time <= threshold,
            "Old indexing should be before or at threshold"
        );

        let threshold_time = now - ChronoDuration::days(interval_days);
        assert!(
            threshold_time <= threshold,
            "Threshold time should trigger re-indexing"
        );
    }

    #[test]
    fn test_trigger_force_process_queue() {
        let _config = create_test_config();

        let force_process_flag = Arc::new(AtomicBool::new(false));

        force_process_flag.store(true, Ordering::Relaxed);
        assert!(
            force_process_flag.load(Ordering::Relaxed),
            "Force flag should be set after trigger"
        );

        force_process_flag.store(false, Ordering::Relaxed);
        assert!(
            !force_process_flag.load(Ordering::Relaxed),
            "Force flag should be cleared after processing"
        );
    }

    #[test]
    fn test_domain_extraction_from_discovered_urls() {
        use url::Url;

        let test_cases = vec![
            ("https://example.com/page1", "example.com"),
            ("https://test.org/docs/intro", "test.org"),
            (
                "https://subdomain.example.com/path",
                "subdomain.example.com",
            ),
            ("http://example.com:8080/api", "example.com"),
        ];

        for (url_str, expected_domain) in test_cases {
            let parsed_url = Url::parse(url_str).expect("Should parse valid URL");
            let extracted_domain = parsed_url.host_str().expect("Should have host");
            assert_eq!(
                extracted_domain, expected_domain,
                "Domain extraction failed for URL: {}",
                url_str
            );
        }
    }

    #[test]
    fn test_crawl_queue_domain_assignment() {
        use crate::storage::{CrawlQueue, CrawlStatus};
        use chrono::Utc;
        use uuid::Uuid;

        let parent_domain = "example.com";
        let discovered_urls = [
            "https://example.com/page2".to_string(),
            "https://test.org/external".to_string(),
            "https://subdomain.example.com/sub".to_string(),
        ];

        let expected_domains = ["example.com", "test.org", "subdomain.example.com"];

        for (url_str, expected_domain) in discovered_urls.iter().zip(expected_domains.iter()) {
            let discovered_domain = if let Ok(parsed_url) = Url::parse(url_str) {
                parsed_url.host_str().unwrap_or(parent_domain).to_string()
            } else {
                parent_domain.to_string()
            };

            let crawl_item = CrawlQueue {
                id: Uuid::new_v4().to_string(),
                url: url_str.clone(),
                domain: discovered_domain.clone(),
                depth: 1,
                status: CrawlStatus::Pending,
                created_at: Utc::now(),
                processed_at: None,
                error_message: None,
                retry_count: 0,
            };

            assert_eq!(
                crawl_item.domain, *expected_domain,
                "Domain should be extracted from discovered URL: {}",
                url_str
            );
            assert_eq!(crawl_item.url, *url_str, "URL should match discovered URL");
        }
    }

    #[test]
    fn test_domain_extraction_includes_subdomains_not_just_tld() {
        use url::Url;

        let test_cases = vec![
            ("https://api.example.com/v1/users", "api.example.com"),
            (
                "https://blog.subdomain.example.com/post",
                "blog.subdomain.example.com",
            ),
            (
                "https://cdn.assets.company.org/images",
                "cdn.assets.company.org",
            ),
            ("https://example.com/page", "example.com"),
            ("https://www.example.co.uk/page", "www.example.co.uk"),
            ("http://localhost:8080/test", "localhost"),
        ];

        for (url_str, expected_domain) in test_cases {
            let parsed_url = Url::parse(url_str).expect("Should parse valid URL");
            let extracted_domain = parsed_url.host_str().expect("Should have host");
            assert_eq!(
                extracted_domain, expected_domain,
                "Domain extraction should include full subdomain path for URL: {}",
                url_str
            );

            if expected_domain.contains('.') {
                let tld_only = expected_domain.split('.').next_back().unwrap();
                assert_ne!(
                    extracted_domain, tld_only,
                    "Should extract full domain, not just TLD '{}' for URL: {}",
                    tld_only, url_str
                );
            }
        }
    }

    #[test]
    fn test_url_to_id_generates_deterministic_ids() {
        let url1 = "https://example.com/page";
        let url2 = "https://example.com/page";
        let url3 = "https://example.com/different";

        let id1 = IndexerService::url_to_id(url1);
        let id2 = IndexerService::url_to_id(url2);
        let id3 = IndexerService::url_to_id(url3);

        assert_eq!(id1, id2, "Same URL should generate same ID");
        assert_ne!(id1, id3, "Different URLs should generate different IDs");

        assert!(id1.chars().all(|c| c.is_ascii_hexdigit()));
        assert!(id3.chars().all(|c| c.is_ascii_hexdigit()));

        assert_eq!(id1.len(), 64);
        assert_eq!(id3.len(), 64);
    }

    #[test]
    fn test_url_deduplication_in_crawl_queue() {
        use crate::storage::{CrawlQueue, CrawlStatus};
        use chrono::Utc;

        let url = "https://example.com/test-page";
        let domain = "example.com";

        let crawl_item1 = CrawlQueue {
            id: IndexerService::url_to_id(url),
            url: url.to_string(),
            domain: domain.to_string(),
            depth: 1,
            status: CrawlStatus::Pending,
            created_at: Utc::now(),
            processed_at: None,
            error_message: None,
            retry_count: 0,
        };

        let crawl_item2 = CrawlQueue {
            id: IndexerService::url_to_id(url),
            url: url.to_string(),
            domain: domain.to_string(),
            depth: 2,
            status: CrawlStatus::Pending,
            created_at: Utc::now(),
            processed_at: None,
            error_message: None,
            retry_count: 0,
        };

        assert_eq!(
            crawl_item1.id, crawl_item2.id,
            "Same URL should produce same ID for deduplication"
        );
        assert_eq!(crawl_item1.url, crawl_item2.url, "URLs should match");
    }

    #[test]
    fn test_link_discovery_queues_discovered_urls() {
        use crate::storage::{CrawlQueue, CrawlStatus};
        use chrono::Utc;

        let base_url = "https://clojure.org";
        let discovered_link = "https://clojure.org/guides/getting_started";

        let base_item = CrawlQueue {
            id: IndexerService::url_to_id(base_url),
            url: base_url.to_string(),
            domain: "clojure.org".to_string(),
            depth: 0,
            status: CrawlStatus::Completed,
            created_at: Utc::now(),
            processed_at: Some(Utc::now()),
            error_message: None,
            retry_count: 0,
        };

        let discovered_item = CrawlQueue {
            id: IndexerService::url_to_id(discovered_link),
            url: discovered_link.to_string(),
            domain: "clojure.org".to_string(),
            depth: base_item.depth + 1,
            status: CrawlStatus::Pending,
            created_at: Utc::now(),
            processed_at: None,
            error_message: None,
            retry_count: 0,
        };

        assert_eq!(discovered_item.url, discovered_link);
        assert_eq!(discovered_item.domain, "clojure.org");
        assert_eq!(discovered_item.depth, 1);
        assert_eq!(discovered_item.status as u8, CrawlStatus::Pending as u8);
        assert!(discovered_item.processed_at.is_none());

        assert_ne!(base_item.url, discovered_item.url);
        assert_eq!(base_item.domain, discovered_item.domain);
        assert!(discovered_item.url.starts_with(&base_item.url));
    }
}