update the version for PoC

This commit is contained in:
Josh Knapp 2024-12-16 08:34:09 -08:00
parent a8068f6a36
commit 21e481e85f
3 changed files with 221 additions and 236 deletions

View File

@ -1,3 +1,11 @@
# vault-access-tool # vault-access-tool
Allow Open Web-UI to access Hashicorp Vault POC Allow Open Web-UI to access Hashicorp Vault POC
This is a proof of concept for allowing the Open Web-UI to access Hashicorp Vault.
You should instruct your Model on how to use the tool like this:
"""
You have access to a tool called "HashiCorp Vault Tool" that can read secrets from our hosted Hashicorp Vault. If a user asks for a secret, assume it is stored within Vault. You will need the path of where the secret is stored and an access token to access secrets. If the user makes a request for a secret attempt to retrieve it via the tool's built-in token, but if it fails, you can ask the user to provide you a Token to use to access the secret. Always attempt to read the secret when asked, and show the current secret unless otherwise instructed. If you do not know the secret from running the tool, let the user know you are unable to get the secret, and show them the error as to why. An example of how to use the tool: get_vault_secret( "kv1/path/example", "hvs.sk38fm29rnhmd9833k3jke")
"""

View File

@ -3,121 +3,161 @@ import subprocess
import logging import logging
import urllib3 import urllib3
import os import os
from typing import Optional, Any
import asyncio
import contextlib
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Try to import hvac, install if not present class Tools:
try: def __init__(self) -> None:
import hvac pass
except ImportError:
logger.info("hvac package not found. Attempting to install...") # Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Try to import hvac, install if not present
try: try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
import hvac import hvac
logger.info("hvac package installed successfully") except ImportError:
except subprocess.CalledProcessError as e: logger.info("hvac package not found. Attempting to install...")
logger.error(f"Failed to install hvac package: {str(e)}")
sys.exit(1)
import argparse
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get_vault_secret(token, path, vault_addr='http://127.0.0.1:8200', verify=True):
try:
# Initialize the Vault client
client = hvac.Client(
url=vault_addr,
token=token,
verify=verify
)
# Check if client is authenticated
if not client.is_authenticated():
logger.error("Failed to authenticate with Vault")
return None
# Split path to separate the key if it exists
path_parts = path.rsplit('.', 1)
secret_path = path_parts[0]
key = path_parts[1] if len(path_parts) > 1 else 'value'
# Try KV v2 first
try: try:
# For KV v2, try with and without /data/ in the path subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
import hvac
logger.info("hvac package installed successfully")
except subprocess.CalledProcessError as e:
logger.error(f"Failed to install hvac package: {str(e)}")
sys.exit(1)
import argparse
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@staticmethod
def get_vault_secret(
token: str,
path: str,
vault_addr: str = "http://127.0.0.1:8200",
verify: bool = True,
) -> Optional[Any]:
try:
# Initialize the Vault client
client = hvac.Client(url=vault_addr, token=token, verify=verify)
with contextlib.closing(client):
# Check if client is authenticated
if not client.is_authenticated():
print("Failed to authenticate with Vault")
return None
# Split path to separate the key if it exists
path_parts = path.rsplit(".", 1)
secret_path = path_parts[0]
key = path_parts[1] if len(path_parts) > 1 else "value"
# Try KV v2 first
try: try:
if '/data/' not in secret_path: # For KV v2, try with and without /data/ in the path
v2_path = secret_path.replace('//', '/').strip('/') try:
mount_point = v2_path.split('/')[0] if "/data/" not in secret_path:
v2_path = '/'.join(v2_path.split('/')[1:]) v2_path = secret_path.replace("//", "/").strip("/")
else: mount_point = v2_path.split("/")[0]
# Remove /data/ for the API call v2_path = "/".join(v2_path.split("/")[1:])
v2_path = secret_path.replace('/data/', '/') else:
mount_point = v2_path.split('/')[0] # Remove /data/ for the API call
v2_path = '/'.join(v2_path.split('/')[1:]) v2_path = secret_path.replace("/data/", "/")
mount_point = v2_path.split("/")[0]
v2_path = "/".join(v2_path.split("/")[1:])
secret = client.secrets.kv.v2.read_secret_version(
path=v2_path,
mount_point=mount_point,
raise_on_deleted_version=False,
)
if secret and "data" in secret and "data" in secret["data"]:
secret_data = secret["data"]["data"]
if key in secret_data:
return secret_data[key]
logger.warning(f"Key '{key}' not found in KV v2 secret")
except Exception as e:
logger.debug(f"KV v2 attempt failed: {str(e)}")
# Try KV v1
try:
secret = client.read(secret_path)
if secret and "data" in secret:
secret_data = secret["data"]
if key in secret_data:
return secret_data[key]
logger.warning(f"Key '{key}' not found in KV v1 secret")
except Exception as e:
logger.debug(f"KV v1 attempt failed: {str(e)}")
print(f"Secret not found at path: {path}")
return None
secret = client.secrets.kv.v2.read_secret_version(
path=v2_path,
mount_point=mount_point,
raise_on_deleted_version=False,
)
if secret and 'data' in secret and 'data' in secret['data']:
secret_data = secret['data']['data']
if key in secret_data:
return secret_data[key]
logger.warning(f"Key '{key}' not found in KV v2 secret")
except Exception as e: except Exception as e:
logger.debug(f"KV v2 attempt failed: {str(e)}") print(f"Error reading secret: {str(e)}")
return None
# Try KV v1
try:
secret = client.read(secret_path)
if secret and 'data' in secret:
secret_data = secret['data']
if key in secret_data:
return secret_data[key]
logger.warning(f"Key '{key}' not found in KV v1 secret")
except Exception as e:
logger.debug(f"KV v1 attempt failed: {str(e)}")
logger.error(f"Secret not found at path: {path}")
return None
except Exception as e: except Exception as e:
logger.error(f"Error reading secret: {str(e)}") print(f"Error connecting to Vault: {str(e)}")
return None return None
except Exception as e: @staticmethod
logger.error(f"Error connecting to Vault: {str(e)}") def vault_access() -> None:
return None """
Query Vault for secrets based on path and optionally a key
User needs to provide a path, optionally a key, and the Vault token if not set in the environment.
"""
parser = argparse.ArgumentParser(
description="Retrieve secrets from HashiCorp Vault"
)
parser.add_argument(
"--token",
default=os.environ.get("VAULT_TOKEN"),
help="Vault authentication token (defaults to VAULT_TOKEN environment variable)",
)
parser.add_argument(
"--path",
required=True,
help="Path to the secret in Vault (with optional .key)",
)
parser.add_argument(
"--vault-addr",
default="https://192.168.1.8:8200",
help="Vault server address",
)
parser.add_argument(
"--no-verify", action="store_true", help="Disable TLS verification"
)
def main(): args: argparse.Namespace = parser.parse_args()
parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault')
parser.add_argument('--token', default=os.environ.get('VAULT_TOKEN'),
help='Vault authentication token (defaults to VAULT_TOKEN environment variable)')
parser.add_argument('--path', required=True, help='Path to the secret in Vault (with optional .key)')
parser.add_argument('--vault-addr', default='http://127.0.0.1:8200', help='Vault server address')
parser.add_argument('--no-verify', action='store_true', help='Disable TLS verification')
args = parser.parse_args() if not args.token:
print(
"No token provided. Please set VAULT_TOKEN environment variable or use --token"
)
try:
secret: Optional[Any] = Tools.get_vault_secret(
token=args.token,
path=args.path,
vault_addr=args.vault_addr,
verify=not args.no_verify,
)
if not args.token: if secret is not None:
logger.error("No token provided. Please set VAULT_TOKEN environment variable or use --token") print(secret)
sys.exit(1) else:
print("Failed to retrieve secret")
except Exception as e:
print(f"Error: {e}")
finally:
if asyncio.get_event_loop().is_running():
pending = asyncio.all_tasks(asyncio.get_event_loop())
for task in pending:
task.cancel()
secret = get_vault_secret(
token=args.token,
path=args.path,
vault_addr=args.vault_addr,
verify=not args.no_verify
)
if secret is not None:
print(secret)
else:
logger.error("Failed to retrieve secret")
if __name__ == '__main__':
main()

View File

@ -1,153 +1,90 @@
import sys """
title: Hashicorp Vault Tool
author: Josh Knapp
version: 0.4.5
"""
import subprocess import subprocess
import logging
import urllib3
import os import os
from typing import Optional, Any import hvac
import urllib3
from pydantic import BaseModel, Field
# Try to import hvac, install if not present
try:
import hvac
except ImportError:
print("hvac package not found. Attempting to install...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
import hvac
print("hvac package installed successfully")
except subprocess.CalledProcessError as e:
print(f"Failed to install hvac package: {str(e)}")
class Tools: class Tools:
def __init__(self) -> None: class Valves(BaseModel):
VAULT_ADDR: str = Field(
default="",
description="The Web Address for the Vault Server",
)
TLS_VERIFY: bool = Field(
default="False",
description="Check the TLS Certificate for the Vault Server",
)
def __init__(self):
self.valves = self.Valves()
pass pass
# Configure logging def get_vault_secret(self, path: str, token: str) -> str:
logging.basicConfig( """
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" Read secrets stored within HashiCorp Vault based on the path provided by the user.
) :param token: Vault authentication token
logger = logging.getLogger(__name__) :param path: Path to the secret in Vault
:return: The value of the secret or the error message if one is produced
# Try to import hvac, install if not present """
try: # Check if Vault Address has been set
import hvac if not self.valves.VAULT_ADDR:
except ImportError: return "The Vault Address has not been set. Please define it in the Tool's Valves"
logger.info("hvac package not found. Attempting to install...") # Check if a Token is set
if not token:
return f"No token defined, please either provide one in the prompt or via an Environment Variable {token}"
vault_addr = self.valves.VAULT_ADDR
# Attempt to connect to HashiCorp Vault
try: try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"]) if self.valves.TLS_VERIFY == True:
import hvac client = hvac.Client(
url=vault_addr, token=token, verify=self.valves.TLS_VERIFY
logger.info("hvac package installed successfully") )
except subprocess.CalledProcessError as e: else:
logger.error(f"Failed to install hvac package: {str(e)}") urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sys.exit(1) client = hvac.Client(
url=vault_addr, token=token, verify=self.valves.TLS_VERIFY
import argparse )
# Check Authentication
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@staticmethod
def get_vault_secret(
token: str,
path: str,
vault_addr: str = "http://127.0.0.1:8200",
verify: bool = True,
) -> Optional[Any]:
try:
# Initialize the Vault client
client = hvac.Client(url=vault_addr, token=token, verify=verify)
# Check if client is authenticated
if not client.is_authenticated(): if not client.is_authenticated():
logger.error("Failed to authenticate with Vault") return "Failed to authenticate with Vault"
return None
# Split path to separate the key if it exists # Split path to separate the key if it exists
path_parts = path.rsplit(".", 1) path_parts = path.rsplit(".", 1)
secret_path = path_parts[0] secret_path = path_parts[0]
key = path_parts[1] if len(path_parts) > 1 else "value" key = path_parts[1] if len(path_parts) > 1 else "value"
# Try KV v2 first
try: try:
# For KV v2, try with and without /data/ in the path secret = client.read(secret_path)
try: if secret and "data" in secret:
if "/data/" not in secret_path: secret_data = secret["data"]
v2_path = secret_path.replace("//", "/").strip("/") if key in secret_data:
mount_point = v2_path.split("/")[0] return secret_data[key]
v2_path = "/".join(v2_path.split("/")[1:])
else: else:
# Remove /data/ for the API call return f"Key '{key}' not found in KV v1 secret path. Check your secret path and ensure the token used has the appropriate permissions."
v2_path = secret_path.replace("/data/", "/")
mount_point = v2_path.split("/")[0]
v2_path = "/".join(v2_path.split("/")[1:])
secret = client.secrets.kv.v2.read_secret_version(
path=v2_path,
mount_point=mount_point,
raise_on_deleted_version=False,
)
if secret and "data" in secret and "data" in secret["data"]:
secret_data = secret["data"]["data"]
if key in secret_data:
return secret_data[key]
logger.warning(f"Key '{key}' not found in KV v2 secret")
except Exception as e:
logger.debug(f"KV v2 attempt failed: {str(e)}")
# Try KV v1
try:
secret = client.read(secret_path)
if secret and "data" in secret:
secret_data = secret["data"]
if key in secret_data:
return secret_data[key]
logger.warning(f"Key '{key}' not found in KV v1 secret")
except Exception as e:
logger.debug(f"KV v1 attempt failed: {str(e)}")
print(f"Secret not found at path: {path}")
return None
except Exception as e: except Exception as e:
print(f"Error reading secret: {str(e)}") print(f"KV v1 attempt failed: {str(e)}")
return None return f"Secret not found at path: {path}"
except Exception as e: except Exception as e:
print(f"Error connecting to Vault: {str(e)}") return f"Error connecting to Vault: {str(e)}"
return None
@staticmethod
def vault_access() -> None:
"""
Query Vault for secrets based on path and optionally a key
User needs to provide a path, optionally a key, and the Vault token if not set in the environment.
"""
parser = argparse.ArgumentParser(
description="Retrieve secrets from HashiCorp Vault"
)
parser.add_argument(
"--token",
default=os.environ.get("VAULT_TOKEN"),
help="Vault authentication token (defaults to VAULT_TOKEN environment variable)",
)
parser.add_argument(
"--path",
required=True,
help="Path to the secret in Vault (with optional .key)",
)
parser.add_argument(
"--vault-addr",
default="https://192.168.1.8:8200",
help="Vault server address",
)
parser.add_argument(
"--no-verify", action="store_true", help="Disable TLS verification"
)
args: argparse.Namespace = parser.parse_args()
if not args.token:
print(
"No token provided. Please set VAULT_TOKEN environment variable or use --token"
)
sys.exit(1)
secret: Optional[Any] = Tools.get_vault_secret(
token=args.token,
path=args.path,
vault_addr=args.vault_addr,
verify=not args.no_verify,
)
if secret is not None:
print(secret)
else:
print("Failed to retrieve secret")