From 687bbb198e198ce4b13950c96d30b4261d28c588 Mon Sep 17 00:00:00 2001
From: Zetaphor
Date: Sun, 26 Jan 2025 01:01:21 -0600
Subject: [PATCH] Fix duplicate records
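
Previously each collector run re-inserted every row the browser returned,
so repeated runs filled the history and bookmark tables with duplicate
records. This change adds a small last_processed table that keeps a
high-water timestamp per source; collectors now insert only entries
strictly newer than that mark and then advance it.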
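
The gating pattern, simplified from process_browser_history() in
app/main.py (logging and error handling omitted):

    outputs = browser_history.get_history()
    history_list = outputs.histories  # list of (timestamp, url, title) tuples

    last_timestamp = get_last_processed_timestamp("browser_history")
    new_entries = [
        entry for entry in history_list
        if entry[0].timestamp() > last_timestamp  # strictly newer than the mark
    ]
    # ... insert new_entries into the history table ...
    current_timestamp = int(datetime.now().timestamp())
    update_last_processed_timestamp("browser_history", current_timestamp)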
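
The mark itself is stored with a SQLite upsert, so there is at most one
row per source thanks to the PRIMARY KEY (the values here are
illustrative):

    INSERT OR REPLACE INTO last_processed (source, last_timestamp)
    VALUES ('browser_history', 1737875921);

The scheduler additionally guards its writes with an asyncio.Lock so the
bookmark and history tasks cannot interleave their sessions.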
request, "bookmarks": bookmarks} - ) \ No newline at end of file + ) + +def process_browser_history(): + try: + logger.info("Starting browser history processing") + outputs = browser_history.get_history() + history_list = outputs.histories # This is a list of tuples (timestamp, url, title) + logger.info(f"Found {len(history_list)} total history items") + + current_timestamp = int(datetime.now().timestamp()) + source_key = "browser_history" # Single source since we get combined history + last_timestamp = get_last_processed_timestamp(source_key) + + logger.info(f"Last processed timestamp: {last_timestamp}") + + # Filter for only new entries + new_entries = [ + entry for entry in history_list + if entry[0].timestamp() > last_timestamp + ] + + logger.info(f"Found {len(new_entries)} new entries") + + if new_entries: + for timestamp, url, title in new_entries: + logger.info(f"Processing entry: {timestamp} - {url}") + domain = urlparse(url).netloc + if config.is_domain_ignored(domain): + logger.debug(f"Skipping ignored domain: {domain}") + continue + + # Create history entry + db = next(get_db()) + try: + history_entry = HistoryEntry( + url=url, + title=title, + visit_time=timestamp, + domain=domain + ) + db.add(history_entry) + db.commit() + except Exception as e: + logger.error(f"Error storing history item: {str(e)}") + db.rollback() + finally: + db.close() + + # Update the last processed timestamp + update_last_processed_timestamp(source_key, current_timestamp) + logger.info(f"Updated timestamp to {current_timestamp}") + + logger.info(f"Processed {len(new_entries)} new items") + + except Exception as e: + logger.error(f"Error processing browser history: {str(e)}", exc_info=True) \ No newline at end of file diff --git a/app/scheduler.py b/app/scheduler.py index a730888..a81de64 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -1,7 +1,7 @@ from fastapi import BackgroundTasks from datetime import datetime, timedelta import asyncio -from .database import SessionLocal, HistoryEntry, Bookmark +from .database import SessionLocal, HistoryEntry, Bookmark, get_last_processed_timestamp, update_last_processed_timestamp from .browser import BrowserHistoryCollector from .page_reader import PageReader from sqlalchemy import func @@ -10,6 +10,9 @@ import pytz from .config import Config from .database import get_db from urllib.parse import urlparse +import logging + +logger = logging.getLogger(__name__) class HistoryScheduler: def __init__(self): @@ -18,6 +21,7 @@ class HistoryScheduler: self.last_history_update = None self.content_update_interval = timedelta(hours=24) # Update content daily self.config = Config() + self.db_lock = asyncio.Lock() def _normalize_datetime(self, dt: datetime) -> datetime: """Convert datetime to UTC if it has timezone, or make it timezone-aware if it doesn't""" @@ -32,68 +36,104 @@ class HistoryScheduler: return dt.astimezone(pytz.UTC) async def update_bookmarks(self): - """Update bookmarks from browser""" + """Update bookmarks from browsers""" try: - db = next(get_db()) + current_timestamp = int(datetime.now().timestamp()) + source_key = "browser_bookmarks" + last_timestamp = get_last_processed_timestamp(source_key) + + logger.info(f"Fetching bookmarks. 
diff --git a/app/scheduler.py b/app/scheduler.py
index a730888..a81de64 100644
--- a/app/scheduler.py
+++ b/app/scheduler.py
@@ -1,7 +1,7 @@
 from fastapi import BackgroundTasks
 from datetime import datetime, timedelta
 import asyncio
-from .database import SessionLocal, HistoryEntry, Bookmark
+from .database import SessionLocal, HistoryEntry, Bookmark, get_last_processed_timestamp, update_last_processed_timestamp
 from .browser import BrowserHistoryCollector
 from .page_reader import PageReader
 from sqlalchemy import func
@@ -10,6 +10,9 @@ import pytz
 from .config import Config
 from .database import get_db
 from urllib.parse import urlparse
+import logging
+
+logger = logging.getLogger(__name__)

 class HistoryScheduler:
     def __init__(self):
@@ -18,6 +21,7 @@
         self.last_history_update = None
         self.content_update_interval = timedelta(hours=24) # Update content daily
         self.config = Config()
+        self.db_lock = asyncio.Lock()

     def _normalize_datetime(self, dt: datetime) -> datetime:
         """Convert datetime to UTC if it has timezone, or make it timezone-aware if it doesn't"""
@@ -32,68 +36,104 @@
         return dt.astimezone(pytz.UTC)

     async def update_bookmarks(self):
-        """Update bookmarks from browser"""
+        """Update bookmarks from browsers"""
         try:
-            db = next(get_db())
+            current_timestamp = int(datetime.now().timestamp())
+            source_key = "browser_bookmarks"
+            last_timestamp = get_last_processed_timestamp(source_key)
+
+            logger.info(f"Fetching bookmarks. Last processed timestamp: {last_timestamp}")

             bookmarks = self.browser_collector.fetch_bookmarks()
+            logger.info(f"Found {len(bookmarks)} total bookmarks")

-            for added_time, url, title, folder in bookmarks: # Unpack the tuple
-                # Extract domain and check if it should be ignored
-                domain = urlparse(url).netloc
-                if self.config.is_domain_ignored(domain):
-                    continue
+            # Filter for only new bookmarks
+            new_bookmarks = [
+                (added_time, url, title, folder) for added_time, url, title, folder in bookmarks
+                if self._normalize_datetime(added_time).timestamp() > last_timestamp
+            ]

-                # Normalize the datetime
-                added_time = self._normalize_datetime(added_time)
+            logger.info(f"Found {len(new_bookmarks)} new bookmarks to process")

-                # Process the bookmark only if domain is not ignored
-                bookmark_entry = Bookmark(
-                    url=url,
-                    title=title,
-                    added_time=added_time,
-                    folder=folder,
-                    domain=domain
-                )
-                db.add(bookmark_entry)
+            if new_bookmarks:
+                async with self.db_lock:
+                    with next(get_db()) as db:
+                        added_count = 0
+                        for added_time, url, title, folder in new_bookmarks:
+                            domain = urlparse(url).netloc
+                            if self.config.is_domain_ignored(domain):
+                                logger.debug(f"Skipping ignored domain: {domain}")
+                                continue

-            db.commit()
+                            added_time = self._normalize_datetime(added_time)
+
+                            bookmark = Bookmark(
+                                url=url,
+                                title=title,
+                                added_time=added_time,
+                                folder=folder,
+                                domain=domain
+                            )
+                            db.add(bookmark)
+                            added_count += 1
+
+                        db.commit()
+                        logger.info(f"Successfully added {added_count} new bookmarks")
+
+            update_last_processed_timestamp(source_key, current_timestamp)
+            logger.info(f"Updated last processed timestamp to {current_timestamp}")
         except Exception as e:
-            print(f"Error updating bookmarks: {e}")
-        finally:
-            db.close()
+            logger.error(f"Error updating bookmarks: {str(e)}", exc_info=True)

     async def update_history(self):
         """Background task to update history periodically"""
         while True:
             try:
-                db = next(get_db())
+                current_timestamp = int(datetime.now().timestamp())
+                source_key = "browser_history"
+                last_timestamp = get_last_processed_timestamp(source_key)
+
+                logger.info(f"Fetching history. Last processed timestamp: {last_timestamp}")

                 history_entries = self.browser_collector.fetch_history()
+                logger.info(f"Found {len(history_entries)} total history entries")

-                for visit_time, url, title in history_entries: # Unpack the tuple
-                    # Extract domain and check if it should be ignored
-                    domain = urlparse(url).netloc
-                    if self.config.is_domain_ignored(domain):
-                        continue
+                # Filter for only new entries
+                new_entries = [
+                    (visit_time, url, title) for visit_time, url, title in history_entries
+                    if self._normalize_datetime(visit_time).timestamp() > last_timestamp
+                ]

-                    # Normalize the datetime
-                    visit_time = self._normalize_datetime(visit_time)
+                logger.info(f"Found {len(new_entries)} new history entries to process")

-                    # Process the entry only if domain is not ignored
-                    history_entry = HistoryEntry(
-                        url=url,
-                        title=title,
-                        visit_time=visit_time,
-                        domain=domain
-                    )
-                    db.add(history_entry)
+                if new_entries:
+                    async with self.db_lock:
+                        with next(get_db()) as db:
+                            added_count = 0
+                            for visit_time, url, title in new_entries:
+                                domain = urlparse(url).netloc
+                                if self.config.is_domain_ignored(domain):
+                                    logger.debug(f"Skipping ignored domain: {domain}")
+                                    continue

-                    db.commit()
+                                visit_time = self._normalize_datetime(visit_time)
+
+                                history_entry = HistoryEntry(
+                                    url=url,
+                                    title=title,
+                                    visit_time=visit_time,
+                                    domain=domain
+                                )
+                                db.add(history_entry)
+                                added_count += 1
+
+                            db.commit()
+                            logger.info(f"Successfully added {added_count} new history entries")
+
+                update_last_processed_timestamp(source_key, current_timestamp)
+                logger.info(f"Updated last processed timestamp to {current_timestamp}")
             except Exception as e:
-                print(f"Error updating history: {e}")
-            finally:
-                db.close()
+                logger.error(f"Error updating history: {str(e)}", exc_info=True)

             await asyncio.sleep(300) # Wait 5 minutes before next update
diff --git a/run-browser-recall.fish b/run-browser-recall.fish
new file mode 100755
index 0000000..d30889f
--- /dev/null
+++ b/run-browser-recall.fish
@@ -0,0 +1,8 @@
+#!/usr/bin/env fish
+
+# Activate the virtual environment and run main.py silently
+vf activate general
+python main.py > /dev/null 2>&1 &
+
+# Print a simple confirmation message
+echo "Browser Recall started in background"
\ No newline at end of file