#!/usr/bin/env python3
import os
import sys
import argparse
import requests
import time
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from markitdown import MarkItDown
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class WebScraper:
    def __init__(self, base_url, max_depth=2, delay=1.0, exclude_patterns=None):
        self.base_url = base_url
        self.domain = urlparse(base_url).netloc
        self.visited_urls = set()
        self.max_depth = max_depth
        self.delay = delay
        self.exclude_patterns = exclude_patterns or []
        self.pages = {}  # Dictionary to store URL: HTML content
        self.session = requests.Session()

    def should_exclude(self, url):
        """Check if URL should be excluded based on patterns."""
        for pattern in self.exclude_patterns:
            if pattern in url:
                return True
        return False

    def is_valid_url(self, url):
        """Check if the URL is valid and belongs to the same domain."""
        parsed = urlparse(url)
        return bool(parsed.netloc) and parsed.netloc == self.domain

    def get_links(self, url, html):
        """Extract all links from the HTML content."""
        soup = BeautifulSoup(html, 'html.parser')
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            # Handle relative URLs
            full_url = urljoin(url, href)
            # Filter URLs to only include those from the same domain
            if self.is_valid_url(full_url) and not self.should_exclude(full_url):
                yield full_url

    def crawl(self, url=None, depth=0):
        """Crawl the website starting from the URL up to max_depth."""
        if url is None:
            url = self.base_url

        # Stop if we've reached max depth or already visited this URL
        if depth > self.max_depth or url in self.visited_urls:
            return

        # Mark this URL as visited
        self.visited_urls.add(url)

        try:
            logger.info(f"Crawling: {url} (Depth: {depth})")
            response = self.session.get(url, timeout=10)

            if response.status_code == 200:
                # Store the HTML content
                self.pages[url] = response.text

                # Extract and follow links
                if depth < self.max_depth:
                    for link in self.get_links(url, response.text):
                        # Be nice to the server - add delay
                        time.sleep(self.delay)
                        self.crawl(link, depth + 1)
            else:
                logger.warning(f"Failed to fetch {url}: HTTP {response.status_code}")
        except Exception as e:
            logger.error(f"Error crawling {url}: {e}")

    def get_pages(self):
        """Return the dictionary of crawled pages."""
        return self.pages

    def close(self):
        """Close the requests session."""
        if hasattr(self, 'session') and self.session:
            self.session.close()


class OpenWebUIUploader:
    def __init__(self, base_url, api_token):
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Accept": "application/json"
        })

    def create_knowledge_base(self, name, purpose=None):
        """Create a new knowledge base in OpenWebUI."""
        endpoint = f"{self.base_url}/api/v1/knowledge/create"
        payload = {
            "name": name,
            "description": purpose or "Documentation"
        }

        try:
            response = self.session.post(endpoint, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error creating knowledge base: {e}")
            raise

    def upload_file(self, kb_id, content, filename):
        """Upload a file to the knowledge base."""
        upload_endpoint = f"{self.base_url}/api/v1/files/"

        # Create a temporary file for the upload
        temp_file_path = f"/tmp/{filename}"
        with open(temp_file_path, 'w', encoding='utf-8') as f:
            f.write(content)

        try:
            # Use a context manager for the file upload request
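            # Two-step flow: POST the file to /api/v1/files/ to obtain a file ID,
            # then attach that ID to the knowledge base via
            # /api/v1/knowledge/{kb_id}/file/add.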
            with open(temp_file_path, 'rb') as f:
                files = {'file': (filename, f, 'text/markdown')}
                with self.session.post(
                    upload_endpoint,
                    headers={"Authorization": f"Bearer {self.api_token}"},
                    files=files
                ) as upload_response:
                    upload_response.raise_for_status()
                    file_id = upload_response.json().get('id')

            # Add the file to the knowledge base
            add_file_endpoint = f"{self.base_url}/api/v1/knowledge/{kb_id}/file/add"
            with self.session.post(
                add_file_endpoint,
                headers={
                    "Authorization": f"Bearer {self.api_token}",
                    "Content-Type": "application/json"
                },
                json={'file_id': file_id}
            ) as add_response:
                add_response.raise_for_status()
                return add_response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error uploading file: {e}")
            raise
        finally:
            # Clean up the temporary file
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)

    def close(self):
        """Close the requests session."""
        if hasattr(self, 'session') and self.session:
            self.session.close()


def convert_to_markdown(html_content, url):
    """Convert HTML content to Markdown using MarkItDown."""
    try:
        md = MarkItDown()

        # Use BytesIO to provide a binary stream to convert_stream
        from io import BytesIO
        html_bytes = BytesIO(html_content.encode('utf-8'))

        # Convert the HTML to Markdown
        result = md.convert_stream(html_bytes, mime_type='text/html')

        # Add a header with the source URL
        markdown_with_header = f"# {url}\n\n{result.text_content}"
        return markdown_with_header
    except Exception as e:
        logger.error(f"Error converting to markdown: {e}")
        return f"# {url}\n\nError converting content: {str(e)}"


def main():
    parser = argparse.ArgumentParser(description='Scrape a website and create an Open WebUI knowledge base')
    parser.add_argument('--token', '-t', required=True, help='Your OpenWebUI API token')
    parser.add_argument('--base-url', '-u', required=True, help='Base URL of your OpenWebUI instance (e.g., http://localhost:3000)')
    parser.add_argument('--website-url', '-w', required=True, help='URL of the website to scrape')
    parser.add_argument('--kb-name', '-n', required=True, help='Name for the knowledge base')
    parser.add_argument('--kb-purpose', '-p', help='Purpose description for the knowledge base', default=None)
    parser.add_argument('--depth', '-d', type=int, default=2, help='Maximum depth to crawl (default: 2)')
    parser.add_argument('--delay', type=float, default=1.0, help='Delay between requests in seconds (default: 1.0)')
    parser.add_argument('--exclude', '-e', action='append', help='URL patterns to exclude from crawling (can be specified multiple times)')

    args = parser.parse_args()

    # Initialize resources that need to be closed
    scraper = None
    uploader = None

    try:
        # 1. Crawl the website
        logger.info(f"Starting web crawl of {args.website_url} to depth {args.depth}")
        scraper = WebScraper(
            base_url=args.website_url,
            max_depth=args.depth,
            delay=args.delay,
            exclude_patterns=args.exclude or []
        )
        scraper.crawl()
        crawled_pages = scraper.get_pages()

        logger.info(f"Crawled {len(crawled_pages)} pages")

        if not crawled_pages:
            logger.error("No pages were crawled. Exiting.")
            return 1

        # 2. Convert HTML pages to Markdown
        logger.info("Converting HTML pages to Markdown")
        markdown_pages = {}
        for url, html in crawled_pages.items():
            markdown_content = convert_to_markdown(html, url)

            # Create a safe filename from the URL
            parsed_url = urlparse(url)
            filename = f"{parsed_url.netloc}{parsed_url.path}".replace('/', '_').replace('.', '_')
            if not filename.endswith('.md'):
                filename = f"{filename}.md"

            markdown_pages[filename] = markdown_content
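        # Steps 3 and 4: create the knowledge base, then upload every converted
        # page as a separate file. A failed upload is logged and counted rather
        # than aborting the whole run.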
        # 3. Upload to Open WebUI
        logger.info(f"Creating knowledge base '{args.kb_name}' in Open WebUI")
        uploader = OpenWebUIUploader(args.base_url, args.token)
        kb = uploader.create_knowledge_base(args.kb_name, args.kb_purpose)
        kb_id = kb.get('id')

        if not kb_id:
            logger.error("Failed to get knowledge base ID")
            return 1

        logger.info(f"Created knowledge base with ID: {kb_id}")

        # 4. Upload each markdown page
        success_count = 0
        error_count = 0

        for filename, content in markdown_pages.items():
            try:
                logger.info(f"Uploading {filename}")
                uploader.upload_file(kb_id, content, filename)
                success_count += 1

                # Add a small delay between uploads
                time.sleep(0.5)
            except Exception as e:
                logger.error(f"Failed to upload {filename}: {e}")
                error_count += 1

        logger.info(f"Upload complete: {success_count} files uploaded successfully, {error_count} errors")
        return 0

    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return 1
    finally:
        # Ensure all resources are properly closed
        if scraper:
            scraper.close()
        if uploader:
            uploader.close()


if __name__ == "__main__":
    sys.exit(main())
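# Example invocation (hypothetical values; substitute your own token, URLs,
# knowledge base name, and script filename):
#
#   python scrape_to_openwebui.py \
#       --token sk-xxxxxxxx \
#       --base-url http://localhost:3000 \
#       --website-url https://docs.example.com \
#       --kb-name "Example Docs" \
#       --depth 2 --delay 1.0 \
#       --exclude /blog/ --exclude /changelog/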