Add IP blocking functionality to HAProxy Manager
All checks were successful
HAProxy Manager Build and Push / Build-and-Push (push) Successful in 1m1s

- Add blocked_ips database table to store blocked IP addresses
- Implement API endpoints for IP blocking management:
  - GET /api/blocked-ips: List all blocked IPs
  - POST /api/blocked-ips: Block an IP address
  - DELETE /api/blocked-ips: Unblock an IP address
- Update HAProxy configuration generation to include blocked IP ACLs
- Create blocked IP page template for denied access
- Add comprehensive API documentation for WHP integration
- Include test script for IP blocking functionality
- Update .gitignore with Python patterns
- Add CLAUDE.md for codebase documentation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-08-21 18:32:47 -07:00
parent a7ce40f600
commit ca37a68255
7 changed files with 954 additions and 1 deletions

View File

@@ -100,6 +100,17 @@ def init_db():
FOREIGN KEY (backend_id) REFERENCES backends (id)
)
''')
# Create blocked_ips table
cursor.execute('''
CREATE TABLE IF NOT EXISTS blocked_ips (
id INTEGER PRIMARY KEY,
ip_address TEXT UNIQUE NOT NULL,
reason TEXT,
blocked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
blocked_by TEXT
)
''')
conn.commit()
def certbot_register():
@@ -699,6 +710,86 @@ def remove_domain():
log_operation('remove_domain', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/blocked-ips', methods=['GET'])
@require_api_key
def get_blocked_ips():
"""Get all blocked IP addresses"""
try:
with sqlite3.connect(DB_FILE) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('SELECT * FROM blocked_ips ORDER BY blocked_at DESC')
blocked_ips = [dict(row) for row in cursor.fetchall()]
log_operation('get_blocked_ips', True)
return jsonify(blocked_ips)
except Exception as e:
log_operation('get_blocked_ips', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/blocked-ips', methods=['POST'])
@require_api_key
def add_blocked_ip():
"""Add an IP address to the blocked list"""
data = request.get_json()
ip_address = data.get('ip_address')
reason = data.get('reason', 'No reason provided')
blocked_by = data.get('blocked_by', 'API')
if not ip_address:
log_operation('add_blocked_ip', False, 'IP address is required')
return jsonify({'status': 'error', 'message': 'IP address is required'}), 400
try:
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute('INSERT INTO blocked_ips (ip_address, reason, blocked_by) VALUES (?, ?, ?)',
(ip_address, reason, blocked_by))
blocked_ip_id = cursor.lastrowid
# Regenerate HAProxy config to apply the block
generate_config()
log_operation('add_blocked_ip', True, f'IP {ip_address} blocked successfully')
return jsonify({'status': 'success', 'blocked_ip_id': blocked_ip_id, 'message': f'IP {ip_address} has been blocked'})
except sqlite3.IntegrityError:
log_operation('add_blocked_ip', False, f'IP {ip_address} is already blocked')
return jsonify({'status': 'error', 'message': 'IP address is already blocked'}), 409
except Exception as e:
log_operation('add_blocked_ip', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/blocked-ips', methods=['DELETE'])
@require_api_key
def remove_blocked_ip():
"""Remove an IP address from the blocked list"""
data = request.get_json()
ip_address = data.get('ip_address')
if not ip_address:
log_operation('remove_blocked_ip', False, 'IP address is required')
return jsonify({'status': 'error', 'message': 'IP address is required'}), 400
try:
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute('SELECT id FROM blocked_ips WHERE ip_address = ?', (ip_address,))
ip_result = cursor.fetchone()
if not ip_result:
log_operation('remove_blocked_ip', False, f'IP {ip_address} not found in blocked list')
return jsonify({'status': 'error', 'message': 'IP address not found in blocked list'}), 404
cursor.execute('DELETE FROM blocked_ips WHERE ip_address = ?', (ip_address,))
# Regenerate HAProxy config to remove the block
generate_config()
log_operation('remove_blocked_ip', True, f'IP {ip_address} unblocked successfully')
return jsonify({'status': 'success', 'message': f'IP {ip_address} has been unblocked'})
except Exception as e:
log_operation('remove_blocked_ip', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
def generate_config():
try:
conn = sqlite3.connect(DB_FILE)
@@ -722,6 +813,11 @@ def generate_config():
# Fetch and immediately convert to list of dicts to avoid any cursor issues
domains = [dict(domain) for domain in cursor.fetchall()]
# Get blocked IPs
cursor.execute('SELECT ip_address FROM blocked_ips')
blocked_ips = [row[0] for row in cursor.fetchall()]
config_parts = []
# Add Haproxy Default Headers
@@ -730,7 +826,8 @@ def generate_config():
# Add Listener Block
listener_block = template_env.get_template('hap_listener.tpl').render(
crt_path = SSL_CERTS_DIR
crt_path = SSL_CERTS_DIR,
blocked_ips = blocked_ips
)
config_parts.append(listener_block)
@@ -973,6 +1070,11 @@ if __name__ == '__main__':
secondary_message=os.environ.get('HAPROXY_DEFAULT_SECONDARY_MESSAGE', 'If you believe this is an error, please check the domain name and try again.')
)
@default_app.route('/blocked-ip')
def blocked_ip_page():
"""Serve the blocked IP page for blocked clients"""
return render_template('blocked_ip_page.html')
default_app.run(host='0.0.0.0', port=8080)
# Start the default page server in a separate thread