"""Retrieve secrets from HashiCorp Vault, trying KV v2 first and then KV v1."""

import argparse
import asyncio
import contextlib
import logging
import os
import subprocess
import sys
from typing import Any, Iterator, Optional, Tuple

import urllib3

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Try to import hvac, install if not present.  This has to happen at module
# level so that ``hvac`` is a global the static methods below can resolve.
try:
    import hvac
except ImportError:
    logger.info("hvac package not found. Attempting to install...")
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
        import hvac

        logger.info("hvac package installed successfully")
    except subprocess.CalledProcessError as e:
        logger.error("Failed to install hvac package: %s", e)
        sys.exit(1)

# Silences the warning urllib3 emits for unverified HTTPS requests
# (relevant when the caller passes verify=False / --no-verify).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def _split_path_and_key(path: str) -> Tuple[str, str]:
    """Split ``"mount/some/secret.key"`` into ``("mount/some/secret", "key")``.

    Only a dot in the *last* path segment separates the key, so dots inside
    earlier segments (``"secret/my.app/config"``) stay part of the path.
    When no key is given, the key defaults to ``"value"``.
    """
    head, slash, last_segment = path.rpartition("/")
    name, dot, key = last_segment.rpartition(".")
    if not dot:
        return path, "value"
    return f"{head}{slash}{name}", key


def _split_mount_and_path(secret_path: str) -> Tuple[str, str]:
    """Split a secret path into ``(mount_point, path_below_mount)`` for KV v2.

    The first non-empty segment is taken as the mount point.  A ``data``
    segment directly after the mount point (the KV v2 HTTP API form
    ``mount/data/secret``) is dropped; ``data`` segments anywhere else are
    kept because they are part of the secret's own path.
    """
    segments = [segment for segment in secret_path.split("/") if segment]
    if not segments:
        return "", ""
    mount_point, remainder = segments[0], segments[1:]
    # len > 1 keeps "mount/data" meaning "the secret named data".
    if len(remainder) > 1 and remainder[0] == "data":
        remainder = remainder[1:]
    return mount_point, "/".join(remainder)


@contextlib.contextmanager
def _closing_client(client: Any) -> Iterator[Any]:
    """Yield *client* and release its HTTP resources afterwards, best effort.

    NOTE(review): hvac appears to expose ``close()`` on the adapter rather
    than on the client itself - confirm against the installed version.  Both
    are looked up here, and a failing close is only logged, so cleanup can
    never replace the result of a successful lookup.
    """
    try:
        yield client
    finally:
        adapter = getattr(client, "adapter", None)
        close = getattr(adapter, "close", None) or getattr(client, "close", None)
        if callable(close):
            try:
                close()
            except Exception as exc:  # cleanup must not mask the real outcome
                logger.debug("Ignoring error while closing Vault client: %s", exc)


def _cancel_running_loop_tasks() -> None:
    """Cancel all tasks of the event loop running in this thread, if any.

    Uses ``asyncio.get_running_loop()`` so that nothing happens (and nothing
    is raised or created) when no loop is running.

    NOTE(review): when a loop *is* running this cancels every task on it,
    not only tasks related to the Vault lookup - confirm this is intended.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return
    for task in asyncio.all_tasks(loop):
        task.cancel()


class Tools:
    """Helpers for reading a single value out of a Vault KV secret."""

    def __init__(self) -> None:
        pass

    @staticmethod
    def get_vault_secret(
        token: str,
        path: str,
        vault_addr: str = "http://127.0.0.1:8200",
        verify: bool = True,
    ) -> Optional[Any]:
        """Read one key of a secret from Vault.

        The secret is looked up as KV v2 first (mount point = first path
        segment) and, if that yields nothing, via a plain read of the full
        path (KV v1).

        Args:
            token: Vault authentication token.
            path: Secret path, optionally followed by ``.key`` on the last
                segment; the key defaults to ``"value"``.
            vault_addr: Base URL of the Vault server.
            verify: Passed to the hvac client as its TLS ``verify`` setting.

        Returns:
            The value stored under the key, or ``None`` if authentication
            fails, the secret or key is not found, or any error occurs.
            A stored value of ``None`` is therefore indistinguishable from
            "not found".
        """
        try:
            # Initialize the Vault client
            client = hvac.Client(url=vault_addr, token=token, verify=verify)
            with _closing_client(client):
                if not client.is_authenticated():
                    print("Failed to authenticate with Vault")
                    return None

                secret_path, key = _split_path_and_key(path)

                # Try KV v2 first
                mount_point, v2_path = _split_mount_and_path(secret_path)
                try:
                    secret = client.secrets.kv.v2.read_secret_version(
                        path=v2_path,
                        mount_point=mount_point,
                        raise_on_deleted_version=False,
                    )
                    if secret and "data" in secret and "data" in secret["data"]:
                        secret_data = secret["data"]["data"]
                        if key in secret_data:
                            return secret_data[key]
                        logger.warning("Key '%s' not found in KV v2 secret", key)
                except Exception as e:
                    # Expected when the mount is not KV v2; fall through to v1.
                    logger.debug("KV v2 attempt failed: %s", e)

                # Try KV v1
                try:
                    secret = client.read(secret_path)
                    if secret and "data" in secret:
                        secret_data = secret["data"]
                        if key in secret_data:
                            return secret_data[key]
                        logger.warning("Key '%s' not found in KV v1 secret", key)
                except Exception as e:
                    logger.debug("KV v1 attempt failed: %s", e)

                print(f"Secret not found at path: {path}")
                return None
        except Exception as e:
            print(f"Error connecting to Vault: {str(e)}")
            return None

    @staticmethod
    def vault_access() -> None:
        """
        Query Vault for secrets based on path and optionally a key

        User needs to provide a path, optionally a key, and the Vault token
        if not set in the environment.  Arguments are read from the command
        line; the secret (or a failure message) is printed.  Without a token
        the function reports that and returns without contacting Vault.
        """
        parser = argparse.ArgumentParser(
            description="Retrieve secrets from HashiCorp Vault"
        )
        parser.add_argument(
            "--token",
            default=os.environ.get("VAULT_TOKEN"),
            help="Vault authentication token (defaults to VAULT_TOKEN environment variable)",
        )
        parser.add_argument(
            "--path",
            required=True,
            help="Path to the secret in Vault (with optional .key)",
        )
        parser.add_argument(
            "--vault-addr",
            default="https://192.168.1.8:8200",
            help="Vault server address",
        )
        parser.add_argument(
            "--no-verify", action="store_true", help="Disable TLS verification"
        )
        args: argparse.Namespace = parser.parse_args()

        try:
            if not args.token:
                print(
                    "No token provided. Please set VAULT_TOKEN environment variable or use --token"
                )
                # Without a token the lookup cannot succeed; stop here.
                return

            secret: Optional[Any] = Tools.get_vault_secret(
                token=args.token,
                path=args.path,
                vault_addr=args.vault_addr,
                verify=not args.no_verify,
            )
            if secret is not None:
                print(secret)
            else:
                print("Failed to retrieve secret")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            _cancel_running_loop_tasks()