164 lines
5.6 KiB
Python
164 lines
5.6 KiB
Python
import sys
|
|
import subprocess
|
|
import logging
|
|
import urllib3
|
|
import os
|
|
from typing import Optional, Any
|
|
import asyncio
|
|
import contextlib
|
|
|
|
|
|
class Tools:
|
|
def __init__(self) -> None:
|
|
pass
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Try to import hvac, install if not present
|
|
try:
|
|
import hvac
|
|
except ImportError:
|
|
logger.info("hvac package not found. Attempting to install...")
|
|
try:
|
|
subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
|
|
import hvac
|
|
|
|
logger.info("hvac package installed successfully")
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error(f"Failed to install hvac package: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
import argparse
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
@staticmethod
|
|
def get_vault_secret(
|
|
token: str,
|
|
path: str,
|
|
vault_addr: str = "http://127.0.0.1:8200",
|
|
verify: bool = True,
|
|
) -> Optional[Any]:
|
|
try:
|
|
# Initialize the Vault client
|
|
client = hvac.Client(url=vault_addr, token=token, verify=verify)
|
|
with contextlib.closing(client):
|
|
|
|
# Check if client is authenticated
|
|
if not client.is_authenticated():
|
|
print("Failed to authenticate with Vault")
|
|
return None
|
|
|
|
# Split path to separate the key if it exists
|
|
path_parts = path.rsplit(".", 1)
|
|
secret_path = path_parts[0]
|
|
key = path_parts[1] if len(path_parts) > 1 else "value"
|
|
|
|
# Try KV v2 first
|
|
try:
|
|
# For KV v2, try with and without /data/ in the path
|
|
try:
|
|
if "/data/" not in secret_path:
|
|
v2_path = secret_path.replace("//", "/").strip("/")
|
|
mount_point = v2_path.split("/")[0]
|
|
v2_path = "/".join(v2_path.split("/")[1:])
|
|
else:
|
|
# Remove /data/ for the API call
|
|
v2_path = secret_path.replace("/data/", "/")
|
|
mount_point = v2_path.split("/")[0]
|
|
v2_path = "/".join(v2_path.split("/")[1:])
|
|
|
|
secret = client.secrets.kv.v2.read_secret_version(
|
|
path=v2_path,
|
|
mount_point=mount_point,
|
|
raise_on_deleted_version=False,
|
|
)
|
|
if secret and "data" in secret and "data" in secret["data"]:
|
|
secret_data = secret["data"]["data"]
|
|
if key in secret_data:
|
|
return secret_data[key]
|
|
logger.warning(f"Key '{key}' not found in KV v2 secret")
|
|
except Exception as e:
|
|
logger.debug(f"KV v2 attempt failed: {str(e)}")
|
|
|
|
# Try KV v1
|
|
try:
|
|
secret = client.read(secret_path)
|
|
if secret and "data" in secret:
|
|
secret_data = secret["data"]
|
|
if key in secret_data:
|
|
return secret_data[key]
|
|
logger.warning(f"Key '{key}' not found in KV v1 secret")
|
|
except Exception as e:
|
|
logger.debug(f"KV v1 attempt failed: {str(e)}")
|
|
|
|
print(f"Secret not found at path: {path}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"Error reading secret: {str(e)}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"Error connecting to Vault: {str(e)}")
|
|
return None
|
|
|
|
@staticmethod
|
|
def vault_access() -> None:
|
|
"""
|
|
Query Vault for secrets based on path and optionally a key
|
|
User needs to provide a path, optionally a key, and the Vault token if not set in the environment.
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description="Retrieve secrets from HashiCorp Vault"
|
|
)
|
|
parser.add_argument(
|
|
"--token",
|
|
default=os.environ.get("VAULT_TOKEN"),
|
|
help="Vault authentication token (defaults to VAULT_TOKEN environment variable)",
|
|
)
|
|
parser.add_argument(
|
|
"--path",
|
|
required=True,
|
|
help="Path to the secret in Vault (with optional .key)",
|
|
)
|
|
parser.add_argument(
|
|
"--vault-addr",
|
|
default="https://192.168.1.8:8200",
|
|
help="Vault server address",
|
|
)
|
|
parser.add_argument(
|
|
"--no-verify", action="store_true", help="Disable TLS verification"
|
|
)
|
|
|
|
args: argparse.Namespace = parser.parse_args()
|
|
|
|
if not args.token:
|
|
print(
|
|
"No token provided. Please set VAULT_TOKEN environment variable or use --token"
|
|
)
|
|
try:
|
|
secret: Optional[Any] = Tools.get_vault_secret(
|
|
token=args.token,
|
|
path=args.path,
|
|
vault_addr=args.vault_addr,
|
|
verify=not args.no_verify,
|
|
)
|
|
|
|
if secret is not None:
|
|
print(secret)
|
|
else:
|
|
print("Failed to retrieve secret")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
finally:
|
|
if asyncio.get_event_loop().is_running():
|
|
pending = asyncio.all_tasks(asyncio.get_event_loop())
|
|
for task in pending:
|
|
task.cancel()
|
|
|