import argparse
import importlib
import logging
import os
import subprocess
import sys
from typing import Any, Optional, Tuple

import urllib3

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Try to import hvac, install if not present
try:
    import hvac
except ImportError:
    logger.info("hvac package not found. Attempting to install...")
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
        # A package installed while the interpreter is running is only
        # reliably importable once the import finders drop their caches.
        importlib.invalidate_caches()
        import hvac
        logger.info("hvac package installed successfully")
    except (subprocess.CalledProcessError, ImportError) as e:
        # ImportError covers the case where pip succeeded but the package
        # still cannot be imported from this interpreter.
        logger.error("Failed to install hvac package: %s", e)
        sys.exit(1)

# Silence the per-request warning urllib3 emits when TLS verification is off.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def _split_secret_path(path: str) -> Tuple[str, str]:
    """Split ``mount/some/secret.key`` into ``(secret_path, key)``.

    The key is whatever follows the last ``.`` in the final path segment.
    When there is no such dot the whole string is the secret path and the
    key defaults to ``'value'``. A dot that belongs to an earlier segment
    (``secret/db.example.com/creds``) is not treated as a key separator.
    """
    head, dot, tail = path.rpartition('.')
    if not dot or '/' in tail:
        return path, 'value'
    return head, tail


def _kv2_location(secret_path: str) -> Tuple[str, str]:
    """Return ``(mount_point, path)`` for a KV v2 read of *secret_path*.

    The first path segment is taken as the mount point. Empty segments
    (leading, trailing or doubled slashes) are ignored, and a ``data``
    segment directly after the mount point is dropped so that both
    ``secret/app/config`` and ``secret/data/app/config`` address the same
    secret.
    """
    segments = [segment for segment in secret_path.split('/') if segment]
    if not segments:
        return '', ''
    mount_point, rest = segments[0], segments[1:]
    if rest and rest[0] == 'data':
        rest = rest[1:]
    return mount_point, '/'.join(rest)


class Tools:
    def __init__(self):
        pass

    # Add your custom tools using pure Python code here, make sure to add type hints
    # Use Sphinx-style docstrings to document your tools, they will be used for generating tools specifications
    # Please refer to function_calling_filter_pipeline.py file from pipelines project for an example

    @staticmethod
    def get_vault_secret(
        token: str,
        path: str,
        vault_addr: str = 'http://127.0.0.1:8200',
        verify: bool = True,
    ) -> Optional[Any]:
        """
        Read one value from a HashiCorp Vault KV secret.

        The secret is looked up as KV v2 first and, if that yields nothing,
        as KV v1. Failures are logged and reported as ``None``; this function
        does not raise.

        :param token: Vault authentication token.
        :param path: Path to the secret, starting with the mount point, with an
            optional ``.key`` suffix selecting the field to return
            (``secret/app/config.password``). Without a suffix the field
            named ``value`` is returned.
        :param vault_addr: Base URL of the Vault server.
        :param verify: Passed to the hvac client to control TLS verification.
        :return: The stored value for the requested key, or ``None`` when
            authentication fails, the secret or key does not exist, or any
            error occurs.
        """
        try:
            # Initialize the Vault client
            client = hvac.Client(url=vault_addr, token=token, verify=verify)

            # Check if client is authenticated
            if not client.is_authenticated():
                logger.error("Failed to authenticate with Vault")
                return None
        except Exception as e:
            logger.error("Error connecting to Vault: %s", e)
            return None

        try:
            # Split path to separate the key if it exists
            secret_path, key = _split_secret_path(path)
        except Exception as e:
            # e.g. a non-string path; keep the "never raises" contract.
            logger.error("Error reading secret: %s", e)
            return None

        # Try KV v2 first
        try:
            mount_point, v2_path = _kv2_location(secret_path)
            secret = client.secrets.kv.v2.read_secret_version(
                path=v2_path,
                mount_point=mount_point,
                raise_on_deleted_version=False,
            )
            if secret and 'data' in secret and 'data' in secret['data']:
                # The inner payload can be None (e.g. a deleted version);
                # treat that as "no keys" rather than failing the lookup.
                secret_data = secret['data']['data'] or {}
                if key in secret_data:
                    return secret_data[key]
                logger.warning("Key '%s' not found in KV v2 secret", key)
        except Exception as e:
            # Best effort: any KV v2 failure falls through to the KV v1 read.
            logger.debug("KV v2 attempt failed: %s", e)

        # Try KV v1
        try:
            secret = client.read(secret_path)
            if secret and 'data' in secret:
                secret_data = secret['data'] or {}
                if key in secret_data:
                    return secret_data[key]
                logger.warning("Key '%s' not found in KV v1 secret", key)
        except Exception as e:
            logger.debug("KV v1 attempt failed: %s", e)

        logger.error("Secret not found at path: %s", path)
        return None

    @staticmethod
    def vault_access() -> None:
        """
        Query Vault for secrets based on path and optionally a key

        User needs to provide a path, optionally a key, and the Vault token if
        not set in the environment. Arguments are read from the command line
        (``sys.argv``): ``--path`` (required), ``--token``, ``--vault-addr``
        and ``--no-verify``. The secret value is printed to standard output;
        diagnostics go to standard error so they cannot be mistaken for the
        secret.

        :return: None. Exits with status 1 when no token is available.
        """
        parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault')
        parser.add_argument(
            '--token',
            default=os.environ.get('VAULT_TOKEN'),
            help='Vault authentication token (defaults to VAULT_TOKEN environment variable)',
        )
        parser.add_argument(
            '--path',
            required=True,
            help='Path to the secret in Vault (with optional .key)',
        )
        parser.add_argument(
            '--vault-addr',
            # VAULT_ADDR wins when set; otherwise keep the historical default.
            default=os.environ.get('VAULT_ADDR', 'https://192.168.1.8:8200'),
            help='Vault server address (defaults to VAULT_ADDR environment variable)',
        )
        parser.add_argument(
            '--no-verify',
            action='store_true',
            help='Disable TLS verification',
        )

        args = parser.parse_args()

        if not args.token:
            print(
                "No token provided. Please set VAULT_TOKEN environment variable or use --token",
                file=sys.stderr,
            )
            sys.exit(1)

        secret = Tools.get_vault_secret(
            token=args.token,
            path=args.path,
            vault_addr=args.vault_addr,
            verify=not args.no_verify,
        )

        if secret is not None:
            print(secret)
        else:
            print("Failed to retrieve secret", file=sys.stderr)


# Module-level aliases so the functions can be called with or without the
# ``Tools.`` prefix.
get_vault_secret = Tools.get_vault_secret
vault_access = Tools.vault_access

# if __name__ == '__main__':
#     vault_access()