Add comprehensive anti-scan and brute force protection
All checks were successful
HAProxy Manager Build and Push / Build-and-Push (push) Successful in 54s

Implement multi-layered security system to protect against exploit
scanning and brute force attacks while maintaining legitimate traffic flow.

Security Features:
- Attack detection for common exploit paths (WordPress, phpMyAdmin, shells)
- Malicious user agent filtering (sqlmap, nikto, metasploit, etc.)
- SQL injection and directory traversal pattern detection
- Progressive rate limiting (50 req/10s, 20 conn/10s, 10 err/10s)
- Three-tier response: tarpit → deny → repeat offender blocking
- Strict authentication endpoint protection (5 req/10s limit)
- Real IP detection through proxy headers (Cloudflare, X-Real-IP)

Management Tools:
- manage-blocked-ips.sh: Dynamic IP blocking/unblocking
- monitor-attacks.sh: Real-time threat monitoring
- API endpoints for security stats and temporary blocking
- Auto-expiring temporary blocks with cleanup endpoint

HAProxy 2.6 Compatibility:
- Removed silent-drop (not available in 2.6)
- Fixed stick table counter syntax
- Using standard tarpit and deny actions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-09-22 16:50:35 -07:00
parent 002e79b565
commit e2f350ce95
4 changed files with 388 additions and 12 deletions

View File

@@ -8,8 +8,9 @@ import socket
import psutil
import functools
import logging
from datetime import datetime
from datetime import datetime, timedelta
import json
import ipaddress
import shutil
import tempfile
@@ -119,6 +120,14 @@ def init_db():
''')
conn.commit()
def validate_ip_address(ip_string):
"""Validate if a string is a valid IP address"""
try:
ipaddress.ip_address(ip_string)
return True
except ValueError:
return False
def certbot_register():
"""Register with Let's Encrypt using the certbot client and agree to the terms of service"""
result = subprocess.run(['certbot', 'show_account'], capture_output=True)
@@ -894,6 +903,166 @@ def sync_blocked_ips():
log_operation('sync_blocked_ips', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/security/stats', methods=['GET'])
@require_api_key
def get_security_stats():
"""Get current security statistics from HAProxy stick table"""
try:
if os.path.exists(HAPROXY_SOCKET_PATH):
socket_path = HAPROXY_SOCKET_PATH
else:
socket_path = '/tmp/haproxy-cli'
# Get stick table data
cmd = f'echo "show table web" | socat stdio {socket_path}'
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
return jsonify({'status': 'error', 'message': 'Failed to get stick table data'}), 500
# Parse stick table output
lines = result.stdout.strip().split('\n')
threats = []
for line in lines[1:]: # Skip header
parts = line.split()
if len(parts) >= 8:
ip = parts[0]
try:
gpc0 = int(parts[3]) if len(parts) > 3 else 0
gpc1 = int(parts[4]) if len(parts) > 4 else 0
req_rate = int(parts[5]) if len(parts) > 5 else 0
err_rate = int(parts[6]) if len(parts) > 6 else 0
conn_rate = int(parts[7]) if len(parts) > 7 else 0
# Only include IPs with significant activity
if gpc0 > 0 or gpc1 > 0 or req_rate > 30 or err_rate > 5 or conn_rate > 10:
threat_level = 'low'
if gpc1 > 2:
threat_level = 'critical'
elif gpc0 > 0 or err_rate > 10:
threat_level = 'high'
elif req_rate > 40 or conn_rate > 15:
threat_level = 'medium'
threats.append({
'ip': ip,
'blocked': gpc0 > 0,
'repeat_offender': gpc1 > 2,
'offense_count': gpc1,
'request_rate': req_rate,
'error_rate': err_rate,
'connection_rate': conn_rate,
'threat_level': threat_level
})
except (ValueError, IndexError):
continue
# Sort by threat level
threats.sort(key=lambda x: (x['offense_count'], x['error_rate'], x['request_rate']), reverse=True)
return jsonify({
'status': 'success',
'total_tracked_ips': len(lines) - 1,
'active_threats': len(threats),
'threats': threats[:50] # Limit to top 50
})
except Exception as e:
log_operation('get_security_stats', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/security/temporary-block', methods=['POST'])
@require_api_key
def temporary_block():
"""Temporarily block an IP address (auto-unblocks after specified time)"""
data = request.get_json()
ip_address = data.get('ip_address')
duration_minutes = data.get('duration_minutes', 60) # Default 1 hour
if not ip_address:
return jsonify({'status': 'error', 'message': 'IP address is required'}), 400
if not validate_ip_address(ip_address):
return jsonify({'status': 'error', 'message': 'Invalid IP address format'}), 400
try:
# Add to blocked IPs with expiration time
expiry_time = datetime.now() + timedelta(minutes=duration_minutes)
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
# Check if table has expiry column, add if not
cursor.execute("PRAGMA table_info(blocked_ips)")
columns = [column[1] for column in cursor.fetchall()]
if 'expiry_time' not in columns:
cursor.execute('ALTER TABLE blocked_ips ADD COLUMN expiry_time TEXT')
# Add or update the blocked IP with expiry
cursor.execute('''
INSERT OR REPLACE INTO blocked_ips (ip_address, reason, expiry_time)
VALUES (?, ?, ?)
''', (ip_address, f'Temporary block for {duration_minutes} minutes', expiry_time.isoformat()))
# Update map file and add to runtime
if not update_blocked_ips_map():
return jsonify({'status': 'error', 'message': 'Failed to update map file'}), 500
add_ip_to_runtime_map(ip_address)
log_operation('temporary_block', True, f'Temporarily blocked {ip_address} for {duration_minutes} minutes')
return jsonify({
'status': 'success',
'message': f'IP {ip_address} temporarily blocked for {duration_minutes} minutes',
'expires_at': expiry_time.isoformat()
})
except Exception as e:
log_operation('temporary_block', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/security/clear-expired', methods=['POST'])
@require_api_key
def clear_expired_blocks():
"""Remove expired temporary IP blocks"""
try:
current_time = datetime.now()
expired_ips = []
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
# Check if expiry_time column exists
cursor.execute("PRAGMA table_info(blocked_ips)")
columns = [column[1] for column in cursor.fetchall()]
if 'expiry_time' in columns:
# Find and remove expired blocks
cursor.execute('''
SELECT ip_address FROM blocked_ips
WHERE expiry_time IS NOT NULL AND expiry_time < ?
''', (current_time.isoformat(),))
expired_ips = [row[0] for row in cursor.fetchall()]
# Remove expired IPs
for ip in expired_ips:
cursor.execute('DELETE FROM blocked_ips WHERE ip_address = ?', (ip,))
remove_ip_from_runtime_map(ip)
# Update map file if any IPs were removed
if expired_ips:
update_blocked_ips_map()
log_operation('clear_expired_blocks', True, f'Cleared {len(expired_ips)} expired IP blocks')
return jsonify({
'status': 'success',
'message': f'Cleared {len(expired_ips)} expired IP blocks',
'cleared_ips': expired_ips
})
except Exception as e:
log_operation('clear_expired_blocks', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
def generate_config():
try:
conn = sqlite3.connect(DB_FILE)