From 24d07b1149f39b84e64ca391605a201fa1d6297d Mon Sep 17 00:00:00 2001 From: Josh Knapp Date: Thu, 12 Dec 2024 08:02:39 -0800 Subject: [PATCH] adding files to be merged into main --- vault_reader.py | 123 +++++++++++++++++++++++++++++++++++++++++++ vault_tool.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+) create mode 100644 vault_reader.py create mode 100644 vault_tool.py diff --git a/vault_reader.py b/vault_reader.py new file mode 100644 index 0000000..b13831e --- /dev/null +++ b/vault_reader.py @@ -0,0 +1,123 @@ +import sys +import subprocess +import logging +import urllib3 +import os + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Try to import hvac, install if not present +try: + import hvac +except ImportError: + logger.info("hvac package not found. Attempting to install...") + try: + subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"]) + import hvac + logger.info("hvac package installed successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to install hvac package: {str(e)}") + sys.exit(1) + +import argparse + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +def get_vault_secret(token, path, vault_addr='http://127.0.0.1:8200', verify=True): + try: + # Initialize the Vault client + client = hvac.Client( + url=vault_addr, + token=token, + verify=verify + ) + + # Check if client is authenticated + if not client.is_authenticated(): + logger.error("Failed to authenticate with Vault") + return None + + # Split path to separate the key if it exists + path_parts = path.rsplit('.', 1) + secret_path = path_parts[0] + key = path_parts[1] if len(path_parts) > 1 else 'value' + + # Try KV v2 first + try: + # For KV v2, try with and without /data/ in the path + try: + if '/data/' not in secret_path: + v2_path = secret_path.replace('//', '/').strip('/') + mount_point = v2_path.split('/')[0] + v2_path = '/'.join(v2_path.split('/')[1:]) + else: + # Remove /data/ for the API call + v2_path = secret_path.replace('/data/', '/') + mount_point = v2_path.split('/')[0] + v2_path = '/'.join(v2_path.split('/')[1:]) + + secret = client.secrets.kv.v2.read_secret_version( + path=v2_path, + mount_point=mount_point, + raise_on_deleted_version=False, + ) + if secret and 'data' in secret and 'data' in secret['data']: + secret_data = secret['data']['data'] + if key in secret_data: + return secret_data[key] + logger.warning(f"Key '{key}' not found in KV v2 secret") + except Exception as e: + logger.debug(f"KV v2 attempt failed: {str(e)}") + + # Try KV v1 + try: + secret = client.read(secret_path) + if secret and 'data' in secret: + secret_data = secret['data'] + if key in secret_data: + return secret_data[key] + logger.warning(f"Key '{key}' not found in KV v1 secret") + except Exception as e: + logger.debug(f"KV v1 attempt failed: {str(e)}") + + logger.error(f"Secret not found at path: {path}") + return None + + except Exception as e: + logger.error(f"Error reading secret: {str(e)}") + return None + + except Exception as e: + logger.error(f"Error connecting to Vault: {str(e)}") + return None + +def main(): + parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault') + parser.add_argument('--token', default=os.environ.get('VAULT_TOKEN'), + help='Vault authentication token (defaults to VAULT_TOKEN environment variable)') + parser.add_argument('--path', required=True, help='Path to the secret in Vault (with optional .key)') + parser.add_argument('--vault-addr', default='http://127.0.0.1:8200', help='Vault server address') + parser.add_argument('--no-verify', action='store_true', help='Disable TLS verification') + + args = parser.parse_args() + + if not args.token: + logger.error("No token provided. Please set VAULT_TOKEN environment variable or use --token") + sys.exit(1) + + secret = get_vault_secret( + token=args.token, + path=args.path, + vault_addr=args.vault_addr, + verify=not args.no_verify + ) + + if secret is not None: + print(secret) + else: + logger.error("Failed to retrieve secret") + +if __name__ == '__main__': + main() diff --git a/vault_tool.py b/vault_tool.py new file mode 100644 index 0000000..d11b5f8 --- /dev/null +++ b/vault_tool.py @@ -0,0 +1,136 @@ +import sys +import subprocess +import logging +import urllib3 +import os + + +class Tools: + def __init__(self): + pass + + # Add your custom tools using pure Python code here, make sure to add type hints + # Use Sphinx-style docstrings to document your tools, they will be used for generating tools specifications + # Please refer to function_calling_filter_pipeline.py file from pipelines project for an example + + # Configure logging + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + logger = logging.getLogger(__name__) + + # Try to import hvac, install if not present + try: + import hvac + except ImportError: + logger.info("hvac package not found. Attempting to install...") + try: + subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"]) + import hvac + logger.info("hvac package installed successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to install hvac package: {str(e)}") + sys.exit(1) + + import argparse + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + def get_vault_secret(token, path, vault_addr='http://127.0.0.1:8200', verify=True): + try: + # Initialize the Vault client + client = hvac.Client( + url=vault_addr, + token=token, + verify=verify + ) + + # Check if client is authenticated + if not client.is_authenticated(): + logger.error("Failed to authenticate with Vault") + return None + + # Split path to separate the key if it exists + path_parts = path.rsplit('.', 1) + secret_path = path_parts[0] + key = path_parts[1] if len(path_parts) > 1 else 'value' + + # Try KV v2 first + try: + # For KV v2, try with and without /data/ in the path + try: + if '/data/' not in secret_path: + v2_path = secret_path.replace('//', '/').strip('/') + mount_point = v2_path.split('/')[0] + v2_path = '/'.join(v2_path.split('/')[1:]) + else: + # Remove /data/ for the API call + v2_path = secret_path.replace('/data/', '/') + mount_point = v2_path.split('/')[0] + v2_path = '/'.join(v2_path.split('/')[1:]) + + secret = client.secrets.kv.v2.read_secret_version( + path=v2_path, + mount_point=mount_point, + raise_on_deleted_version=False, + ) + if secret and 'data' in secret and 'data' in secret['data']: + secret_data = secret['data']['data'] + if key in secret_data: + return secret_data[key] + logger.warning(f"Key '{key}' not found in KV v2 secret") + except Exception as e: + logger.debug(f"KV v2 attempt failed: {str(e)}") + + # Try KV v1 + try: + secret = client.read(secret_path) + if secret and 'data' in secret: + secret_data = secret['data'] + if key in secret_data: + return secret_data[key] + logger.warning(f"Key '{key}' not found in KV v1 secret") + except Exception as e: + logger.debug(f"KV v1 attempt failed: {str(e)}") + + print(f"Secret not found at path: {path}") + return None + + except Exception as e: + print(f"Error reading secret: {str(e)}") + return None + + except Exception as e: + print(f"Error connecting to Vault: {str(e)}") + return None + + def vault_access(): + """ + Query Vault for secrets based on path and optionally a key + User needs to provide a path, optionally a key, and the Vault token if not set in the environment. + """ + parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault') + parser.add_argument('--token', default=os.environ.get('VAULT_TOKEN'), + help='Vault authentication token (defaults to VAULT_TOKEN environment variable)') + parser.add_argument('--path', required=True, help='Path to the secret in Vault (with optional .key)') + parser.add_argument('--vault-addr', default='https://192.168.1.8:8200', help='Vault server address') + parser.add_argument('--no-verify', action='store_true', help='Disable TLS verification') + + args = parser.parse_args() + + if not args.token: + print("No token provided. Please set VAULT_TOKEN environment variable or use --token") + sys.exit(1) + + secret = Tools.get_vault_secret( + token=args.token, + path=args.path, + vault_addr=args.vault_addr, + verify=not args.no_verify + ) + + if secret is not None: + print(secret) + else: + print("Failed to retrieve secret") + + # if __name__ == '__main__': + # main()