""" title: Hashicorp Vault Tool author: Josh Knapp version: 0.4.5 """ import subprocess import os import hvac import urllib3 from pydantic import BaseModel, Field # Try to import hvac, install if not present try: import hvac except ImportError: print("hvac package not found. Attempting to install...") try: subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"]) import hvac print("hvac package installed successfully") except subprocess.CalledProcessError as e: print(f"Failed to install hvac package: {str(e)}") class Tools: class Valves(BaseModel): VAULT_ADDR: str = Field( default="", description="The Web Address for the Vault Server", ) TLS_VERIFY: bool = Field( default="False", description="Check the TLS Certificate for the Vault Server", ) def __init__(self): self.valves = self.Valves() pass def get_vault_secret(self, path: str, token: str) -> str: """ Read secrets stored within HashiCorp Vault based on the path provided by the user. :param token: Vault authentication token :param path: Path to the secret in Vault :return: The value of the secret or the error message if one is produced """ # Check if Vault Address has been set if not self.valves.VAULT_ADDR: return "The Vault Address has not been set. Please define it in the Tool's Valves" # Check if a Token is set if not token: return f"No token defined, please either provide one in the prompt or via an Environment Variable {token}" vault_addr = self.valves.VAULT_ADDR # Attempt to connect to HashiCorp Vault try: if self.valves.TLS_VERIFY == True: client = hvac.Client( url=vault_addr, token=token, verify=self.valves.TLS_VERIFY ) else: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) client = hvac.Client( url=vault_addr, token=token, verify=self.valves.TLS_VERIFY ) # Check Authentication if not client.is_authenticated(): return "Failed to authenticate with Vault" # Split path to separate the key if it exists path_parts = path.rsplit(".", 1) secret_path = path_parts[0] key = path_parts[1] if len(path_parts) > 1 else "value" try: secret = client.read(secret_path) if secret and "data" in secret: secret_data = secret["data"] if key in secret_data: return secret_data[key] else: return f"Key '{key}' not found in KV v1 secret path. Check your secret path and ensure the token used has the appropriate permissions." except Exception as e: print(f"KV v1 attempt failed: {str(e)}") return f"Secret not found at path: {path}" except Exception as e: return f"Error connecting to Vault: {str(e)}"