Compare commits
No commits in common. "d1f7e8ea7f5879b4f733f41474cbfa167a9234ac" and "785be14e68db14312dbac94920a05d64025abb5c" have entirely different histories.
d1f7e8ea7f
...
785be14e68
123
vault_reader.py
123
vault_reader.py
@ -1,123 +0,0 @@
|
|||||||
import sys
|
|
||||||
import subprocess
|
|
||||||
import logging
|
|
||||||
import urllib3
|
|
||||||
import os
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Try to import hvac, install if not present
|
|
||||||
try:
|
|
||||||
import hvac
|
|
||||||
except ImportError:
|
|
||||||
logger.info("hvac package not found. Attempting to install...")
|
|
||||||
try:
|
|
||||||
subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
|
|
||||||
import hvac
|
|
||||||
logger.info("hvac package installed successfully")
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
logger.error(f"Failed to install hvac package: {str(e)}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
|
|
||||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
||||||
|
|
||||||
def get_vault_secret(token, path, vault_addr='http://127.0.0.1:8200', verify=True):
|
|
||||||
try:
|
|
||||||
# Initialize the Vault client
|
|
||||||
client = hvac.Client(
|
|
||||||
url=vault_addr,
|
|
||||||
token=token,
|
|
||||||
verify=verify
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check if client is authenticated
|
|
||||||
if not client.is_authenticated():
|
|
||||||
logger.error("Failed to authenticate with Vault")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Split path to separate the key if it exists
|
|
||||||
path_parts = path.rsplit('.', 1)
|
|
||||||
secret_path = path_parts[0]
|
|
||||||
key = path_parts[1] if len(path_parts) > 1 else 'value'
|
|
||||||
|
|
||||||
# Try KV v2 first
|
|
||||||
try:
|
|
||||||
# For KV v2, try with and without /data/ in the path
|
|
||||||
try:
|
|
||||||
if '/data/' not in secret_path:
|
|
||||||
v2_path = secret_path.replace('//', '/').strip('/')
|
|
||||||
mount_point = v2_path.split('/')[0]
|
|
||||||
v2_path = '/'.join(v2_path.split('/')[1:])
|
|
||||||
else:
|
|
||||||
# Remove /data/ for the API call
|
|
||||||
v2_path = secret_path.replace('/data/', '/')
|
|
||||||
mount_point = v2_path.split('/')[0]
|
|
||||||
v2_path = '/'.join(v2_path.split('/')[1:])
|
|
||||||
|
|
||||||
secret = client.secrets.kv.v2.read_secret_version(
|
|
||||||
path=v2_path,
|
|
||||||
mount_point=mount_point,
|
|
||||||
raise_on_deleted_version=False,
|
|
||||||
)
|
|
||||||
if secret and 'data' in secret and 'data' in secret['data']:
|
|
||||||
secret_data = secret['data']['data']
|
|
||||||
if key in secret_data:
|
|
||||||
return secret_data[key]
|
|
||||||
logger.warning(f"Key '{key}' not found in KV v2 secret")
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"KV v2 attempt failed: {str(e)}")
|
|
||||||
|
|
||||||
# Try KV v1
|
|
||||||
try:
|
|
||||||
secret = client.read(secret_path)
|
|
||||||
if secret and 'data' in secret:
|
|
||||||
secret_data = secret['data']
|
|
||||||
if key in secret_data:
|
|
||||||
return secret_data[key]
|
|
||||||
logger.warning(f"Key '{key}' not found in KV v1 secret")
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"KV v1 attempt failed: {str(e)}")
|
|
||||||
|
|
||||||
logger.error(f"Secret not found at path: {path}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error reading secret: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error connecting to Vault: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def main():
|
|
||||||
parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault')
|
|
||||||
parser.add_argument('--token', default=os.environ.get('VAULT_TOKEN'),
|
|
||||||
help='Vault authentication token (defaults to VAULT_TOKEN environment variable)')
|
|
||||||
parser.add_argument('--path', required=True, help='Path to the secret in Vault (with optional .key)')
|
|
||||||
parser.add_argument('--vault-addr', default='http://127.0.0.1:8200', help='Vault server address')
|
|
||||||
parser.add_argument('--no-verify', action='store_true', help='Disable TLS verification')
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
if not args.token:
|
|
||||||
logger.error("No token provided. Please set VAULT_TOKEN environment variable or use --token")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
secret = get_vault_secret(
|
|
||||||
token=args.token,
|
|
||||||
path=args.path,
|
|
||||||
vault_addr=args.vault_addr,
|
|
||||||
verify=not args.no_verify
|
|
||||||
)
|
|
||||||
|
|
||||||
if secret is not None:
|
|
||||||
print(secret)
|
|
||||||
else:
|
|
||||||
logger.error("Failed to retrieve secret")
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
136
vault_tool.py
136
vault_tool.py
@ -1,136 +0,0 @@
|
|||||||
import sys
|
|
||||||
import subprocess
|
|
||||||
import logging
|
|
||||||
import urllib3
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
class Tools:
|
|
||||||
def __init__(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Add your custom tools using pure Python code here, make sure to add type hints
|
|
||||||
# Use Sphinx-style docstrings to document your tools, they will be used for generating tools specifications
|
|
||||||
# Please refer to function_calling_filter_pipeline.py file from pipelines project for an example
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Try to import hvac, install if not present
|
|
||||||
try:
|
|
||||||
import hvac
|
|
||||||
except ImportError:
|
|
||||||
logger.info("hvac package not found. Attempting to install...")
|
|
||||||
try:
|
|
||||||
subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
|
|
||||||
import hvac
|
|
||||||
logger.info("hvac package installed successfully")
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
logger.error(f"Failed to install hvac package: {str(e)}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
|
|
||||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
||||||
|
|
||||||
def get_vault_secret(token, path, vault_addr='http://127.0.0.1:8200', verify=True):
|
|
||||||
try:
|
|
||||||
# Initialize the Vault client
|
|
||||||
client = hvac.Client(
|
|
||||||
url=vault_addr,
|
|
||||||
token=token,
|
|
||||||
verify=verify
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check if client is authenticated
|
|
||||||
if not client.is_authenticated():
|
|
||||||
logger.error("Failed to authenticate with Vault")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Split path to separate the key if it exists
|
|
||||||
path_parts = path.rsplit('.', 1)
|
|
||||||
secret_path = path_parts[0]
|
|
||||||
key = path_parts[1] if len(path_parts) > 1 else 'value'
|
|
||||||
|
|
||||||
# Try KV v2 first
|
|
||||||
try:
|
|
||||||
# For KV v2, try with and without /data/ in the path
|
|
||||||
try:
|
|
||||||
if '/data/' not in secret_path:
|
|
||||||
v2_path = secret_path.replace('//', '/').strip('/')
|
|
||||||
mount_point = v2_path.split('/')[0]
|
|
||||||
v2_path = '/'.join(v2_path.split('/')[1:])
|
|
||||||
else:
|
|
||||||
# Remove /data/ for the API call
|
|
||||||
v2_path = secret_path.replace('/data/', '/')
|
|
||||||
mount_point = v2_path.split('/')[0]
|
|
||||||
v2_path = '/'.join(v2_path.split('/')[1:])
|
|
||||||
|
|
||||||
secret = client.secrets.kv.v2.read_secret_version(
|
|
||||||
path=v2_path,
|
|
||||||
mount_point=mount_point,
|
|
||||||
raise_on_deleted_version=False,
|
|
||||||
)
|
|
||||||
if secret and 'data' in secret and 'data' in secret['data']:
|
|
||||||
secret_data = secret['data']['data']
|
|
||||||
if key in secret_data:
|
|
||||||
return secret_data[key]
|
|
||||||
logger.warning(f"Key '{key}' not found in KV v2 secret")
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"KV v2 attempt failed: {str(e)}")
|
|
||||||
|
|
||||||
# Try KV v1
|
|
||||||
try:
|
|
||||||
secret = client.read(secret_path)
|
|
||||||
if secret and 'data' in secret:
|
|
||||||
secret_data = secret['data']
|
|
||||||
if key in secret_data:
|
|
||||||
return secret_data[key]
|
|
||||||
logger.warning(f"Key '{key}' not found in KV v1 secret")
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"KV v1 attempt failed: {str(e)}")
|
|
||||||
|
|
||||||
print(f"Secret not found at path: {path}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error reading secret: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error connecting to Vault: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def vault_access():
|
|
||||||
"""
|
|
||||||
Query Vault for secrets based on path and optionally a key
|
|
||||||
User needs to provide a path, optionally a key, and the Vault token if not set in the environment.
|
|
||||||
"""
|
|
||||||
parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault')
|
|
||||||
parser.add_argument('--token', default=os.environ.get('VAULT_TOKEN'),
|
|
||||||
help='Vault authentication token (defaults to VAULT_TOKEN environment variable)')
|
|
||||||
parser.add_argument('--path', required=True, help='Path to the secret in Vault (with optional .key)')
|
|
||||||
parser.add_argument('--vault-addr', default='https://192.168.1.8:8200', help='Vault server address')
|
|
||||||
parser.add_argument('--no-verify', action='store_true', help='Disable TLS verification')
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
if not args.token:
|
|
||||||
print("No token provided. Please set VAULT_TOKEN environment variable or use --token")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
secret = Tools.get_vault_secret(
|
|
||||||
token=args.token,
|
|
||||||
path=args.path,
|
|
||||||
vault_addr=args.vault_addr,
|
|
||||||
verify=not args.no_verify
|
|
||||||
)
|
|
||||||
|
|
||||||
if secret is not None:
|
|
||||||
print(secret)
|
|
||||||
else:
|
|
||||||
print("Failed to retrieve secret")
|
|
||||||
|
|
||||||
# if __name__ == '__main__':
|
|
||||||
# main()
|
|
Loading…
Reference in New Issue
Block a user