Refactor all the things

2025-04-12 14:41:04 -05:00
parent 90dc188dc8
commit 5629a92acf
5 changed files with 210 additions and 75 deletions

src/base_crawler.py (new file, 64 lines added)

@@ -0,0 +1,64 @@
from typing import Optional, Tuple
from urllib.parse import urlparse
from database import Database
from domain_exclusions import DomainExclusions
from logger import Logger
from crawl4ai import AsyncWebCrawler
class BaseCrawler:
def __init__(self, db: Database, domain_exclusions: DomainExclusions, logger: Logger):
self.db = db
self.domain_exclusions = domain_exclusions
self.logger = logger
self.crawler = AsyncWebCrawler()
async def __aenter__(self):
await self.crawler.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.crawler.__aexit__(exc_type, exc_val, exc_tb)
def should_skip_url(self, url: str) -> Tuple[bool, str]:
# Skip about: or chrome: URLs
if url.startswith("about:") or url.startswith("chrome:"):
return True, "Browser internal URL"
domain = urlparse(url).netloc
# Check domain exclusions
if self.domain_exclusions.is_excluded(domain):
return True, "Excluded domain"
# Check if URL exists
if self.db.url_exists(url):
return True, "URL already processed"
return False, ""
async def crawl_url(self, url: str, default_title: Optional[str] = None) -> Tuple[bool, dict]:
try:
result = await self.crawler.arun(url=url)
crawl_result = result[0]
title = crawl_result.metadata.get('title') or default_title or url.split("/")[-1]
content = crawl_result.markdown
self.db.add_history(
url=url,
title=title,
content=content
)
return True, {
"url": url,
"title": title,
"status": "received"
}
except Exception as e:
self.logger.error(f"Error processing URL {url}: {str(e)}")
return False, {
"url": url,
"title": default_title or url.split("/")[-1],
"status": "error",
"error": str(e)
}
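
For context, a minimal sketch of driving BaseCrawler on its own, outside the FastAPI app. The example URL and the standalone asyncio entry point are illustrative only; Database, DomainExclusions, and Logger are constructed with their defaults, as they are elsewhere in this commit.

import asyncio

from base_crawler import BaseCrawler
from database import Database
from domain_exclusions import DomainExclusions
from logger import Logger

async def main():
    db = Database()
    exclusions = DomainExclusions()
    logger = Logger()
    # __aenter__/__aexit__ delegate to crawl4ai's AsyncWebCrawler,
    # so the browser session is opened and closed around this block.
    async with BaseCrawler(db, exclusions, logger) as crawler:
        url = "https://example.com/"  # illustrative URL
        skip, reason = crawler.should_skip_url(url)
        if skip:
            logger.info(f"Skipping {url}: {reason}")
        else:
            ok, result = await crawler.crawl_url(url)
            logger.info(f"Crawl {'succeeded' if ok else 'failed'}: {result}")

if __name__ == "__main__":
    asyncio.run(main())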


@@ -46,6 +46,17 @@ class Database:
updated TIMESTAMP NOT NULL
)
''')
# Add unique index on url column (enforces one row per url and speeds up lookups)
self.cursor.execute('''
CREATE UNIQUE INDEX IF NOT EXISTS idx_history_url ON history(url)
''')
self.conn.commit()
def add_history(self, url: str, title: str, content: str) -> int:
@@ -104,6 +115,11 @@ class Database:
self.conn.commit()
return self.cursor.rowcount > 0
def url_exists(self, url: str) -> bool:
"""Check if a URL already exists in the database."""
self.cursor.execute('SELECT 1 FROM history WHERE url = ? LIMIT 1', (url,))
return self.cursor.fetchone() is not None
def __del__(self):
"""Cleanup database connection."""
if hasattr(self, 'conn'):
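
A short sketch of how a caller can combine url_exists() with the unique index. It assumes add_history() lets sqlite3.IntegrityError propagate on a duplicate insert, which is an assumption; the method body is not shown in this hunk.

import sqlite3

from database import Database

db = Database()
url = "https://example.com/article"  # illustrative URL

# Cheap pre-check via the index; the unique constraint is the real guarantee.
if not db.url_exists(url):
    try:
        db.add_history(url=url, title="Example", content="...")
    except sqlite3.IntegrityError:
        # Assumes add_history() surfaces the constraint violation when a
        # concurrent writer inserted the same URL between the check and the insert.
        pass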


@@ -2,7 +2,7 @@ import yaml
from fnmatch import fnmatch
class DomainExclusions:
def __init__(self, config_path="history_config.yaml"):
def __init__(self, config_path="config/history_config.yaml"):
self.excluded_domains = []
self.load_config(config_path)
@@ -12,8 +12,11 @@ class DomainExclusions:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
# Get the excluded_domains list from config, defaulting to empty list if not found
self.excluded_domains = config.get('excluded_domains', [])
# Handle both direct list and dict with 'excluded_domains' key
if isinstance(config, list):
self.excluded_domains = config
else:
self.excluded_domains = config.get('excluded_domains', [])
except FileNotFoundError:
print(f"Warning: Configuration file {config_path} not found. No domains will be excluded.")
except yaml.YAMLError as e:
@@ -23,12 +26,30 @@ class DomainExclusions:
def is_excluded(self, domain):
"""
Check if a domain matches any of the excluded domain patterns.
Supports wildcards (*, ?) in the excluded domain patterns.
Args:
domain (str): The domain to check
Returns:
bool: True if the domain should be excluded, False otherwise
"""
return any(fnmatch(domain.lower(), pattern.lower()) for pattern in self.excluded_domains)
# Strip protocol (http:// or https://) if present
domain = domain.lower().strip('/')
if '://' in domain:
domain = domain.split('://', 1)[1]
# Strip query parameters if present
if '?' in domain:
domain = domain.split('?', 1)[0]
# Split domain and path
if '/' in domain:
domain = domain.split('/', 1)[0]
for pattern in self.excluded_domains:
pattern = pattern.lower().strip('/')
if '/' in pattern:
pattern = pattern.split('/', 1)[0]
# Remove trailing wildcard if present
if pattern.endswith('*'):
pattern = pattern.rstrip('*').rstrip('.')
# Use fnmatch for proper wildcard pattern matching
if fnmatch(domain, pattern):
return True
return False
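
To illustrate the normalization above, here is how is_excluded() would behave with a hypothetical exclusion list of ["*.google.com", "localhost"] (the real values live in config/history_config.yaml):

from domain_exclusions import DomainExclusions

excl = DomainExclusions()  # loads config/history_config.yaml
# Assuming excluded_domains == ["*.google.com", "localhost"] (hypothetical values):
excl.is_excluded("https://mail.google.com/inbox?tab=1")  # True  -> host "mail.google.com"
excl.is_excluded("localhost/")                           # True  -> host "localhost"
excl.is_excluded("example.com")                          # False -> no pattern matches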


@@ -1,76 +1,115 @@
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect
import uvicorn
from logger import Logger
import os
from database import Database
from crawl4ai import AsyncWebCrawler
from domain_exclusions import DomainExclusions
from urllib.parse import urlparse
from base_crawler import BaseCrawler
import asyncio
from contextlib import asynccontextmanager
from browser_history import get_history
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
global history_crawler
logger.info("Initializing crawler and loading browser history...")
try:
# Initialize history crawler
history_crawler = HistoryCrawler(db, domain_exclusions, logger)
async with history_crawler: # Use async context manager
outputs = get_history()
history_crawler.crawl_queue = outputs.histories
logger.info(f"Loaded {len(history_crawler.crawl_queue)} URLs from browser history")
# Start the crawler in the background
task = asyncio.create_task(history_crawler.start_crawler())
yield
# Stop the crawler
history_crawler.is_running = False
await task # Wait for crawler to finish
except Exception as e:
logger.error(f"Error during startup: {str(e)}")
yield
app = FastAPI(lifespan=lifespan)
logger = Logger()
db = Database()
domain_exclusions = DomainExclusions()
class HistoryCrawler(BaseCrawler):
def __init__(self, db: Database, domain_exclusions: DomainExclusions, logger: Logger):
super().__init__(db, domain_exclusions, logger)
self.crawl_queue = []
self.is_running = True
async def start_crawler(self):
while self.is_running and self.crawl_queue:
timestamp, url, title = self.crawl_queue.pop(0)
should_skip, skip_reason = self.should_skip_url(url)
if should_skip:
self.logger.info(f"Skipping URL from history: {url} ({skip_reason})")
continue
success, result = await self.crawl_url(url, title)
if success:
self.logger.info(f"Processed historical URL: {url}")
await asyncio.sleep(30) # Wait 30 seconds before next crawl
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
logger.info("New WebSocket connection established")
async with AsyncWebCrawler() as crawler:
try:
while True:
data = await websocket.receive_json()
url = data["url"]
domain = urlparse(url).netloc
ws_crawler = BaseCrawler(db, domain_exclusions, logger)
# Check if domain is excluded
if domain_exclusions.is_excluded(domain):
logger.info(f"Skipping excluded domain: {domain}")
await websocket.send_json({
"status": "skipped",
"data": {
"url": url,
"title": "Excluded Domain",
"timestamp": data["timestamp"]
}
})
continue
try:
while True:
data = await websocket.receive_json()
url = data["url"]
try:
result = await crawler.arun(url=url)
crawl_result = result[0]
title = crawl_result.metadata.get('title') or url.split("/")[-1]
content = crawl_result.markdown
except Exception as crawl_error:
logger.error(f"Crawling error for {url}: {str(crawl_error)}")
title = url.split("/")[-1]
content = str(data)
db.add_history(
url=url,
title=title,
content=content
)
logger.info(f"Processed URL: {url} - {title}")
should_skip, skip_reason = ws_crawler.should_skip_url(url)
if should_skip:
logger.info(f"Skipping URL: {url} ({skip_reason})")
await websocket.send_json({
"status": "received",
"status": "skipped",
"data": {
"url": url,
"title": title,
"title": skip_reason,
"timestamp": data["timestamp"]
}
})
except Exception as e:
logger.error(f"WebSocket error: {str(e)}")
continue
success, result = await ws_crawler.crawl_url(url)
await websocket.send_json({
"status": result["status"],
"data": {
"url": result["url"],
"title": result["title"],
"timestamp": data["timestamp"]
}
})
except WebSocketDisconnect:
logger.info("WebSocket connection closed by client")
except Exception as e:
logger.error(f"WebSocket error: {str(e)}")
try:
await websocket.close()
finally:
logger.info("WebSocket connection closed")
except RuntimeError:
# Connection might already be closed
pass
finally:
logger.info("WebSocket connection closed")
if __name__ == "__main__":
logger.info("Starting WebSocket server...")
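
For manual testing, a minimal WebSocket client sketch (not part of this commit). It assumes the server listens on the uvicorn default ws://localhost:8000/ws and uses the third-party websockets package; the payload keys match what websocket_endpoint reads ("url", "timestamp").

import asyncio
import json
import time

import websockets  # pip install websockets

async def submit(url: str):
    # ws://localhost:8000/ws is an assumption based on uvicorn's default port.
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        await ws.send(json.dumps({"url": url, "timestamp": time.time()}))
        reply = json.loads(await ws.recv())
        print(reply["status"], reply["data"]["title"])

asyncio.run(submit("https://example.com/"))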