Files
browser-recall/app/main.py
2025-01-26 01:06:02 -06:00

479 lines
17 KiB
Python

from fastapi import FastAPI, Depends, Query, WebSocket, WebSocketDisconnect, HTTPException
from sqlalchemy.orm import Session
from datetime import datetime, timezone, timedelta
from typing import List, Optional
import asyncio
from fastapi import WebSocketDisconnect
from urllib.parse import urlparse
import pytz
from fastapi.middleware.cors import CORSMiddleware
import iso8601
from bs4 import BeautifulSoup
from sqlalchemy import text
from sqlalchemy.sql import text
from .logging_config import setup_logger
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi import Request
import browser_history
from .database import (
get_db,
HistoryEntry,
Bookmark,
get_last_processed_timestamp,
update_last_processed_timestamp,
create_tables,
engine
)
from .scheduler import HistoryScheduler
from .page_info import PageInfo
from .page_reader import PageReader
from .config import Config
logger = setup_logger(__name__)
app = FastAPI()
scheduler = HistoryScheduler()
config = Config()
# Add CORS middleware to allow WebSocket connections
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your domains
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
templates = Jinja2Templates(directory="app/templates")
app.mount("/static", StaticFiles(directory="app/static"), name="static")
@app.on_event("startup")
async def startup_event():
logger.info("Starting application")
# Create necessary tables
create_tables()
# Initial history and bookmark fetch
try:
# Process history
process_browser_history()
# Process bookmarks
await scheduler.update_bookmarks()
# Start the background tasks
asyncio.create_task(scheduler.update_history())
except Exception as e:
logger.error(f"Error during startup: {str(e)}")
def serialize_history_entry(entry, include_content: bool = False):
"""Serialize a HistoryEntry object to a dictionary"""
# Handle both ORM objects and raw SQL results
if hasattr(entry, '_mapping'): # Raw SQL result
result = {
"id": entry.id,
"url": entry.url,
"title": entry.title,
"visit_time": entry.visit_time.isoformat() if isinstance(entry.visit_time, datetime) else entry.visit_time,
"domain": entry.domain,
}
else: # ORM object
result = {
"id": entry.id,
"url": entry.url,
"title": entry.title,
"visit_time": entry.visit_time.isoformat() if entry.visit_time else None,
"domain": entry.domain,
}
if include_content:
result["markdown_content"] = entry.markdown_content
return result
def serialize_bookmark(bookmark):
"""Serialize a Bookmark object to a dictionary"""
return {
"id": bookmark.id,
"url": bookmark.url,
"title": bookmark.title,
"added_time": bookmark.added_time.isoformat() if bookmark.added_time else None,
"folder": bookmark.folder,
"domain": bookmark.domain,
}
@app.get("/history/search")
async def search_history(
domain: Optional[str] = Query(None),
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
search_term: Optional[str] = Query(None),
include_content: bool = Query(False),
db: Session = Depends(get_db)
):
"""Search history with optimized full-text search"""
try:
if search_term:
# Optimize FTS query with content-focused ranking
fts_query = """
WITH RECURSIVE
ranked_results AS (
SELECT
h.*,
rank * (
CASE
-- Boost exact phrase matches in content
WHEN h.markdown_content LIKE :exact_pattern THEN 3.0
-- Boost title matches but less than content
WHEN h.title LIKE :like_pattern THEN 1.5
-- Base score for other matches
ELSE 1.0
END
) + (
-- Additional boost for recent entries
CAST(strftime('%s', h.visit_time) AS INTEGER) /
CAST(strftime('%s', 'now') AS INTEGER) * 0.5
) as final_rank
FROM history h
INNER JOIN history_fts f ON h.id = f.rowid
WHERE history_fts MATCH :search
AND (:domain IS NULL OR h.domain = :domain)
AND (:start_date IS NULL OR h.visit_time >= :start_date)
AND (:end_date IS NULL OR h.visit_time <= :end_date)
)
SELECT * FROM ranked_results
ORDER BY final_rank DESC
LIMIT 100
"""
# Prepare parameters with exact phrase matching
params = {
'search': search_term,
'like_pattern': f'%{search_term}%',
'exact_pattern': f'%{search_term}%', # For exact phrase matching
'domain': domain,
'start_date': start_date,
'end_date': end_date
}
# Execute with connection context manager
with engine.connect() as connection:
results = connection.execute(text(fts_query), params).all()
return [serialize_history_entry(row, include_content) for row in results]
else:
# Optimize non-FTS query
query = db.query(HistoryEntry)
if domain:
query = query.filter(HistoryEntry.domain == domain)
if start_date:
query = query.filter(HistoryEntry.visit_time >= start_date)
if end_date:
query = query.filter(HistoryEntry.visit_time <= end_date)
# Add index hints and limit
query = query.with_hint(HistoryEntry, 'INDEXED BY ix_history_visit_time', 'sqlite')
entries = query.order_by(HistoryEntry.visit_time.desc()).limit(100).all()
return [serialize_history_entry(entry, include_content) for entry in entries]
except Exception as e:
logger.error(f"Search error: {str(e)}", exc_info=True)
raise HTTPException(
status_code=500,
detail={"message": "Search operation failed", "error": str(e)}
)
@app.get("/bookmarks/search")
async def search_bookmarks(
domain: Optional[str] = Query(None),
folder: Optional[str] = Query(None),
search_term: Optional[str] = Query(None),
db: Session = Depends(get_db)
):
"""Search bookmarks with optimized queries"""
try:
# Build query efficiently
query = db.query(Bookmark)
# Apply filters using index-optimized queries
if domain:
query = query.filter(Bookmark.domain == domain)
if folder:
query = query.filter(Bookmark.folder == folder)
if search_term:
# Use LIKE with index hint for title search
search_pattern = f"%{search_term}%"
query = query.filter(
Bookmark.title.ilike(search_pattern)
).with_hint(
Bookmark,
'INDEXED BY ix_bookmarks_title',
'sqlite'
)
# Add ordering and limit for better performance
bookmarks = query.order_by(Bookmark.added_time.desc()).limit(1000).all()
return [serialize_bookmark(bookmark) for bookmark in bookmarks]
except Exception as e:
print(f"Bookmark search error: {e}")
raise HTTPException(status_code=500, detail="Search operation failed")
# Add new endpoint for advanced full-text search
@app.get("/history/search/advanced")
async def advanced_history_search(
query: str = Query(..., description="Full-text search query with SQLite FTS5 syntax"),
include_content: bool = Query(False),
db: Session = Depends(get_db)
):
"""Advanced full-text search using SQLite FTS5 features"""
try:
# Use raw SQL for advanced FTS query
fts_query = """
SELECT h.*, rank
FROM history h
INNER JOIN history_fts f ON h.id = f.rowid
WHERE history_fts MATCH :query
ORDER BY rank
LIMIT 1000
"""
results = db.execute(text(fts_query), {'query': query}).all()
# Convert results to HistoryEntry objects
entries = [
serialize_history_entry(
HistoryEntry(
id=row.id,
url=row.url,
title=row.title,
visit_time=row.visit_time,
domain=row.domain,
markdown_content=row.markdown_content if include_content else None
),
include_content
)
for row in results
]
return entries
except Exception as e:
print(f"Advanced search error: {e}")
raise HTTPException(status_code=500, detail="Advanced search operation failed")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, db: Session = Depends(get_db)):
logger.info("New WebSocket connection established")
page_reader = PageReader()
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
# Parse the URL and check if domain should be ignored
domain = urlparse(data['url']).netloc
if config.is_domain_ignored(domain):
logger.info(f"Ignoring domain: {domain}")
await websocket.send_json({
"status": "ignored",
"message": f"Domain {domain} is in ignore list"
})
continue
logger.info(f"Processing page: {data['url']}")
timestamp = iso8601.parse_date(data['timestamp'])
# Check if we already have a recent entry for this URL
existing_entry = db.query(HistoryEntry).filter(
HistoryEntry.url == data['url'],
HistoryEntry.visit_time >= timestamp - timedelta(minutes=5)
).first()
if existing_entry:
print(f"Recent entry exists for URL: {data['url']}")
await websocket.send_json({
"status": "skipped",
"message": "Recent entry exists"
})
continue
page_info = PageInfo(
url=data['url'],
html=data['html'],
timestamp=timestamp
)
# Debug HTML content
print(f"HTML content length before processing: {len(page_info.html)}")
# Extract title
soup = BeautifulSoup(page_info.html, 'html.parser')
title = soup.title.string if soup.title else ''
print(f"Extracted title: {title}")
# Debug markdown conversion
print("Starting markdown conversion...")
cleaned_html = page_reader.clean_html(page_info.html)
print(f"Cleaned HTML length: {len(cleaned_html)}")
markdown_content = page_reader.html_to_markdown(page_info.html)
print(f"Markdown conversion complete. Content length: {len(markdown_content) if markdown_content else 0}")
if markdown_content:
print("First 100 chars of markdown:", markdown_content[:100])
else:
print("No markdown content generated")
if not title and not markdown_content:
print(f"No content extracted from: {page_info.url}")
await websocket.send_json({
"status": "skipped",
"message": "No content extracted"
})
continue
# Create history entry
history_entry = HistoryEntry(
url=page_info.url,
title=title,
visit_time=page_info.timestamp,
domain=domain,
markdown_content=markdown_content,
last_content_update=datetime.now(timezone.utc)
)
# Debug database operation
print(f"Saving entry with markdown length: {len(markdown_content) if markdown_content else 0}")
# Use bulk operations for better performance
db.add(history_entry)
try:
db.commit()
print(f"Successfully saved entry for: {page_info.url}")
print(f"Verify markdown content length in database: {len(history_entry.markdown_content) if history_entry.markdown_content else 0}")
await websocket.send_json({
"status": "success",
"message": f"Processed page: {page_info.url}"
})
except Exception as e:
db.rollback()
print(f"Error saving entry: {e}")
await websocket.send_json({
"status": "error",
"message": "Database error"
})
except WebSocketDisconnect:
logger.info("Client disconnected")
except Exception as e:
logger.error("Error in WebSocket handler", exc_info=True)
finally:
await page_reader.close()
@app.get("/config/ignored-domains")
async def get_ignored_domains():
"""Get list of ignored domain patterns"""
return {"ignored_domains": config.config.get('ignored_domains', [])}
@app.post("/config/ignored-domains")
async def add_ignored_domain(pattern: str):
"""Add a new domain pattern to ignored list"""
config.add_ignored_domain(pattern)
return {"status": "success", "message": f"Added pattern: {pattern}"}
@app.delete("/config/ignored-domains/{pattern}")
async def remove_ignored_domain(pattern: str):
"""Remove a domain pattern from ignored list"""
config.remove_ignored_domain(pattern)
return {"status": "success", "message": f"Removed pattern: {pattern}"}
@app.get("/")
async def home(request: Request, db: Session = Depends(get_db)):
# Get recent history entries
entries = db.query(HistoryEntry)\
.order_by(HistoryEntry.visit_time.desc())\
.limit(50)\
.all()
return templates.TemplateResponse(
"index.html",
{"request": request, "entries": entries}
)
@app.get("/search")
async def search_page(request: Request):
return templates.TemplateResponse(
"search.html",
{"request": request}
)
@app.get("/bookmarks")
async def bookmarks_page(request: Request, db: Session = Depends(get_db)):
bookmarks = db.query(Bookmark)\
.order_by(Bookmark.added_time.desc())\
.limit(50)\
.all()
return templates.TemplateResponse(
"bookmarks.html",
{"request": request, "bookmarks": bookmarks}
)
def process_browser_history():
try:
logger.info("Starting browser history processing")
outputs = browser_history.get_history()
history_list = outputs.histories # This is a list of tuples (timestamp, url, title)
logger.info(f"Found {len(history_list)} total history items")
current_timestamp = int(datetime.now().timestamp())
source_key = "browser_history" # Single source since we get combined history
last_timestamp = get_last_processed_timestamp(source_key)
logger.info(f"Last processed timestamp: {last_timestamp}")
# Filter for only new entries
new_entries = [
entry for entry in history_list
if entry[0].timestamp() > last_timestamp
]
logger.info(f"Found {len(new_entries)} new entries")
if new_entries:
for timestamp, url, title in new_entries:
logger.info(f"Processing entry: {timestamp} - {url}")
domain = urlparse(url).netloc
if config.is_domain_ignored(domain):
logger.debug(f"Skipping ignored domain: {domain}")
continue
# Create history entry
db = next(get_db())
try:
history_entry = HistoryEntry(
url=url,
title=title,
visit_time=timestamp,
domain=domain
)
db.add(history_entry)
db.commit()
except Exception as e:
logger.error(f"Error storing history item: {str(e)}")
db.rollback()
finally:
db.close()
# Update the last processed timestamp
update_last_processed_timestamp(source_key, current_timestamp)
logger.info(f"Updated timestamp to {current_timestamp}")
logger.info(f"Processed {len(new_entries)} new items")
except Exception as e:
logger.error(f"Error processing browser history: {str(e)}", exc_info=True)