#!/usr/bin/env python3
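r"""Scrape a website and load it into an Open WebUI knowledge base.

The script crawls a site with requests + BeautifulSoup, converts each page to
Markdown with MarkItDown, creates a knowledge base through the Open WebUI API,
and uploads every converted page to it.

Illustrative invocation (the script name, token, and URLs below are
placeholders, not values taken from any particular deployment):

    python scrape_to_openwebui.py --token YOUR_API_TOKEN \
        --base-url http://localhost:3000 \
        --website-url https://docs.example.com \
        --kb-name "Example Docs" --depth 2 --delay 1.0
"""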

import os
import sys
import argparse
import requests
import time
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from markitdown import MarkItDown
import json
import logging
import contextlib

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class WebScraper:
    def __init__(self, base_url, max_depth=2, delay=1.0, exclude_patterns=None):
        self.base_url = base_url
        self.domain = urlparse(base_url).netloc
        self.visited_urls = set()
        self.max_depth = max_depth
        self.delay = delay
        self.exclude_patterns = exclude_patterns or []
        self.pages = {}  # Dictionary to store URL: HTML content
        self.session = requests.Session()

    def should_exclude(self, url):
        """Check if URL should be excluded based on patterns."""
        for pattern in self.exclude_patterns:
            if pattern in url:
                return True
        return False

    def is_valid_url(self, url):
        """Check if the URL is valid and belongs to the same domain."""
        parsed = urlparse(url)
        return bool(parsed.netloc) and parsed.netloc == self.domain

    def get_links(self, url, html):
        """Extract all links from the HTML content."""
        soup = BeautifulSoup(html, 'html.parser')
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            # Handle relative URLs
            full_url = urljoin(url, href)
            # Filter URLs to only include those from the same domain
            if self.is_valid_url(full_url) and not self.should_exclude(full_url):
                yield full_url

    def crawl(self, url=None, depth=0):
        """Crawl the website starting from the URL up to max_depth."""
        if url is None:
            url = self.base_url

        # Stop if we've reached max depth or already visited this URL
        if depth > self.max_depth or url in self.visited_urls:
            return

        # Mark this URL as visited
        self.visited_urls.add(url)

        try:
            logger.info(f"Crawling: {url} (Depth: {depth})")
            response = self.session.get(url, timeout=10)

            if response.status_code == 200:
                # Store the HTML content
                self.pages[url] = response.text

                # Extract and follow links
                if depth < self.max_depth:
                    for link in self.get_links(url, response.text):
                        # Be nice to the server - add delay
                        time.sleep(self.delay)
                        self.crawl(link, depth + 1)
            else:
                logger.warning(f"Failed to fetch {url}: HTTP {response.status_code}")

        except Exception as e:
            logger.error(f"Error crawling {url}: {e}")

    def get_pages(self):
        """Return the dictionary of crawled pages."""
        return self.pages

    def close(self):
        """Close the requests session."""
        if hasattr(self, 'session') and self.session:
            self.session.close()


class OpenWebUIUploader:
    def __init__(self, base_url, api_token):
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Accept": "application/json"
        })

    def create_knowledge_base(self, name, purpose=None):
        """Create a new knowledge base in OpenWebUI."""
        endpoint = f"{self.base_url}/api/v1/knowledge/create"

        payload = {
            "name": name,
            "description": purpose or "Documentation"
        }

        try:
            response = self.session.post(endpoint, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error creating knowledge base: {e}")
            raise

    def upload_file(self, kb_id, content, filename):
        """Upload a file to the knowledge base."""
        upload_endpoint = f"{self.base_url}/api/v1/files/"

        # Create a temporary file for the upload
        temp_file_path = f"/tmp/{filename}"
        with open(temp_file_path, 'w') as f:
            f.write(content)

        try:
            # Use context manager for file upload request
            with open(temp_file_path, 'rb') as f:
                files = {'file': (filename, f, 'text/markdown')}
                with self.session.post(
                    upload_endpoint,
                    headers={"Authorization": f"Bearer {self.api_token}"},
                    files=files
                ) as upload_response:
                    upload_response.raise_for_status()
                    file_id = upload_response.json().get('id')

            # Add the file to the knowledge base
            add_file_endpoint = f"{self.base_url}/api/v1/knowledge/{kb_id}/file/add"
            with self.session.post(
                add_file_endpoint,
                headers={
                    "Authorization": f"Bearer {self.api_token}",
                    "Content-Type": "application/json"
                },
                json={'file_id': file_id}
            ) as add_response:
                add_response.raise_for_status()
                return add_response.json()

        except requests.exceptions.RequestException as e:
            logger.error(f"Error uploading file: {e}")
            raise
        finally:
            # Clean up the temporary file
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)

    def close(self):
        """Close the requests session."""
        if hasattr(self, 'session') and self.session:
            self.session.close()


def convert_to_markdown(html_content, url):
    """Convert HTML content to Markdown using MarkItDown."""
    try:
        md = MarkItDown()

        # Use BytesIO to provide a binary stream to convert_stream
        from io import BytesIO
        html_bytes = BytesIO(html_content.encode('utf-8'))

        # Convert the HTML to Markdown
        result = md.convert_stream(html_bytes, mime_type='text/html')

        # Add a header with the source URL
        markdown_with_header = f"# {url}\n\n{result.text_content}"
        return markdown_with_header
    except Exception as e:
        logger.error(f"Error converting to markdown: {e}")
        return f"# {url}\n\nError converting content: {str(e)}"


def main():
    parser = argparse.ArgumentParser(description='Scrape a website and create an Open WebUI knowledge base')
    parser.add_argument('--token', '-t', required=True, help='Your OpenWebUI API token')
    parser.add_argument('--base-url', '-u', required=True, help='Base URL of your OpenWebUI instance (e.g., http://localhost:3000)')
    parser.add_argument('--website-url', '-w', required=True, help='URL of the website to scrape')
    parser.add_argument('--kb-name', '-n', required=True, help='Name for the knowledge base')
    parser.add_argument('--kb-purpose', '-p', help='Purpose description for the knowledge base', default=None)
    parser.add_argument('--depth', '-d', type=int, default=2, help='Maximum depth to crawl (default: 2)')
    parser.add_argument('--delay', type=float, default=1.0, help='Delay between requests in seconds (default: 1.0)')
    parser.add_argument('--exclude', '-e', action='append', help='URL patterns to exclude from crawling (can be specified multiple times)')

    args = parser.parse_args()

    # Initialize resources that need to be closed
    scraper = None
    uploader = None

    try:
        # 1. Crawl the website
        logger.info(f"Starting web crawl of {args.website_url} to depth {args.depth}")
        scraper = WebScraper(
            base_url=args.website_url,
            max_depth=args.depth,
            delay=args.delay,
            exclude_patterns=args.exclude or []
        )
        scraper.crawl()

        crawled_pages = scraper.get_pages()
        logger.info(f"Crawled {len(crawled_pages)} pages")

        if not crawled_pages:
            logger.error("No pages were crawled. Exiting.")
            return 1

        # 2. Convert HTML pages to Markdown
        logger.info("Converting HTML pages to Markdown")
        markdown_pages = {}
        for url, html in crawled_pages.items():
            markdown_content = convert_to_markdown(html, url)
            # Create a safe filename from the URL
            parsed_url = urlparse(url)
            filename = f"{parsed_url.netloc}{parsed_url.path}".replace('/', '_').replace('.', '_')
            if not filename.endswith('.md'):
                filename = f"{filename}.md"
            markdown_pages[filename] = markdown_content

        # 3. Upload to Open WebUI
        logger.info(f"Creating knowledge base '{args.kb_name}' in Open WebUI")
        uploader = OpenWebUIUploader(args.base_url, args.token)
        kb = uploader.create_knowledge_base(args.kb_name, args.kb_purpose)

        kb_id = kb.get('id')
        if not kb_id:
            logger.error("Failed to get knowledge base ID")
            return 1

        logger.info(f"Created knowledge base with ID: {kb_id}")

        # 4. Upload each markdown page
        success_count = 0
        error_count = 0

        for filename, content in markdown_pages.items():
            try:
                logger.info(f"Uploading {filename}")
                uploader.upload_file(kb_id, content, filename)
                success_count += 1
                # Add a small delay between uploads
                time.sleep(0.5)
            except Exception as e:
                logger.error(f"Failed to upload {filename}: {e}")
                error_count += 1

        logger.info(f"Upload complete: {success_count} files uploaded successfully, {error_count} errors")

        return 0

    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return 1
    finally:
        # Ensure all resources are properly closed
        if scraper:
            scraper.close()
        if uploader:
            uploader.close()

        # Note: requests provides no global "close all sessions" helper; the
        # two sessions used by this script are already closed explicitly above.


if __name__ == "__main__":
    sys.exit(main())