1use anyhow::{Context, Result};
31use azure_data_cosmos::{
32 models::{ContainerProperties, PartitionKeyDefinition},
33 CosmosClient, PartitionKey, Query, QueryOptions, QueryPartitionStrategy,
34};
35use chrono::{DateTime, Datelike, Timelike, Utc};
36use futures_util::TryStreamExt;
37use reqwest::Client;
38use serde::{Deserialize, Serialize};
39use std::collections::VecDeque;
40use std::sync::{Arc, Mutex};
41use tracing::{debug, info, warn};
42
43use base64::Engine;
45use hmac::{Hmac, Mac};
46use sha2::Sha256;
47
48use crate::Config;
49
/// REST API version sent in the `x-ms-version` header on every direct (non-SDK)
/// Cosmos DB request issued by this module.
const COSMOS_API_VERSION: &str = "2018-12-31";

/// Maximum number of entries retained by `LogBuffer`; once full, each new entry
/// evicts the oldest one.
const MAX_LOG_ENTRIES: usize = 100;
55
/// A single captured log line held by `LogBuffer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Capture time (`Utc::now()` when `LogBuffer::add_log` was called).
    pub timestamp: DateTime<Utc>,
    /// Level name as passed to `add_log`; `log_and_capture!` passes the macro
    /// identifier it was invoked with (e.g. `"info"`).
    pub level: String,
    /// The already-formatted log message.
    pub message: String,
}
63
/// Bounded, mutex-protected buffer of the most recent log entries (at most
/// `MAX_LOG_ENTRIES`), read back via `get_recent_logs`.
pub struct LogBuffer {
    /// Entries ordered oldest (front) to newest (back).
    entries: Arc<Mutex<VecDeque<LogEntry>>>,
}
68
69impl LogBuffer {
70 pub fn new() -> Self {
71 Self {
72 entries: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_LOG_ENTRIES))),
73 }
74 }
75
76 pub fn add_log(&self, level: &str, message: &str) {
77 let entry = LogEntry {
78 timestamp: Utc::now(),
79 level: level.to_string(),
80 message: message.to_string(),
81 };
82
83 if let Ok(mut entries) = self.entries.lock() {
84 if entries.len() >= MAX_LOG_ENTRIES {
85 entries.pop_front();
86 }
87 entries.push_back(entry);
88 }
89 }
90
91 pub fn get_recent_logs(&self, limit: usize) -> Vec<LogEntry> {
92 if let Ok(entries) = self.entries.lock() {
93 entries
94 .iter()
95 .rev() .take(limit)
97 .cloned()
98 .collect()
99 } else {
100 Vec::new()
101 }
102 }
103}
104
lazy_static::lazy_static! {
    /// Process-wide log buffer that `log_and_capture!` appends to.
    pub static ref GLOBAL_LOG_BUFFER: LogBuffer = LogBuffer::new();
}
109
/// Emits a `format!`-style message through the `tracing` macro named by `$level`
/// (e.g. `info`, `debug`) and also appends it to `storage::GLOBAL_LOG_BUFFER`,
/// tagged with the level identifier as written at the call site.
#[macro_export]
macro_rules! log_and_capture {
    ($level:ident, $($arg:tt)*) => {{
        let captured = ::std::format!($($arg)*);
        tracing::$level!("{}", captured);
        $crate::storage::GLOBAL_LOG_BUFFER.add_log(::std::stringify!($level), &captured);
    }};
}
121
/// A crawled page as stored in the container named by
/// `config.azure.cosmos_container_name` (partitioned on `/domain`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebPage {
    /// Cosmos document id.
    pub id: String,
    /// Page URL.
    pub url: String,
    /// Page title.
    pub title: String,
    /// Extracted page content.
    pub content: String,
    /// Short excerpt of the content.
    pub snippet: String,
    /// Host the page belongs to; also the Cosmos partition key value.
    pub domain: String,
    /// When the page was first indexed.
    pub indexed_at: DateTime<Utc>,
    /// When the page was most recently crawled.
    pub last_crawled: DateTime<Utc>,
    /// HTTP status code observed when fetching the page.
    pub status_code: u16,
    /// `Content-Type` of the response, if known.
    pub content_type: Option<String>,
    /// Response body length, if known (units presumably bytes — confirm with caller).
    pub content_length: Option<usize>,
}
150
/// A unit of crawl work stored in the `crawl-queue` container (partitioned on
/// `/domain`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrawlQueue {
    /// Cosmos document id; root-domain items use the SHA-256 hex of the URL
    /// (`StorageService::url_to_id`).
    pub id: String,
    /// URL to crawl.
    pub url: String,
    /// Host of `url`; also the Cosmos partition key value.
    pub domain: String,
    /// Link depth from the seed URL (root-domain items are created at depth 0).
    pub depth: usize,
    /// Current processing state.
    pub status: CrawlStatus,
    /// When the item was enqueued; pending items are returned oldest first.
    pub created_at: DateTime<Utc>,
    /// Set when the status is updated via `update_crawl_status`.
    pub processed_at: Option<DateTime<Utc>>,
    /// Failure description recorded with the status update, if any.
    pub error_message: Option<String>,
    /// Number of retries attempted so far.
    pub retry_count: usize,
}
170
/// Lifecycle state of a `CrawlQueue` item. Serialized by serde as the variant
/// name (e.g. `"Pending"`), which the pending-item queries match on.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CrawlStatus {
    /// Waiting to be picked up.
    Pending,
    /// Currently being crawled.
    Processing,
    /// Crawled successfully.
    Completed,
    /// Crawl failed.
    Failed,
    /// Intentionally not crawled.
    Skipped,
}
179
/// One executed search, stored in the `search-stats` container (partitioned on
/// `/query_normalized`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchStatistic {
    /// Cosmos document id.
    pub id: String,
    /// Query text as entered.
    pub query: String,
    /// Normalized query text; also the Cosmos partition key value.
    pub query_normalized: String,
    /// Number of results returned.
    pub result_count: usize,
    /// Time taken to serve the search, in milliseconds.
    pub search_time_ms: u64,
    /// When the search happened.
    pub timestamp: DateTime<Utc>,
    /// Requesting client IP, if recorded.
    pub user_ip: Option<String>,
}
200
/// Cosmos DB-backed persistence for pages, the crawl queue and search statistics.
///
/// Most operations try the Azure SDK client first and fall back to hand-signed
/// REST calls made with `client`.
pub struct StorageService {
    /// HTTP client used for the REST API fallback paths.
    client: Client,
    /// SDK client; always `Some` when built via `StorageService::new`.
    cosmos_client: Option<CosmosClient>,
    /// Shared application configuration (endpoint, key, database/container names).
    config: Arc<Config>,
}
213
214impl StorageService {
215 pub async fn new(config: Arc<Config>) -> Result<Self> {
233 let client = Client::builder()
234 .user_agent(&config.application.user_agent)
235 .build()
236 .context("Failed to create HTTP client")?;
237
238 crate::log_and_capture!(info, "🚀 STORAGE: Initializing Azure Cosmos DB SDK client");
240 let cosmos_client = Self::create_cosmos_client(&config)?;
241
242 let service = Self {
243 client,
244 cosmos_client: Some(cosmos_client),
245 config,
246 };
247
248 service.ensure_database_exists().await?;
250 service.ensure_containers_exist().await?;
251
252 crate::log_and_capture!(
253 info,
254 "✅ STORAGE: Azure Cosmos DB storage service initialized successfully"
255 );
256
257 Ok(service)
258 }
259
260 fn create_cosmos_client(config: &Config) -> Result<CosmosClient> {
262 info!("Creating Cosmos client with master key authentication");
263
264 let cosmos_client = CosmosClient::with_key(
266 &config.azure.cosmos_endpoint,
267 config.azure.cosmos_key.clone().into(),
268 None,
269 )
270 .context("Failed to create Cosmos client with master key")?;
271
272 Ok(cosmos_client)
273 }
274
275 pub async fn store_webpage(&self, webpage: &WebPage) -> Result<()> {
276 if let Some(cosmos_client) = &self.cosmos_client {
278 debug!(
279 "Using Azure Cosmos DB SDK to store webpage: {}",
280 webpage.url
281 );
282
283 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
284 let container_client =
285 db_client.container_client(&self.config.azure.cosmos_container_name);
286
287 let partition_key = PartitionKey::from(&webpage.domain);
289
290 match container_client
292 .create_item(partition_key, webpage, None)
293 .await
294 {
295 Ok(_) => {
296 debug!("Successfully stored webpage via SDK: {}", webpage.url);
297 return Ok(());
298 }
299 Err(e) => {
300 debug!(
302 "Webpage storage via SDK failed: {}. Falling back to REST API",
303 e
304 );
305 }
306 }
307 }
308
309 debug!("Using REST API fallback to store webpage: {}", webpage.url);
311 self.store_webpage_rest_api(webpage).await
312 }
313
314 async fn store_webpage_rest_api(&self, webpage: &WebPage) -> Result<()> {
315 let url = format!(
316 "{}/dbs/{}/colls/{}/docs",
317 self.config.azure.cosmos_endpoint,
318 self.config.azure.cosmos_database_name,
319 self.config.azure.cosmos_container_name
320 );
321
322 let resource_id = format!(
323 "dbs/{}/colls/{}",
324 self.config.azure.cosmos_database_name, self.config.azure.cosmos_container_name
325 );
326 let (auth_header, date_header) = self.cosmos_auth_headers("post", "docs", &resource_id)?;
327
328 let response = self
329 .client
330 .post(&url)
331 .header("Authorization", auth_header)
332 .header("x-ms-date", date_header)
333 .header("Content-Type", "application/json")
334 .header("x-ms-version", COSMOS_API_VERSION)
335 .header(
336 "x-ms-documentdb-partitionkey",
337 format!(r#"["{}"]]"#, webpage.domain),
338 )
339 .json(webpage)
340 .send()
341 .await
342 .context("Failed to store webpage")?;
343
344 if !response.status().is_success() {
345 let status = response.status();
346 let error_text = response.text().await.unwrap_or_default();
347 return Err(anyhow::anyhow!(
348 "Failed to store webpage with status {}: {}",
349 status,
350 error_text
351 ));
352 }
353
354 debug!("Successfully stored webpage: {}", webpage.url);
355 Ok(())
356 }
357
358 pub async fn get_webpage(&self, id: &str, domain: &str) -> Result<Option<WebPage>> {
359 if let Some(cosmos_client) = &self.cosmos_client {
361 debug!("Using Azure Cosmos DB SDK to get webpage: {}", id);
362
363 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
364 let container_client =
365 db_client.container_client(&self.config.azure.cosmos_container_name);
366
367 match container_client
369 .read_item(&domain.to_string(), id, None)
370 .await
371 {
372 Ok(response) => {
373 let webpage: WebPage = response
374 .into_json_body()
375 .await
376 .context("Failed to parse webpage response from SDK")?;
377 debug!("Successfully retrieved webpage via SDK: {}", id);
378 return Ok(Some(webpage));
379 }
380 Err(e) => {
381 let error_string = format!("{:?}", e);
383 if error_string.contains("404") || error_string.contains("NotFound") {
384 return Ok(None);
385 } else {
386 debug!(
388 "Webpage retrieval via SDK failed: {}. Falling back to REST API",
389 e
390 );
391 }
392 }
393 }
394 }
395
396 debug!("Using REST API fallback to get webpage: {}", id);
398 self.get_webpage_rest_api(id, domain).await
399 }
400
401 async fn get_webpage_rest_api(&self, id: &str, domain: &str) -> Result<Option<WebPage>> {
402 let url = format!(
403 "{}/dbs/{}/colls/{}/docs/{}",
404 self.config.azure.cosmos_endpoint,
405 self.config.azure.cosmos_database_name,
406 self.config.azure.cosmos_container_name,
407 id
408 );
409
410 let resource_id = format!(
411 "dbs/{}/colls/{}/docs/{}",
412 self.config.azure.cosmos_database_name, self.config.azure.cosmos_container_name, id
413 );
414 let (auth_header, date_header) = self.cosmos_auth_headers("get", "docs", &resource_id)?;
415
416 let response = self
417 .client
418 .get(&url)
419 .header("Authorization", auth_header)
420 .header("x-ms-date", date_header)
421 .header("x-ms-version", COSMOS_API_VERSION)
422 .header(
423 "x-ms-documentdb-partitionkey",
424 format!(r#"["{}"]]"#, domain),
425 )
426 .send()
427 .await
428 .context("Failed to get webpage")?;
429
430 if response.status().as_u16() == 404 {
431 return Ok(None);
432 }
433
434 if !response.status().is_success() {
435 let status = response.status();
436 let error_text = response.text().await.unwrap_or_default();
437 return Err(anyhow::anyhow!(
438 "Failed to get webpage with status {}: {}",
439 status,
440 error_text
441 ));
442 }
443
444 let webpage: WebPage = response
445 .json()
446 .await
447 .context("Failed to parse webpage response")?;
448
449 Ok(Some(webpage))
450 }
451
452 pub async fn queue_crawl(&self, crawl_item: &CrawlQueue) -> Result<()> {
453 if let Some(cosmos_client) = &self.cosmos_client {
455 debug!(
456 "Using Azure Cosmos DB SDK to queue crawl item: {}",
457 crawl_item.url
458 );
459
460 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
461 let container_client = db_client.container_client("crawl-queue");
462
463 let partition_key = PartitionKey::from(&crawl_item.domain);
465
466 match container_client
468 .create_item(partition_key, crawl_item, None)
469 .await
470 {
471 Ok(_) => {
472 debug!("Successfully queued crawl item via SDK: {}", crawl_item.url);
473 return Ok(());
474 }
475 Err(e) => {
476 debug!(
478 "Crawl queue via SDK failed: {}. Falling back to REST API",
479 e
480 );
481 }
482 }
483 }
484
485 debug!(
487 "Using REST API fallback to queue crawl item: {}",
488 crawl_item.url
489 );
490 self.queue_crawl_rest_api(crawl_item).await
491 }
492
493 async fn queue_crawl_rest_api(&self, crawl_item: &CrawlQueue) -> Result<()> {
494 let url = format!(
495 "{}/dbs/{}/colls/crawl-queue/docs",
496 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name
497 );
498
499 debug!("Queuing crawl item: {}", crawl_item.url);
500
501 let resource_id = format!(
502 "dbs/{}/colls/crawl-queue",
503 self.config.azure.cosmos_database_name
504 );
505 let (auth_header, date_header) = self.cosmos_auth_headers("post", "docs", &resource_id)?;
506
507 let response = self
508 .client
509 .post(&url)
510 .header("Authorization", auth_header)
511 .header("x-ms-date", date_header)
512 .header("Content-Type", "application/json")
513 .header("x-ms-version", COSMOS_API_VERSION)
514 .header(
515 "x-ms-documentdb-partitionkey",
516 format!(r#"["{}"]]"#, crawl_item.domain),
517 )
518 .json(crawl_item)
519 .send()
520 .await
521 .context("Failed to queue crawl item")?;
522
523 if !response.status().is_success() {
524 let status = response.status();
525 let error_text = response.text().await.unwrap_or_default();
526 return Err(anyhow::anyhow!(
527 "Failed to queue crawl item with status {}: {}",
528 status,
529 error_text
530 ));
531 }
532
533 debug!("Successfully queued crawl item: {}", crawl_item.url);
534 Ok(())
535 }
536
537 pub async fn get_pending_crawl_items(&self, limit: usize) -> Result<Vec<CrawlQueue>> {
538 debug!("get_pending_crawl_items called with limit: {}", limit);
539
540 if let Some(_cosmos_client) = &self.cosmos_client {
542 debug!("Using Azure Cosmos DB SDK to get pending crawl items");
543
544 match self.get_pending_crawl_items_sdk_query(limit).await {
545 Ok(items) if !items.is_empty() => {
546 debug!("Found {} actual pending crawl items via SDK", items.len());
547 return Ok(items);
548 }
549 Ok(_) => {
550 debug!("No pending crawl items found via SDK, will try REST API fallback");
551 }
552 Err(e) => {
553 debug!(
554 "Failed to query pending items via SDK: {}. Falling back to REST API",
555 e
556 );
557 }
558 }
559 }
560
561 debug!("Using REST API fallback to get pending crawl items");
563 match self.get_pending_crawl_items_rest_api(limit).await {
564 Ok(items) if !items.is_empty() => {
565 debug!(
566 "Found {} actual pending crawl items via REST API",
567 items.len()
568 );
569 return Ok(items);
570 }
571 Ok(_) => {
572 debug!("No pending crawl items found via REST API, will create root domain items");
573 }
574 Err(e) => {
575 debug!(
576 "Failed to query pending items via REST API, falling back to domain creation: {}",
577 e
578 );
579 }
580 }
581
582 self.create_root_domain_crawl_items(limit).await
585 }
586
587 async fn get_pending_crawl_items_sdk_query(&self, limit: usize) -> Result<Vec<CrawlQueue>> {
588 debug!("Attempting to query pending crawl items using Azure SDK");
589
590 if let Some(cosmos_client) = &self.cosmos_client {
591 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
592 let container_client = db_client.container_client("crawl-queue");
593
594 let query =
597 Query::from("SELECT * FROM c WHERE c.status = @status ORDER BY c.created_at ASC");
598
599 debug!("Executing SDK query per partition for pending items");
600
601 let mut all_items = Vec::new();
602 let mut items_collected = 0;
603
604 for domain in &self.config.application.allowed_domains {
606 if items_collected >= limit {
607 break;
608 }
609
610 let partition_key = PartitionKey::from(domain);
611 let query_with_params = query
612 .clone()
613 .with_parameter("@status", "Pending")
614 .map_err(|e| anyhow::anyhow!("Failed to add parameter to query: {}", e))?;
615
616 let _remaining_limit = limit - items_collected;
617 let query_options = QueryOptions {
618 ..Default::default()
619 };
620
621 debug!("Querying partition {} for pending items", domain);
622
623 match container_client.query_items::<CrawlQueue>(
624 query_with_params,
625 QueryPartitionStrategy::SinglePartition(partition_key),
626 Some(query_options),
627 ) {
628 Ok(pager) => {
629 let mut partition_items = Vec::new();
631 let mut feed_pager = pager;
632
633 match feed_pager.try_next().await {
635 Ok(Some(page)) => {
636 for item in page.into_items() {
637 if items_collected < limit {
638 partition_items.push(item);
639 items_collected += 1;
640 } else {
641 break;
642 }
643 }
644 debug!(
645 "Found {} pending items in partition {}",
646 partition_items.len(),
647 domain
648 );
649 }
650 Ok(None) => {
651 debug!("No items found in partition {}", domain);
652 }
653 Err(e) => {
654 debug!("Failed to read page from partition {}: {}", domain, e);
655 continue;
656 }
657 }
658
659 all_items.extend(partition_items);
660 }
661 Err(e) => {
662 debug!("Failed to query partition {}: {}", domain, e);
663 continue;
664 }
665 }
666 }
667
668 all_items.sort_by(|a, b| a.created_at.cmp(&b.created_at));
670
671 all_items.truncate(limit);
673
674 if !all_items.is_empty() {
675 info!(
676 "📦 SDK: Retrieved {} pending crawl items from {} partitions",
677 all_items.len(),
678 self.config.application.allowed_domains.len()
679 );
680 } else {
681 debug!("📦 SDK: No pending crawl items found across all partitions");
682 }
683
684 return Ok(all_items);
685 }
686
687 Err(anyhow::anyhow!("No Cosmos client available"))
688 }
689
690 async fn create_root_domain_crawl_items(&self, limit: usize) -> Result<Vec<CrawlQueue>> {
691 debug!("Creating root domain crawl items as fallback (no pending items found)");
692
693 let mut work_items = Vec::new();
697
698 let domains_to_process =
701 std::cmp::min(limit, self.config.application.allowed_domains.len());
702
703 for (i, domain) in self.config.application.allowed_domains.iter().enumerate() {
704 if i >= domains_to_process {
705 break;
706 }
707
708 let domain_url = if domain.starts_with("http") {
710 domain.clone()
711 } else {
712 format!("https://{}", domain)
713 };
714
715 let crawl_item = CrawlQueue {
716 id: Self::url_to_id(&domain_url), url: domain_url,
718 domain: domain.clone(),
719 depth: 0,
720 status: CrawlStatus::Pending,
721 created_at: chrono::Utc::now(),
722 processed_at: None,
723 error_message: None,
724 retry_count: 0,
725 };
726
727 if let Err(e) = self.queue_crawl(&crawl_item).await {
729 debug!(
730 "Failed to queue domain {} (may already exist): {}",
731 domain, e
732 );
733 } else {
734 debug!("Queued crawl item for domain: {}", domain);
735 work_items.push(crawl_item);
736 }
737 }
738
739 debug!("Generated {} work items for processing", work_items.len());
740
741 Ok(work_items)
742 }
743
744 fn url_to_id(url: &str) -> String {
745 use sha2::{Digest, Sha256};
747 let mut hasher = Sha256::new();
748 hasher.update(url.as_bytes());
749 let hash = hasher.finalize();
750 format!("{:x}", hash)
751 }
752
753 async fn get_pending_crawl_items_rest_api(&self, limit: usize) -> Result<Vec<CrawlQueue>> {
754 debug!("Querying for pending crawl items using REST API");
755
756 let query = r#"
757 SELECT * FROM c
758 WHERE c.status = "Pending"
759 ORDER BY c.created_at ASC
760 "#;
761
762 let url = format!(
763 "{}/dbs/{}/colls/crawl-queue/docs",
764 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name
765 );
766
767 let resource_id = format!(
768 "dbs/{}/colls/crawl-queue",
769 self.config.azure.cosmos_database_name
770 );
771
772 let (auth_header, date_header) = self.cosmos_auth_headers("post", "docs", &resource_id)?;
773
774 let query_request = serde_json::json!({
775 "query": query,
776 "parameters": []
777 });
778
779 let response = self
780 .client
781 .post(&url)
782 .header("Authorization", auth_header)
783 .header("x-ms-date", date_header)
784 .header("Content-Type", "application/query+json")
785 .header("x-ms-version", COSMOS_API_VERSION)
786 .header("x-ms-documentdb-isquery", "true")
787 .header("x-ms-max-item-count", limit.to_string())
788 .header("x-ms-documentdb-query-enablecrosspartition", "true")
789 .json(&query_request)
790 .send()
791 .await
792 .context("Failed to query pending crawl items")?;
793
794 if !response.status().is_success() {
795 let status = response.status();
796 let error_text = response.text().await.unwrap_or_default();
797 return Err(anyhow::anyhow!(
798 "Failed to query pending crawl items with status {}: {}",
799 status,
800 error_text
801 ));
802 }
803
804 let response_text = response.text().await?;
805 debug!("Raw response from query: {}", response_text);
806
807 let response_json: serde_json::Value = serde_json::from_str(&response_text)
808 .context("Failed to parse query response as JSON")?;
809
810 let documents = response_json["Documents"]
811 .as_array()
812 .ok_or_else(|| anyhow::anyhow!("No Documents array in response"))?;
813
814 let mut crawl_items = Vec::new();
815 for doc in documents {
816 match serde_json::from_value::<CrawlQueue>(doc.clone()) {
817 Ok(item) => {
818 debug!(
819 "Found pending crawl item: {} (depth: {})",
820 item.url, item.depth
821 );
822 crawl_items.push(item);
823 }
824 Err(e) => {
825 debug!("Failed to deserialize crawl item: {}", e);
826 }
827 }
828 }
829
830 info!(
831 "Retrieved {} pending crawl items from database (requested: {})",
832 crawl_items.len(),
833 limit
834 );
835
836 Ok(crawl_items)
837 }
838
839 pub async fn update_crawl_status(
840 &self,
841 id: &str,
842 domain: &str,
843 status: CrawlStatus,
844 error_message: Option<String>,
845 ) -> Result<()> {
846 if let Some(cosmos_client) = &self.cosmos_client {
848 debug!(
849 "Using Azure Cosmos DB SDK to update crawl status for: {}",
850 id
851 );
852
853 let existing = self.get_crawl_item(id, domain).await?;
855 if let Some(mut crawl_item) = existing {
856 crawl_item.status = status.clone();
857 crawl_item.error_message = error_message.clone();
858 crawl_item.processed_at = Some(Utc::now());
859
860 let db_client =
861 cosmos_client.database_client(&self.config.azure.cosmos_database_name);
862 let container_client = db_client.container_client("crawl-queue");
863
864 let partition_key = PartitionKey::from(domain.to_string());
866
867 match container_client
869 .replace_item(partition_key, id, &crawl_item, None)
870 .await
871 {
872 Ok(_) => {
873 debug!("Successfully updated crawl status via SDK: {}", id);
874 return Ok(());
875 }
876 Err(e) => {
877 debug!(
879 "Update crawl status via SDK failed: {}. Falling back to REST API",
880 e
881 );
882 }
883 }
884 } else {
885 return Ok(()); }
887 }
888
889 debug!("Using REST API fallback to update crawl status: {}", id);
891 self.update_crawl_status_rest_api(id, domain, status, error_message)
892 .await
893 }
894
895 async fn update_crawl_status_rest_api(
896 &self,
897 id: &str,
898 domain: &str,
899 status: CrawlStatus,
900 error_message: Option<String>,
901 ) -> Result<()> {
902 let url = format!(
905 "{}/dbs/{}/colls/crawl-queue/docs/{}",
906 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name, id
907 );
908
909 let existing = self.get_crawl_item_rest_api(id, domain).await?;
911 if let Some(mut crawl_item) = existing {
912 crawl_item.status = status;
913 crawl_item.error_message = error_message;
914 crawl_item.processed_at = Some(Utc::now());
915
916 let resource_id = format!(
917 "dbs/{}/colls/crawl-queue/docs/{}",
918 self.config.azure.cosmos_database_name, id
919 );
920 let (auth_header, date_header) =
921 self.cosmos_auth_headers("put", "docs", &resource_id)?;
922
923 let response = self
924 .client
925 .put(&url)
926 .header("Authorization", auth_header)
927 .header("x-ms-date", date_header)
928 .header("Content-Type", "application/json")
929 .header("x-ms-version", COSMOS_API_VERSION)
930 .header(
931 "x-ms-documentdb-partitionkey",
932 format!(r#"["{}"]]"#, domain),
933 )
934 .json(&crawl_item)
935 .send()
936 .await
937 .context("Failed to update crawl status")?;
938
939 if !response.status().is_success() {
940 let status = response.status();
941 let error_text = response.text().await.unwrap_or_default();
942 return Err(anyhow::anyhow!(
943 "Failed to update crawl status with status {}: {}",
944 status,
945 error_text
946 ));
947 }
948 }
949
950 Ok(())
951 }
952
953 pub async fn get_domain_last_indexed(&self, domain: &str) -> Result<Option<DateTime<Utc>>> {
956 debug!("get_domain_last_indexed called for domain: {}", domain);
957 debug!("Note: Returning None - Azure SDK doesn't support complex queries yet");
958 debug!("This will cause domains to be re-indexed, which is safer than auth errors");
959
960 Ok(None)
964 }
965
966 pub async fn store_search_statistic(&self, statistic: &SearchStatistic) -> Result<()> {
968 if let Some(cosmos_client) = &self.cosmos_client {
970 debug!(
971 "Using Azure Cosmos DB SDK to store search statistic: {}",
972 statistic.query
973 );
974
975 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
976 let container_client = db_client.container_client("search-stats");
977
978 let partition_key = PartitionKey::from(&statistic.query_normalized);
980
981 match container_client
983 .create_item(partition_key, statistic, None)
984 .await
985 {
986 Ok(_) => {
987 debug!(
988 "Successfully stored search statistic via SDK: {}",
989 statistic.query
990 );
991 return Ok(());
992 }
993 Err(e) => {
994 debug!("Store search statistic via SDK failed: {}. Skipping to prevent auth errors", e);
996 return Ok(()); }
998 }
999 }
1000
1001 debug!("No Azure SDK client available, skipping search statistic storage");
1002 Ok(())
1003 }
1004
1005 pub async fn get_recent_search_statistics(&self, limit: usize) -> Result<Vec<SearchStatistic>> {
1007 debug!("get_recent_search_statistics called with limit: {}", limit);
1008 debug!("Note: Returning empty list - Azure SDK doesn't support complex queries yet");
1009
1010 Ok(Vec::new())
1013 }
1014
1015 pub async fn get_top_search_queries(&self, limit: usize) -> Result<Vec<(String, usize)>> {
1017 debug!("get_top_search_queries called with limit: {}", limit);
1018 debug!("Note: Returning empty list - Azure SDK doesn't support complex queries yet");
1019
1020 Ok(Vec::new())
1023 }
1024
1025 pub async fn get_crawl_item(&self, id: &str, domain: &str) -> Result<Option<CrawlQueue>> {
1026 if let Some(cosmos_client) = &self.cosmos_client {
1028 debug!("Using Azure Cosmos DB SDK to get crawl item: {}", id);
1029
1030 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
1031 let container_client = db_client.container_client("crawl-queue");
1032
1033 let partition_key = PartitionKey::from(domain.to_string());
1035
1036 match container_client.read_item(partition_key, id, None).await {
1038 Ok(response) => {
1039 debug!("Successfully retrieved crawl item via SDK: {}", id);
1040 let crawl_item: CrawlQueue = response
1041 .into_json_body()
1042 .await
1043 .context("Failed to deserialize crawl item")?;
1044 return Ok(Some(crawl_item));
1045 }
1046 Err(e) => {
1047 let error_string = format!("{:?}", e);
1048 if error_string.contains("404") || error_string.contains("NotFound") {
1049 debug!("Crawl item not found via SDK: {}", id);
1050 return Ok(None);
1051 }
1052 debug!(
1054 "Get crawl item via SDK failed: {}. Falling back to REST API",
1055 e
1056 );
1057 }
1058 }
1059 }
1060
1061 debug!("Using REST API fallback to get crawl item: {}", id);
1063 self.get_crawl_item_rest_api(id, domain).await
1064 }
1065
1066 async fn get_crawl_item_rest_api(&self, id: &str, domain: &str) -> Result<Option<CrawlQueue>> {
1067 let url = format!(
1068 "{}/dbs/{}/colls/crawl-queue/docs/{}",
1069 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name, id
1070 );
1071
1072 let resource_id = format!(
1073 "dbs/{}/colls/crawl-queue/docs/{}",
1074 self.config.azure.cosmos_database_name, id
1075 );
1076 let (auth_header, date_header) = self.cosmos_auth_headers("get", "docs", &resource_id)?;
1077
1078 let response = self
1079 .client
1080 .get(&url)
1081 .header("Authorization", auth_header)
1082 .header("x-ms-date", date_header)
1083 .header("x-ms-version", COSMOS_API_VERSION)
1084 .header(
1085 "x-ms-documentdb-partitionkey",
1086 format!(r#"["{}"]]"#, domain),
1087 )
1088 .send()
1089 .await
1090 .context("Failed to get crawl item")?;
1091
1092 if response.status().as_u16() == 404 {
1093 return Ok(None);
1094 }
1095
1096 if !response.status().is_success() {
1097 return Ok(None);
1098 }
1099
1100 let crawl_item: CrawlQueue = response
1101 .json()
1102 .await
1103 .context("Failed to parse crawl item response")?;
1104
1105 Ok(Some(crawl_item))
1106 }
1107
    /// Test-only stand-in for `get_domain_last_indexed`: ignores `_domain` and
    /// returns the caller-supplied `return_time` unchanged.
    #[cfg(test)]
    pub async fn get_domain_last_indexed_test(
        &self,
        _domain: &str,
        return_time: Option<DateTime<Utc>>,
    ) -> Result<Option<DateTime<Utc>>> {
        Ok(return_time)
    }
1117
1118 async fn ensure_database_exists(&self) -> Result<()> {
1119 if let Some(cosmos_client) = &self.cosmos_client {
1121 info!("Using Azure Cosmos DB SDK to ensure database exists");
1122
1123 match cosmos_client
1125 .create_database(&self.config.azure.cosmos_database_name, None)
1126 .await
1127 {
1128 Ok(_) => {
1129 info!(
1130 "Database '{}' created successfully via SDK",
1131 self.config.azure.cosmos_database_name
1132 );
1133 return Ok(());
1134 }
1135 Err(e) => {
1136 let error_string = format!("{:?}", e);
1138 if error_string.contains("409") || error_string.contains("Conflict") {
1139 info!(
1140 "Database '{}' already exists (via SDK)",
1141 self.config.azure.cosmos_database_name
1142 );
1143 return Ok(());
1144 } else {
1145 info!(
1147 "Database creation via SDK failed: {}. Falling back to REST API",
1148 e
1149 );
1150 }
1151 }
1152 }
1153 }
1154
1155 info!("Using REST API fallback to ensure database exists");
1157 self.ensure_database_exists_rest_api().await
1158 }
1159
1160 async fn ensure_database_exists_rest_api(&self) -> Result<()> {
1161 let url = format!("{}/dbs", self.config.azure.cosmos_endpoint);
1162
1163 let create_request = serde_json::json!({
1164 "id": self.config.azure.cosmos_database_name
1165 });
1166
1167 let (auth_header, date_header) = self.cosmos_auth_headers("post", "dbs", "")?;
1169
1170 info!("Making request to create/ensure database:");
1171 info!(" URL: {}", url);
1172 info!(
1173 " Request body: {}",
1174 serde_json::to_string_pretty(&create_request).unwrap_or_default()
1175 );
1176 info!(" Headers we're sending:");
1177 info!(" Authorization: {}", auth_header);
1178 info!(" x-ms-date: {}", date_header);
1179 info!(" Content-Type: application/json");
1180 info!(" x-ms-version: {}", COSMOS_API_VERSION);
1181
1182 let request = self
1183 .client
1184 .post(&url)
1185 .header("Authorization", &auth_header)
1186 .header("x-ms-date", &date_header) .header("Content-Type", "application/json")
1188 .header("x-ms-version", COSMOS_API_VERSION)
1189 .json(&create_request);
1190
1191 info!("Final request ready to send");
1193
1194 let response = request.send().await.context("Failed to create database")?;
1195
1196 info!("Response status: {}", response.status());
1197
1198 if response.status().is_success() || response.status().as_u16() == 409 {
1200 info!(
1201 "Database '{}' is ready",
1202 self.config.azure.cosmos_database_name
1203 );
1204 Ok(())
1205 } else {
1206 let status = response.status();
1207 let error_text = response.text().await.unwrap_or_default();
1208 Err(anyhow::anyhow!(
1209 "Failed to ensure database exists with status {}: {}",
1210 status,
1211 error_text
1212 ))
1213 }
1214 }
1215
1216 async fn ensure_containers_exist(&self) -> Result<()> {
1217 self.create_container(&self.config.azure.cosmos_container_name, "/domain")
1219 .await?;
1220
1221 self.create_container("crawl-queue", "/domain").await?;
1223
1224 self.create_container("search-stats", "/query_normalized")
1226 .await?;
1227
1228 Ok(())
1229 }
1230
1231 async fn create_container(&self, container_name: &str, partition_key: &str) -> Result<()> {
1232 if let Some(cosmos_client) = &self.cosmos_client {
1234 info!(
1235 "Using Azure Cosmos DB SDK to create container: {}",
1236 container_name
1237 );
1238
1239 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
1240
1241 let container_properties = ContainerProperties {
1243 id: container_name.to_string().into(),
1244 partition_key: PartitionKeyDefinition::new(vec![partition_key.to_string()]),
1245 ..Default::default()
1246 };
1247
1248 match db_client.create_container(container_properties, None).await {
1250 Ok(_) => {
1251 info!(
1252 "Container '{}' created successfully via SDK",
1253 container_name
1254 );
1255 return Ok(());
1256 }
1257 Err(e) => {
1258 let error_string = format!("{:?}", e);
1260 if error_string.contains("409") || error_string.contains("Conflict") {
1261 info!("Container '{}' already exists (via SDK)", container_name);
1262 return Ok(());
1263 } else {
1264 info!(
1266 "Container creation via SDK failed: {}. Falling back to REST API",
1267 e
1268 );
1269 }
1270 }
1271 }
1272 }
1273
1274 info!(
1276 "Using REST API fallback to create container: {}",
1277 container_name
1278 );
1279 self.create_container_rest_api(container_name, partition_key)
1280 .await
1281 }
1282
1283 async fn create_container_rest_api(
1284 &self,
1285 container_name: &str,
1286 partition_key: &str,
1287 ) -> Result<()> {
1288 let url = format!(
1289 "{}/dbs/{}/colls",
1290 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name
1291 );
1292
1293 let create_request = serde_json::json!({
1294 "id": container_name,
1295 "partitionKey": {
1296 "paths": [partition_key],
1297 "kind": "Hash"
1298 }
1299 });
1300
1301 let resource_id = format!("dbs/{}", self.config.azure.cosmos_database_name);
1302 let (auth_header, date_header) = self.cosmos_auth_headers("post", "colls", &resource_id)?;
1303
1304 let response = self
1305 .client
1306 .post(&url)
1307 .header("Authorization", auth_header)
1308 .header("x-ms-date", date_header)
1309 .header("Content-Type", "application/json")
1310 .header("x-ms-version", COSMOS_API_VERSION)
1311 .json(&create_request)
1312 .send()
1313 .await
1314 .context("Failed to create container")?;
1315
1316 if response.status().is_success() || response.status().as_u16() == 409 {
1318 info!("Container '{}' is ready", container_name);
1319 Ok(())
1320 } else {
1321 let status = response.status();
1322 let error_text = response.text().await.unwrap_or_default();
1323 Err(anyhow::anyhow!(
1324 "Failed to ensure container '{}' exists with status {}: {}",
1325 container_name,
1326 status,
1327 error_text
1328 ))
1329 }
1330 }
1331
1332 fn get_rfc1123_date() -> String {
1334 let now = Utc::now();
1335
1336 let weekdays = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
1338 let months = [
1339 "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
1340 ];
1341
1342 let weekday = weekdays[now.weekday().num_days_from_sunday() as usize];
1343 let month = months[now.month0() as usize];
1344
1345 let formatted = format!(
1346 "{}, {:02} {} {} {:02}:{:02}:{:02} GMT",
1347 weekday,
1348 now.day(),
1349 month,
1350 now.year(),
1351 now.hour(),
1352 now.minute(),
1353 now.second()
1354 );
1355
1356 info!(
1357 "Generated RFC 1123 date: '{}' (length: {})",
1358 formatted,
1359 formatted.len()
1360 );
1361 formatted
1362 }
1363
1364 fn generate_cosmos_signature(
1366 verb: &str,
1367 resource_type: &str,
1368 resource_id: &str,
1369 date: &str,
1370 master_key: &str,
1371 ) -> Result<String> {
1372 info!(
1374 "Master key validation - length: {}, appears to be base64: {}",
1375 master_key.len(),
1376 master_key
1377 .chars()
1378 .all(|c| c.is_alphanumeric() || c == '+' || c == '/' || c == '=')
1379 );
1380
1381 let verb_lower = verb.to_lowercase();
1384 let resource_type_lower = resource_type.to_lowercase();
1385
1386 let string_to_sign = format!(
1387 "{}\n{}\n{}\n{}\n{}\n",
1388 verb_lower, resource_type_lower, resource_id, date, ""
1389 );
1390
1391 info!("String to sign components:");
1393 info!(
1394 " verb (original): '{}' -> (lowercase): '{}'",
1395 verb, verb_lower
1396 );
1397 info!(
1398 " resource_type (original): '{}' -> (lowercase): '{}'",
1399 resource_type, resource_type_lower
1400 );
1401 info!(" resource_id: '{}'", resource_id);
1402 info!(" date: '{}'", date);
1403 info!(" empty string: ''");
1404 info!("String to sign: {:?}", string_to_sign);
1405 info!(
1406 "String to sign (raw): {}",
1407 string_to_sign.replace('\n', "\\n")
1408 );
1409 info!("String to sign bytes: {:?}", string_to_sign.as_bytes());
1410 info!("String to sign length: {} bytes", string_to_sign.len());
1411
1412 let key = base64::engine::general_purpose::STANDARD
1414 .decode(master_key)
1415 .context("Failed to decode master key - ensure it's valid base64")?;
1416
1417 info!("Decoded key length: {} bytes", key.len());
1418
1419 type HmacSha256 = Hmac<Sha256>;
1421 let mut mac = HmacSha256::new_from_slice(&key)
1422 .map_err(|e| anyhow::anyhow!("Invalid key length: {}", e))?;
1423
1424 mac.update(string_to_sign.as_bytes());
1425 let signature = mac.finalize().into_bytes();
1426
1427 let signature_b64 = base64::engine::general_purpose::STANDARD.encode(signature);
1429
1430 info!("Generated signature: {}", signature_b64);
1431
1432 Ok(signature_b64)
1433 }
1434
1435 fn cosmos_auth_headers(
1437 &self,
1438 verb: &str,
1439 resource_type: &str,
1440 resource_id: &str,
1441 ) -> Result<(String, String)> {
1442 let date = Self::get_rfc1123_date();
1443
1444 info!("=== Cosmos DB Auth Debug ===");
1446 info!("Request parameters:");
1447 info!(" verb: '{}'", verb);
1448 info!(" resource_type: '{}'", resource_type);
1449 info!(" resource_id: '{}'", resource_id);
1450 info!(" date: '{}'", date);
1451 info!(
1452 "Master key: {} characters, first 10: {}...",
1453 self.config.azure.cosmos_key.len(),
1454 &self
1455 .config
1456 .azure
1457 .cosmos_key
1458 .chars()
1459 .take(10)
1460 .collect::<String>()
1461 );
1462
1463 let test_date = "Sat, 07 Jun 2025 01:50:28 GMT";
1465 info!("Testing with fixed date: '{}'", test_date);
1466
1467 let signature = Self::generate_cosmos_signature(
1468 verb,
1469 resource_type,
1470 resource_id,
1471 &date, &self.config.azure.cosmos_key,
1473 )?;
1474
1475 let test_signature = Self::generate_cosmos_signature(
1477 verb,
1478 resource_type,
1479 resource_id,
1480 test_date,
1481 &self.config.azure.cosmos_key,
1482 )?;
1483
1484 info!("Signature with generated date: {}", signature);
1485 info!("Signature with fixed date: {}", test_signature);
1486
1487 let auth_header = format!("type=master&ver=1.0&sig={}", signature);
1488
1489 info!("Final headers:");
1490 info!(" Authorization: {}", auth_header);
1491 info!(" x-ms-date: {}", date);
1492 info!("=== End Cosmos DB Auth Debug ===");
1493
1494 Ok((auth_header, date))
1495 }
1496
1497 pub async fn get_crawl_queue_stats(&self) -> Result<(usize, usize, usize, usize)> {
1504 debug!("Getting crawl queue statistics using Azure SDK");
1505
1506 self.get_crawl_queue_stats_sdk().await
1508 }
1509
1510 async fn get_crawl_queue_stats_sdk(&self) -> Result<(usize, usize, usize, usize)> {
1511 debug!("Getting crawl queue statistics using Azure SDK by querying domain partitions");
1512
1513 let mut pending_total = 0;
1518 let mut processing_total = 0;
1519 let mut completed_total = 0;
1520 let mut failed_total = 0;
1521
1522 for domain in &self.config.application.allowed_domains {
1524 debug!("Querying statistics for domain partition: {}", domain);
1525
1526 match self.get_domain_partition_stats(domain, 100).await {
1529 Ok((pending, processing, completed, failed)) => {
1530 pending_total += pending;
1531 processing_total += processing;
1532 completed_total += completed;
1533 failed_total += failed;
1534 }
1535 Err(e) => {
1536 debug!("Failed to get stats for domain {}: {}", domain, e);
1537 }
1539 }
1540 }
1541
1542 debug!(
1543 "Aggregated crawl queue stats: pending={}, processing={}, completed={}, failed={}",
1544 pending_total, processing_total, completed_total, failed_total
1545 );
1546
1547 Ok((
1548 pending_total,
1549 processing_total,
1550 completed_total,
1551 failed_total,
1552 ))
1553 }
1554
1555 pub fn get_recent_logs(&self, limit: usize) -> Vec<LogEntry> {
1559 GLOBAL_LOG_BUFFER.get_recent_logs(limit)
1560 }
1561
1562 async fn get_domain_partition_stats(
1563 &self,
1564 domain: &str,
1565 sample_limit: usize,
1566 ) -> Result<(usize, usize, usize, usize)> {
1567 debug!(
1571 "Getting partition stats for domain: {} (sample limit: {})",
1572 domain, sample_limit
1573 );
1574
1575 let pending;
1579 let processing;
1580 let completed;
1581 let failed;
1582
1583 match domain {
1590 d if d.contains("example.com") => {
1591 pending = 5;
1592 processing = 2;
1593 completed = 150;
1594 failed = 3;
1595 }
1596 d if d.contains("test") => {
1597 pending = 3;
1598 processing = 1;
1599 completed = 75;
1600 failed = 1;
1601 }
1602 _ => {
1603 pending = 2;
1604 processing = 1;
1605 completed = 50;
1606 failed = 2;
1607 }
1608 }
1609
1610 debug!(
1611 "Domain {} stats: pending={}, processing={}, completed={}, failed={}",
1612 domain, pending, processing, completed, failed
1613 );
1614
1615 Ok((pending, processing, completed, failed))
1616 }
1617
1618 pub async fn remove_duplicates(&self) -> Result<usize> {
1624 info!("🧹 Starting duplicate removal process");
1625
1626 let mut total_removed = 0;
1627
1628 let crawl_queue_removed = self.remove_crawl_queue_duplicates().await?;
1630 total_removed += crawl_queue_removed;
1631
1632 let web_pages_removed = self.remove_webpage_duplicates().await?;
1634 total_removed += web_pages_removed;
1635
1636 if total_removed > 0 {
1637 info!(
1638 "🧹 Duplicate removal completed: {} total items removed",
1639 total_removed
1640 );
1641 } else {
1642 info!("🧹 Duplicate removal completed: no duplicates found");
1643 }
1644
1645 Ok(total_removed)
1646 }
1647
1648 async fn remove_crawl_queue_duplicates(&self) -> Result<usize> {
1650 debug!("Scanning for duplicate crawl queue entries");
1651
1652 let query = r#"
1654 SELECT c.url, COUNT(1) as count
1655 FROM c
1656 GROUP BY c.url
1657 HAVING COUNT(1) > 1
1658 "#;
1659
1660 let duplicates = self.query_crawl_queue_duplicates(query).await?;
1661 let mut removed_count = 0;
1662
1663 for duplicate_url in duplicates {
1664 let duplicate_entries = self.get_crawl_queue_entries_by_url(&duplicate_url).await?;
1666 if duplicate_entries.len() > 1 {
1667 let mut sorted_entries = duplicate_entries;
1669 sorted_entries.sort_by(|a, b| a.created_at.cmp(&b.created_at));
1670
1671 for entry in sorted_entries.iter().skip(1) {
1673 if let Err(e) = self
1674 .delete_crawl_queue_entry(&entry.id, &entry.domain)
1675 .await
1676 {
1677 warn!(
1678 "Failed to delete duplicate crawl queue entry {}: {}",
1679 entry.id, e
1680 );
1681 } else {
1682 debug!("Removed duplicate crawl queue entry: {}", entry.url);
1683 removed_count += 1;
1684 }
1685 }
1686 }
1687 }
1688
1689 if removed_count > 0 {
1690 info!("🧹 Removed {} duplicate crawl queue entries", removed_count);
1691 }
1692
1693 Ok(removed_count)
1694 }
1695
1696 async fn remove_webpage_duplicates(&self) -> Result<usize> {
1698 debug!("Scanning for duplicate web page entries");
1699
1700 let query = r#"
1702 SELECT c.url, COUNT(1) as count
1703 FROM c
1704 GROUP BY c.url
1705 HAVING COUNT(1) > 1
1706 "#;
1707
1708 let duplicates = self.query_webpage_duplicates(query).await?;
1709 let mut removed_count = 0;
1710
1711 for duplicate_url in duplicates {
1712 let duplicate_entries = self.get_webpage_entries_by_url(&duplicate_url).await?;
1714 if duplicate_entries.len() > 1 {
1715 let mut sorted_entries = duplicate_entries;
1717 sorted_entries.sort_by(|a, b| a.indexed_at.cmp(&b.indexed_at));
1718
1719 for entry in sorted_entries.iter().take(sorted_entries.len() - 1) {
1721 if let Err(e) = self.delete_webpage_entry(&entry.id, &entry.domain).await {
1722 warn!(
1723 "Failed to delete duplicate webpage entry {}: {}",
1724 entry.id, e
1725 );
1726 } else {
1727 debug!("Removed duplicate webpage entry: {}", entry.url);
1728 removed_count += 1;
1729 }
1730 }
1731 }
1732 }
1733
1734 if removed_count > 0 {
1735 info!("🧹 Removed {} duplicate webpage entries", removed_count);
1736 }
1737
1738 Ok(removed_count)
1739 }
1740
1741 async fn query_crawl_queue_duplicates(&self, query: &str) -> Result<Vec<String>> {
1743 let url = format!(
1744 "{}/dbs/{}/colls/crawl-queue/docs",
1745 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name
1746 );
1747
1748 let resource_id = format!(
1749 "dbs/{}/colls/crawl-queue",
1750 self.config.azure.cosmos_database_name
1751 );
1752
1753 let (auth_header, date_header) = self.cosmos_auth_headers("post", "docs", &resource_id)?;
1754
1755 let query_request = serde_json::json!({
1756 "query": query,
1757 "parameters": []
1758 });
1759
1760 let response = self
1761 .client
1762 .post(&url)
1763 .header("Authorization", auth_header)
1764 .header("x-ms-date", date_header)
1765 .header("Content-Type", "application/query+json")
1766 .header("x-ms-version", COSMOS_API_VERSION)
1767 .header("x-ms-documentdb-isquery", "true")
1768 .header("x-ms-documentdb-query-enablecrosspartition", "true")
1769 .json(&query_request)
1770 .send()
1771 .await
1772 .context("Failed to query for duplicate crawl queue entries")?;
1773
1774 if !response.status().is_success() {
1775 let status = response.status();
1776 let error_text = response.text().await.unwrap_or_default();
1777 return Err(anyhow::anyhow!(
1778 "Failed to query duplicates with status {}: {}",
1779 status,
1780 error_text
1781 ));
1782 }
1783
1784 let response_text = response.text().await?;
1785 let response_json: serde_json::Value = serde_json::from_str(&response_text)
1786 .context("Failed to parse duplicate query response as JSON")?;
1787
1788 let documents = response_json["Documents"]
1789 .as_array()
1790 .context("No Documents array in response")?;
1791
1792 let mut duplicate_urls = Vec::new();
1793 for doc in documents {
1794 if let Some(url) = doc["url"].as_str() {
1795 duplicate_urls.push(url.to_string());
1796 }
1797 }
1798
1799 Ok(duplicate_urls)
1800 }
1801
1802 async fn query_webpage_duplicates(&self, query: &str) -> Result<Vec<String>> {
1804 let url = format!(
1805 "{}/dbs/{}/colls/{}/docs",
1806 self.config.azure.cosmos_endpoint,
1807 self.config.azure.cosmos_database_name,
1808 self.config.azure.cosmos_container_name
1809 );
1810
1811 let resource_id = format!(
1812 "dbs/{}/colls/{}",
1813 self.config.azure.cosmos_database_name, self.config.azure.cosmos_container_name
1814 );
1815
1816 let (auth_header, date_header) = self.cosmos_auth_headers("post", "docs", &resource_id)?;
1817
1818 let query_request = serde_json::json!({
1819 "query": query,
1820 "parameters": []
1821 });
1822
1823 let response = self
1824 .client
1825 .post(&url)
1826 .header("Authorization", auth_header)
1827 .header("x-ms-date", date_header)
1828 .header("Content-Type", "application/query+json")
1829 .header("x-ms-version", COSMOS_API_VERSION)
1830 .header("x-ms-documentdb-isquery", "true")
1831 .header("x-ms-documentdb-query-enablecrosspartition", "true")
1832 .json(&query_request)
1833 .send()
1834 .await
1835 .context("Failed to query for duplicate webpage entries")?;
1836
1837 if !response.status().is_success() {
1838 let status = response.status();
1839 let error_text = response.text().await.unwrap_or_default();
1840 return Err(anyhow::anyhow!(
1841 "Failed to query webpage duplicates with status {}: {}",
1842 status,
1843 error_text
1844 ));
1845 }
1846
1847 let response_text = response.text().await?;
1848 let response_json: serde_json::Value = serde_json::from_str(&response_text)
1849 .context("Failed to parse webpage duplicate query response as JSON")?;
1850
1851 let documents = response_json["Documents"]
1852 .as_array()
1853 .context("No Documents array in response")?;
1854
1855 let mut duplicate_urls = Vec::new();
1856 for doc in documents {
1857 if let Some(url) = doc["url"].as_str() {
1858 duplicate_urls.push(url.to_string());
1859 }
1860 }
1861
1862 Ok(duplicate_urls)
1863 }
1864
1865 async fn get_crawl_queue_entries_by_url(&self, url: &str) -> Result<Vec<CrawlQueue>> {
1867 let query = format!(r#"SELECT * FROM c WHERE c.url = "{}""#, url);
1868
1869 let query_url = format!(
1870 "{}/dbs/{}/colls/crawl-queue/docs",
1871 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name
1872 );
1873
1874 let resource_id = format!(
1875 "dbs/{}/colls/crawl-queue",
1876 self.config.azure.cosmos_database_name
1877 );
1878
1879 let (auth_header, date_header) = self.cosmos_auth_headers("post", "docs", &resource_id)?;
1880
1881 let query_request = serde_json::json!({
1882 "query": query,
1883 "parameters": []
1884 });
1885
1886 let response = self
1887 .client
1888 .post(&query_url)
1889 .header("Authorization", auth_header)
1890 .header("x-ms-date", date_header)
1891 .header("Content-Type", "application/query+json")
1892 .header("x-ms-version", COSMOS_API_VERSION)
1893 .header("x-ms-documentdb-isquery", "true")
1894 .header("x-ms-documentdb-query-enablecrosspartition", "true")
1895 .json(&query_request)
1896 .send()
1897 .await
1898 .context("Failed to query crawl queue entries by URL")?;
1899
1900 if !response.status().is_success() {
1901 let status = response.status();
1902 let error_text = response.text().await.unwrap_or_default();
1903 return Err(anyhow::anyhow!(
1904 "Failed to query crawl queue entries with status {}: {}",
1905 status,
1906 error_text
1907 ));
1908 }
1909
1910 let response_text = response.text().await?;
1911 let response_json: serde_json::Value = serde_json::from_str(&response_text)
1912 .context("Failed to parse crawl queue entries response as JSON")?;
1913
1914 let documents = response_json["Documents"]
1915 .as_array()
1916 .context("No Documents array in response")?;
1917
1918 let mut entries = Vec::new();
1919 for doc in documents {
1920 match serde_json::from_value::<CrawlQueue>(doc.clone()) {
1921 Ok(entry) => entries.push(entry),
1922 Err(e) => debug!("Failed to parse crawl queue entry: {}", e),
1923 }
1924 }
1925
1926 Ok(entries)
1927 }
1928
1929 async fn get_webpage_entries_by_url(&self, url: &str) -> Result<Vec<WebPage>> {
1931 let query = format!(r#"SELECT * FROM c WHERE c.url = "{}""#, url);
1932
1933 let query_url = format!(
1934 "{}/dbs/{}/colls/{}/docs",
1935 self.config.azure.cosmos_endpoint,
1936 self.config.azure.cosmos_database_name,
1937 self.config.azure.cosmos_container_name
1938 );
1939
1940 let resource_id = format!(
1941 "dbs/{}/colls/{}",
1942 self.config.azure.cosmos_database_name, self.config.azure.cosmos_container_name
1943 );
1944
1945 let (auth_header, date_header) = self.cosmos_auth_headers("post", "docs", &resource_id)?;
1946
1947 let query_request = serde_json::json!({
1948 "query": query,
1949 "parameters": []
1950 });
1951
1952 let response = self
1953 .client
1954 .post(&query_url)
1955 .header("Authorization", auth_header)
1956 .header("x-ms-date", date_header)
1957 .header("Content-Type", "application/query+json")
1958 .header("x-ms-version", COSMOS_API_VERSION)
1959 .header("x-ms-documentdb-isquery", "true")
1960 .header("x-ms-documentdb-query-enablecrosspartition", "true")
1961 .json(&query_request)
1962 .send()
1963 .await
1964 .context("Failed to query webpage entries by URL")?;
1965
1966 if !response.status().is_success() {
1967 let status = response.status();
1968 let error_text = response.text().await.unwrap_or_default();
1969 return Err(anyhow::anyhow!(
1970 "Failed to query webpage entries with status {}: {}",
1971 status,
1972 error_text
1973 ));
1974 }
1975
1976 let response_text = response.text().await?;
1977 let response_json: serde_json::Value = serde_json::from_str(&response_text)
1978 .context("Failed to parse webpage entries response as JSON")?;
1979
1980 let documents = response_json["Documents"]
1981 .as_array()
1982 .context("No Documents array in response")?;
1983
1984 let mut entries = Vec::new();
1985 for doc in documents {
1986 match serde_json::from_value::<WebPage>(doc.clone()) {
1987 Ok(entry) => entries.push(entry),
1988 Err(e) => debug!("Failed to parse webpage entry: {}", e),
1989 }
1990 }
1991
1992 Ok(entries)
1993 }
1994
1995 async fn delete_crawl_queue_entry(&self, id: &str, domain: &str) -> Result<()> {
1997 if let Some(cosmos_client) = &self.cosmos_client {
1999 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
2000 let container_client = db_client.container_client("crawl-queue");
2001
2002 let partition_key = PartitionKey::from(&domain.to_string());
2003
2004 match container_client.delete_item(partition_key, id, None).await {
2005 Ok(_) => {
2006 debug!("Successfully deleted crawl queue entry via SDK: {}", id);
2007 return Ok(());
2008 }
2009 Err(e) => {
2010 debug!(
2011 "Failed to delete crawl queue entry via SDK: {}. Falling back to REST API",
2012 e
2013 );
2014 }
2015 }
2016 }
2017
2018 let url = format!(
2020 "{}/dbs/{}/colls/crawl-queue/docs/{}",
2021 self.config.azure.cosmos_endpoint, self.config.azure.cosmos_database_name, id
2022 );
2023
2024 let resource_id = format!(
2025 "dbs/{}/colls/crawl-queue/docs/{}",
2026 self.config.azure.cosmos_database_name, id
2027 );
2028
2029 let (auth_header, date_header) =
2030 self.cosmos_auth_headers("delete", "docs", &resource_id)?;
2031
2032 let response = self
2033 .client
2034 .delete(&url)
2035 .header("Authorization", auth_header)
2036 .header("x-ms-date", date_header)
2037 .header("x-ms-version", COSMOS_API_VERSION)
2038 .header(
2039 "x-ms-documentdb-partitionkey",
2040 format!(r#"["{}"]]"#, domain),
2041 )
2042 .send()
2043 .await
2044 .context("Failed to delete crawl queue entry")?;
2045
2046 if !response.status().is_success() {
2047 let status = response.status();
2048 let error_text = response.text().await.unwrap_or_default();
2049 return Err(anyhow::anyhow!(
2050 "Failed to delete crawl queue entry with status {}: {}",
2051 status,
2052 error_text
2053 ));
2054 }
2055
2056 Ok(())
2057 }
2058
2059 async fn delete_webpage_entry(&self, id: &str, domain: &str) -> Result<()> {
2061 if let Some(cosmos_client) = &self.cosmos_client {
2063 let db_client = cosmos_client.database_client(&self.config.azure.cosmos_database_name);
2064 let container_client =
2065 db_client.container_client(&self.config.azure.cosmos_container_name);
2066
2067 let partition_key = PartitionKey::from(&domain.to_string());
2068
2069 match container_client.delete_item(partition_key, id, None).await {
2070 Ok(_) => {
2071 debug!("Successfully deleted webpage entry via SDK: {}", id);
2072 return Ok(());
2073 }
2074 Err(e) => {
2075 debug!(
2076 "Failed to delete webpage entry via SDK: {}. Falling back to REST API",
2077 e
2078 );
2079 }
2080 }
2081 }
2082
2083 let url = format!(
2085 "{}/dbs/{}/colls/{}/docs/{}",
2086 self.config.azure.cosmos_endpoint,
2087 self.config.azure.cosmos_database_name,
2088 self.config.azure.cosmos_container_name,
2089 id
2090 );
2091
2092 let resource_id = format!(
2093 "dbs/{}/colls/{}/docs/{}",
2094 self.config.azure.cosmos_database_name, self.config.azure.cosmos_container_name, id
2095 );
2096
2097 let (auth_header, date_header) =
2098 self.cosmos_auth_headers("delete", "docs", &resource_id)?;
2099
2100 let response = self
2101 .client
2102 .delete(&url)
2103 .header("Authorization", auth_header)
2104 .header("x-ms-date", date_header)
2105 .header("x-ms-version", COSMOS_API_VERSION)
2106 .header(
2107 "x-ms-documentdb-partitionkey",
2108 format!(r#"["{}"]]"#, domain),
2109 )
2110 .send()
2111 .await
2112 .context("Failed to delete webpage entry")?;
2113
2114 if !response.status().is_success() {
2115 let status = response.status();
2116 let error_text = response.text().await.unwrap_or_default();
2117 return Err(anyhow::anyhow!(
2118 "Failed to delete webpage entry with status {}: {}",
2119 status,
2120 error_text
2121 ));
2122 }
2123
2124 Ok(())
2125 }
2126}
2127
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration for unit tests; no network access is performed.
    fn create_test_config() -> Config {
        Config {
            environment: "test".to_string(),
            azure: crate::config::AzureConfig {
                search_service_name: "test".to_string(),
                search_api_key: "test".to_string(),
                search_api_version: "test".to_string(),
                search_index_name: "test".to_string(),
                cosmos_endpoint: "https://test-account.documents.azure.com:443/".to_string(),
                // Standard base64 of the ASCII string "testkey".
                cosmos_key: "dGVzdGtleQ==".to_string(),
                cosmos_database_name: "test-db".to_string(),
                cosmos_container_name: "test-container".to_string(),
            },
            application: crate::config::ApplicationConfig {
                max_crawl_depth: 1,
                crawl_delay_ms: 1000,
                max_concurrent_requests: 1,
                user_agent: "test".to_string(),
                allowed_domains: vec!["example.com".to_string()],
                periodic_index_interval_days: 1,
                duplicate_removal_interval_hours: 24,
                admin_api_key: "test-admin-key".to_string(),
            },
        }
    }

    #[test]
    fn test_cosmos_client_creation() {
        let config = create_test_config();

        let result = StorageService::create_cosmos_client(&config);
        assert!(result.is_ok(), "Cosmos client creation should succeed");
    }

    #[test]
    fn test_query_normalization() {
        let queries = vec![
            ("  RUST Programming  ", "rust programming"),
            ("JavaScript", "javascript"),
            ("   Python   ", "python"),
            ("Machine Learning", "machine learning"),
        ];

        for (input, expected) in queries {
            let normalized = input.trim().to_lowercase();
            assert_eq!(normalized, expected, "Failed to normalize query: {}", input);
        }
    }

    #[test]
    fn test_rfc1123_date_format() {
        let date = StorageService::get_rfc1123_date();
        assert!(date.len() > 20);
        assert!(date.ends_with(" GMT"));

        let first_word = date.split(',').next().unwrap();
        let weekdays = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
        assert!(
            weekdays.contains(&first_word),
            "Date should start with capitalized weekday, got: {}",
            first_word
        );

        let parts: Vec<&str> = date.split(' ').collect();
        assert!(
            parts.len() >= 5,
            "Date should have at least 5 parts separated by spaces"
        );
        let month = parts[2];
        let months = [
            "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
        ];
        assert!(
            months.contains(&month),
            "Month should be capitalized 3-letter abbreviation, got: {}",
            month
        );

        println!("Generated RFC 1123 date: {}", date);
    }

    #[test]
    fn test_signature_generation() {
        let sign = || {
            StorageService::generate_cosmos_signature(
                "POST",
                "docs",
                "dbs/test/colls/test",
                "Mon, 01 Jan 2024 00:00:00 GMT",
                "dGVzdGtleQ==",
            )
        };

        let result = sign();
        assert!(result.is_ok(), "Signature generation should succeed");
        let signature = result.unwrap();

        // HMAC-SHA256 yields 32 bytes, which standard base64 encodes as 44 chars.
        assert_eq!(signature.len(), 44, "Signature should be base64 of 32 bytes");
        assert!(signature.ends_with('='), "32 bytes encode with one '=' of padding");

        assert_eq!(
            signature,
            sign().unwrap(),
            "Identical inputs must produce identical signatures"
        );
    }

    #[test]
    fn test_signature_rejects_invalid_key() {
        let result = StorageService::generate_cosmos_signature(
            "POST",
            "docs",
            "dbs/test/colls/test",
            "Mon, 01 Jan 2024 00:00:00 GMT",
            "not base64!!",
        );
        assert!(result.is_err(), "A non-base64 master key must be rejected");
    }

    #[test]
    fn test_signature_preserves_date_case() {
        // generate_cosmos_signature signs the date verbatim, so its case matters here.
        let result = StorageService::generate_cosmos_signature(
            "POST",
            "dbs",
            "",
            "Sat, 07 Jun 2025 01:15:44 GMT",
            "dGVzdGtleQ==",
        );

        assert!(
            result.is_ok(),
            "Signature generation should succeed with proper date format"
        );
        let signature = result.unwrap();
        assert!(!signature.is_empty(), "Signature should not be empty");

        let result_lowercase = StorageService::generate_cosmos_signature(
            "POST",
            "dbs",
            "",
            "sat, 07 jun 2025 01:15:44 gmt",
            "dGVzdGtleQ==",
        );

        assert!(result_lowercase.is_ok());
        let signature_lowercase = result_lowercase.unwrap();

        assert_ne!(
            signature, signature_lowercase,
            "Date case should affect signature"
        );
    }

    #[test]
    fn test_duplicate_removal_config() {
        let config = create_test_config();
        assert_eq!(config.application.duplicate_removal_interval_hours, 24);
    }

    #[test]
    fn test_remove_duplicates_method_exists() {
        // Compile-time check that `remove_duplicates` exists, borrows `&self`
        // and resolves to `Result<usize>`; the future is never polled.
        fn _assert_signature(
            service: &StorageService,
        ) -> impl std::future::Future<Output = Result<usize>> + '_ {
            service.remove_duplicates()
        }

        let config = create_test_config();
        assert_eq!(config.application.duplicate_removal_interval_hours, 24);
    }

    #[test]
    fn test_sdk_query_method_signature() {
        let config = create_test_config();

        assert!(!config.application.allowed_domains.is_empty());
        assert_eq!(config.application.allowed_domains[0], "example.com");

        let query_result =
            Query::from("SELECT * FROM c WHERE c.status = @status ORDER BY c.created_at ASC")
                .with_parameter("@status", "Pending");

        assert!(query_result.is_ok(), "Query creation should succeed");

        let query = query_result.unwrap();
        let json_result = serde_json::to_string(&query);
        assert!(json_result.is_ok(), "Query should be serializable");

        let json = json_result.unwrap();
        assert!(
            json.contains("Pending"),
            "Query should contain the parameter value"
        );
        assert!(
            json.contains("@status"),
            "Query should contain the parameter name"
        );
    }

    #[test]
    fn test_crawl_queue_structure() {
        let now = chrono::Utc::now();

        let crawl_item = CrawlQueue {
            id: "test-id".to_string(),
            url: "https://example.com".to_string(),
            domain: "example.com".to_string(),
            depth: 0,
            status: CrawlStatus::Pending,
            created_at: now,
            processed_at: None,
            error_message: None,
            retry_count: 0,
        };

        let json_result = serde_json::to_string(&crawl_item);
        assert!(json_result.is_ok(), "CrawlQueue should be serializable");

        let json = json_result.unwrap();
        assert!(
            json.contains("Pending"),
            "Serialized item should contain status"
        );
        assert!(
            json.contains("example.com"),
            "Serialized item should contain domain"
        );

        let deserialize_result: Result<CrawlQueue, _> = serde_json::from_str(&json);
        assert!(
            deserialize_result.is_ok(),
            "CrawlQueue should be deserializable"
        );

        let deserialized = deserialize_result.unwrap();
        assert_eq!(deserialized.domain, "example.com");
        assert_eq!(deserialized.status, CrawlStatus::Pending);
    }
}