Add /api/certificates/request endpoint for programmatic certificate requests, update docs and add test script
All checks were successful
HAProxy Manager Build and Push / Build-and-Push (push) Successful in 37s

This commit is contained in:
2025-07-11 17:14:01 -07:00
parent 7b0b4c0476
commit ef488a253d
4 changed files with 394 additions and 4 deletions

View File

@@ -289,6 +289,7 @@ def index():
@app.route('/api/ssl', methods=['POST'])
@require_api_key
def request_ssl():
"""Legacy endpoint for requesting SSL certificate for a single domain"""
data = request.get_json()
domain = data.get('domain')
@@ -310,6 +311,9 @@ def request_ssl():
key_path = f'/etc/letsencrypt/live/{domain}/privkey.pem'
combined_path = f'{SSL_CERTS_DIR}/{domain}.pem'
# Ensure SSL certs directory exists
os.makedirs(SSL_CERTS_DIR, exist_ok=True)
with open(combined_path, 'w') as combined:
subprocess.run(['cat', cert_path, key_path], stdout=combined)
@@ -326,7 +330,12 @@ def request_ssl():
conn.close()
generate_config()
log_operation('request_ssl', True, f'SSL certificate obtained for {domain}')
return jsonify({'status': 'success'})
return jsonify({
'status': 'success',
'domain': domain,
'cert_path': combined_path,
'message': 'Certificate obtained successfully'
})
else:
error_msg = f'Failed to obtain SSL certificate: {result.stderr}'
log_operation('request_ssl', False, error_msg)
@@ -491,6 +500,137 @@ def get_certificate_status():
log_operation('get_certificate_status', False, str(e))
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/certificates/request', methods=['POST'])
@require_api_key
def request_certificates():
"""Request certificate generation for one or more domains"""
data = request.get_json()
domains = data.get('domains', [])
force_renewal = data.get('force_renewal', False)
include_www = data.get('include_www', True)
if not domains:
log_operation('request_certificates', False, 'No domains provided')
return jsonify({'status': 'error', 'message': 'At least one domain is required'}), 400
if not isinstance(domains, list):
domains = [domains] # Convert single domain to list
results = []
success_count = 0
error_count = 0
for domain in domains:
try:
# Prepare domain list for certbot (include www subdomain if requested)
certbot_domains = [domain]
if include_www and not domain.startswith('www.'):
certbot_domains.append(f'www.{domain}')
# Build certbot command
cmd = [
'certbot', 'certonly', '-n', '--standalone',
'--preferred-challenges', 'http', '--http-01-port=8688'
]
if force_renewal:
cmd.append('--force-renewal')
# Add domains
for d in certbot_domains:
cmd.extend(['-d', d])
# Request certificate
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
# Combine cert files and copy to HAProxy certs directory
cert_path = f'/etc/letsencrypt/live/{domain}/fullchain.pem'
key_path = f'/etc/letsencrypt/live/{domain}/privkey.pem'
combined_path = f'{SSL_CERTS_DIR}/{domain}.pem'
# Ensure SSL certs directory exists
os.makedirs(SSL_CERTS_DIR, exist_ok=True)
with open(combined_path, 'w') as combined:
subprocess.run(['cat', cert_path, key_path], stdout=combined)
# Update database (add domain if it doesn't exist)
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
# Check if domain exists
cursor.execute('SELECT id FROM domains WHERE domain = ?', (domain,))
domain_exists = cursor.fetchone()
if domain_exists:
# Update existing domain
cursor.execute('''
UPDATE domains
SET ssl_enabled = 1, ssl_cert_path = ?
WHERE domain = ?
''', (combined_path, domain))
else:
# Add new domain with SSL enabled
cursor.execute('''
INSERT INTO domains (domain, ssl_enabled, ssl_cert_path)
VALUES (?, 1, ?)
''', (domain, combined_path))
results.append({
'domain': domain,
'status': 'success',
'message': 'Certificate obtained successfully',
'cert_path': combined_path,
'domains_covered': certbot_domains
})
success_count += 1
else:
error_msg = f'Failed to obtain certificate for {domain}: {result.stderr}'
results.append({
'domain': domain,
'status': 'error',
'message': error_msg,
'stderr': result.stderr
})
error_count += 1
except Exception as e:
error_msg = f'Exception while processing {domain}: {str(e)}'
results.append({
'domain': domain,
'status': 'error',
'message': error_msg
})
error_count += 1
# Regenerate HAProxy config if any certificates were successful
if success_count > 0:
try:
generate_config()
log_operation('request_certificates', True, f'Successfully obtained {success_count} certificates, {error_count} failed')
except Exception as e:
log_operation('request_certificates', False, f'Certificates obtained but config generation failed: {str(e)}')
# Return results
response = {
'status': 'completed',
'summary': {
'total': len(domains),
'successful': success_count,
'failed': error_count
},
'results': results
}
if error_count == 0:
return jsonify(response), 200
elif success_count > 0:
return jsonify(response), 207 # Multi-status (some succeeded, some failed)
else:
return jsonify(response), 500 # All failed
@app.route('/api/domain', methods=['DELETE'])
@require_api_key
def remove_domain():