Initial commit

This commit is contained in:
2025-01-25 19:04:20 -06:00
commit d556823350
10 changed files with 334 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
__pycache__/

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.10.6

18
app/browser.py Normal file
View File

@@ -0,0 +1,18 @@
from datetime import datetime
from typing import List, Tuple
from browser_history import get_history, get_bookmarks
from urllib.parse import urlparse
class BrowserHistoryCollector:
@staticmethod
def get_domain(url: str) -> str:
return urlparse(url).netloc
def fetch_history(self) -> List[Tuple[datetime, str, str]]:
outputs = get_history()
# Returns list of tuples containing (datetime, url, title)
return [(entry[0], entry[1], entry[2]) for entry in outputs.histories]
def fetch_bookmarks(self) -> List[Tuple[datetime, str, str, str]]:
outputs = get_bookmarks()
return outputs.bookmarks

38
app/database.py Normal file
View File

@@ -0,0 +1,38 @@
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./browser_history.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class HistoryEntry(Base):
__tablename__ = "history_entries"
id = Column(Integer, primary_key=True, index=True)
url = Column(String, index=True)
title = Column(String, nullable=True)
visit_time = Column(DateTime, index=True)
domain = Column(String, index=True)
class Bookmark(Base):
__tablename__ = "bookmarks"
id = Column(Integer, primary_key=True, index=True)
url = Column(String, index=True)
title = Column(String, nullable=True)
added_time = Column(DateTime, index=True)
folder = Column(String, index=True)
domain = Column(String, index=True)
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

62
app/main.py Normal file
View File

@@ -0,0 +1,62 @@
from fastapi import FastAPI, Depends, Query
from sqlalchemy.orm import Session
from datetime import datetime
from typing import List
import asyncio
from .database import get_db, HistoryEntry, Bookmark
from .scheduler import HistoryScheduler
app = FastAPI()
scheduler = HistoryScheduler()
@app.on_event("startup")
async def startup_event():
# Initial bookmark fetch
await scheduler.update_bookmarks()
# Start the background task
asyncio.create_task(scheduler.update_history())
@app.get("/history/search")
async def search_history(
domain: str = Query(None),
start_date: datetime = Query(None),
end_date: datetime = Query(None),
search_term: str = Query(None),
db: Session = Depends(get_db)
):
query = db.query(HistoryEntry)
if domain:
query = query.filter(HistoryEntry.domain == domain)
if start_date:
query = query.filter(HistoryEntry.visit_time >= start_date)
if end_date:
query = query.filter(HistoryEntry.visit_time <= end_date)
if search_term:
query = query.filter(HistoryEntry.title.ilike(f"%{search_term}%"))
return query.all()
@app.get("/bookmarks/search")
async def search_bookmarks(
domain: str = Query(None),
folder: str = Query(None),
search_term: str = Query(None),
db: Session = Depends(get_db)
):
query = db.query(Bookmark)
if domain:
query = query.filter(Bookmark.domain == domain)
if folder:
query = query.filter(Bookmark.folder == folder)
if search_term:
query = query.filter(Bookmark.title.ilike(f"%{search_term}%"))
return query.all()

16
app/page_info.py Normal file
View File

@@ -0,0 +1,16 @@
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from typing import Optional
class PageInfoFetcher:
async def get_page_title(self, url: str) -> Optional[str]:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=5) as response:
if response.status == 200:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
return soup.title.string if soup.title else None
except:
return None

85
app/scheduler.py Normal file
View File

@@ -0,0 +1,85 @@
from fastapi import BackgroundTasks
from datetime import datetime, timedelta
import asyncio
from .database import SessionLocal, HistoryEntry, Bookmark
from .browser import BrowserHistoryCollector
from .page_info import PageInfoFetcher
from sqlalchemy import func
class HistoryScheduler:
def __init__(self):
self.browser_collector = BrowserHistoryCollector()
self.page_fetcher = PageInfoFetcher()
self.last_history_update = None
async def update_bookmarks(self):
bookmarks = self.browser_collector.fetch_bookmarks()
db = SessionLocal()
try:
# First, get all existing URLs to avoid duplicates
existing_urls = {
url: (added_time, folder)
for url, added_time, folder in
db.query(Bookmark.url, Bookmark.added_time, Bookmark.folder).all()
}
new_entries = []
for added_time, url, title, folder in bookmarks:
# Only add if URL doesn't exist or if it's in a different folder
if (url not in existing_urls or
existing_urls[url][1] != folder):
domain = self.browser_collector.get_domain(url)
entry = Bookmark(
url=url,
title=title,
added_time=added_time,
folder=folder,
domain=domain
)
new_entries.append(entry)
if new_entries:
db.bulk_save_objects(new_entries)
db.commit()
finally:
db.close()
async def update_history(self):
while True:
db = SessionLocal()
try:
# Get the latest timestamp from our database
latest_entry = db.query(func.max(HistoryEntry.visit_time)).scalar()
# Fetch new history
history = self.browser_collector.fetch_history()
# Filter to only get entries newer than our latest entry
new_entries = []
for visit_time, url, title in history:
if not latest_entry or visit_time > latest_entry:
domain = self.browser_collector.get_domain(url)
if not title:
title = await self.page_fetcher.get_page_title(url)
entry = HistoryEntry(
url=url,
title=title,
visit_time=visit_time,
domain=domain
)
new_entries.append(entry)
if new_entries:
db.bulk_save_objects(new_entries)
db.commit()
# Update bookmarks
await self.update_bookmarks()
finally:
db.close()
# Wait for 5 minutes before next update
await asyncio.sleep(300)

15
main.py Normal file
View File

@@ -0,0 +1,15 @@
import uvicorn
import os
import sys
# Add the app directory to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if __name__ == "__main__":
# Run the FastAPI application using uvicorn
uvicorn.run(
"app.main:app",
host="0.0.0.0", # Allows external access
port=8523,
reload=True # Enable auto-reload during development
)

90
page-reader.py Normal file
View File

@@ -0,0 +1,90 @@
import httpx
import re
from markdownify import markdownify as md
from bs4 import BeautifulSoup
# Patterns for cleaning
SCRIPT_PATTERN = r"<[ ]*script.*?\/[ ]*script[ ]*>"
STYLE_PATTERN = r"<[ ]*style.*?\/[ ]*style[ ]*>"
META_PATTERN = r"<[ ]*meta.*?>"
COMMENT_PATTERN = r"<[ ]*!--.*?--[ ]*>"
LINK_PATTERN = r"<[ ]*link.*?>"
BASE64_IMG_PATTERN = r'<img[^>]+src="data:image/[^;]+;base64,[^"]+"[^>]*>'
SVG_PATTERN = r"(<svg[^>]*>)(.*?)(<\/svg>)"
def clean_html(html: str) -> str:
"""Clean HTML by removing unwanted elements and patterns."""
# First use regex to remove problematic patterns
html = re.sub(SCRIPT_PATTERN, "", html, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
html = re.sub(STYLE_PATTERN, "", html, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
html = re.sub(META_PATTERN, "", html, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
html = re.sub(COMMENT_PATTERN, "", html, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
html = re.sub(LINK_PATTERN, "", html, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
html = re.sub(SVG_PATTERN, "", html, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
html = re.sub(BASE64_IMG_PATTERN, "", html)
# Use BeautifulSoup to remove additional elements we want to strip
soup = BeautifulSoup(html, 'html.parser')
# Remove unwanted elements
elements_to_remove = [
'canvas', 'img', 'picture', 'audio', 'video',
'iframe', 'embed', 'object', 'param', 'track',
'map', 'area', 'source'
]
for element in elements_to_remove:
for tag in soup.find_all(element):
tag.decompose()
return str(soup)
def get_page_html(url: str) -> str:
"""Fetch HTML content from a given URL using httpx."""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
}
try:
with httpx.Client(follow_redirects=True) as client:
response = client.get(url, headers=headers)
response.raise_for_status()
return response.text
except httpx.HTTPError as e:
print(f"Error fetching page: {e}")
return ""
def clean_whitespace(text: str) -> str:
"""Clean excessive whitespace from text, collapsing more than 2 newlines."""
# Replace 3 or more newlines with 2 newlines
cleaned = re.sub(r'\n{3,}', '\n\n', text)
# Remove trailing whitespace from each line
cleaned = '\n'.join(line.rstrip() for line in cleaned.splitlines())
return cleaned.strip()
def html_to_markdown(url: str) -> str:
"""Convert webpage HTML to markdown."""
html = get_page_html(url)
if not html:
return ""
# Clean the HTML first
cleaned_html = clean_html(html)
# Convert to markdown using markdownify
# Configure markdownify options for clean output
markdown = md(cleaned_html,
heading_style="ATX", # Use # style headers
bullets="-", # Use - for bullets
autolinks=True, # Convert URLs to links
strip=['form'], # Additional elements to strip
escape_asterisks=True,
escape_underscores=True)
# Clean up excessive whitespace
return clean_whitespace(markdown)
if __name__ == "__main__":
# Example usage
url = "https://reddit.com"
markdown_content = html_to_markdown(url)
print(markdown_content)

8
requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
fastapi==0.109.2
uvicorn==0.27.1
sqlalchemy==2.0.27
browser-history==0.4.1
aiohttp==3.9.3
beautifulsoup4==4.12.3
httpx==0.27.0
markdownify==0.11.6