adding files to be merged into main
This commit is contained in:
		
							
								
								
									
										123
									
								
								vault_reader.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										123
									
								
								vault_reader.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,123 @@
 | 
				
			|||||||
 | 
					import sys
 | 
				
			||||||
 | 
					import subprocess
 | 
				
			||||||
 | 
					import logging
 | 
				
			||||||
 | 
					import urllib3
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Configure logging
 | 
				
			||||||
 | 
					logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 | 
				
			||||||
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Try to import hvac, install if not present
 | 
				
			||||||
 | 
					try:
 | 
				
			||||||
 | 
					    import hvac
 | 
				
			||||||
 | 
					except ImportError:
 | 
				
			||||||
 | 
					    logger.info("hvac package not found. Attempting to install...")
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
 | 
				
			||||||
 | 
					        import hvac
 | 
				
			||||||
 | 
					        logger.info("hvac package installed successfully")
 | 
				
			||||||
 | 
					    except subprocess.CalledProcessError as e:
 | 
				
			||||||
 | 
					        logger.error(f"Failed to install hvac package: {str(e)}")
 | 
				
			||||||
 | 
					        sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import argparse
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_vault_secret(token, path, vault_addr='http://127.0.0.1:8200', verify=True):
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        # Initialize the Vault client
 | 
				
			||||||
 | 
					        client = hvac.Client(
 | 
				
			||||||
 | 
					            url=vault_addr,
 | 
				
			||||||
 | 
					            token=token,
 | 
				
			||||||
 | 
					            verify=verify
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Check if client is authenticated
 | 
				
			||||||
 | 
					        if not client.is_authenticated():
 | 
				
			||||||
 | 
					            logger.error("Failed to authenticate with Vault")
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Split path to separate the key if it exists
 | 
				
			||||||
 | 
					        path_parts = path.rsplit('.', 1)
 | 
				
			||||||
 | 
					        secret_path = path_parts[0]
 | 
				
			||||||
 | 
					        key = path_parts[1] if len(path_parts) > 1 else 'value'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Try KV v2 first
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            # For KV v2, try with and without /data/ in the path
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                if '/data/' not in secret_path:
 | 
				
			||||||
 | 
					                    v2_path = secret_path.replace('//', '/').strip('/')
 | 
				
			||||||
 | 
					                    mount_point = v2_path.split('/')[0]
 | 
				
			||||||
 | 
					                    v2_path = '/'.join(v2_path.split('/')[1:])
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    # Remove /data/ for the API call
 | 
				
			||||||
 | 
					                    v2_path = secret_path.replace('/data/', '/')
 | 
				
			||||||
 | 
					                    mount_point = v2_path.split('/')[0]
 | 
				
			||||||
 | 
					                    v2_path = '/'.join(v2_path.split('/')[1:])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                secret = client.secrets.kv.v2.read_secret_version(
 | 
				
			||||||
 | 
					                    path=v2_path,
 | 
				
			||||||
 | 
					                    mount_point=mount_point,
 | 
				
			||||||
 | 
					                    raise_on_deleted_version=False,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                if secret and 'data' in secret and 'data' in secret['data']:
 | 
				
			||||||
 | 
					                    secret_data = secret['data']['data']
 | 
				
			||||||
 | 
					                    if key in secret_data:
 | 
				
			||||||
 | 
					                        return secret_data[key]
 | 
				
			||||||
 | 
					                    logger.warning(f"Key '{key}' not found in KV v2 secret")
 | 
				
			||||||
 | 
					            except Exception as e:
 | 
				
			||||||
 | 
					                logger.debug(f"KV v2 attempt failed: {str(e)}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Try KV v1
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                secret = client.read(secret_path)
 | 
				
			||||||
 | 
					                if secret and 'data' in secret:
 | 
				
			||||||
 | 
					                    secret_data = secret['data']
 | 
				
			||||||
 | 
					                    if key in secret_data:
 | 
				
			||||||
 | 
					                        return secret_data[key]
 | 
				
			||||||
 | 
					                    logger.warning(f"Key '{key}' not found in KV v1 secret")
 | 
				
			||||||
 | 
					            except Exception as e:
 | 
				
			||||||
 | 
					                logger.debug(f"KV v1 attempt failed: {str(e)}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            logger.error(f"Secret not found at path: {path}")
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            logger.error(f"Error reading secret: {str(e)}")
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    except Exception as e:
 | 
				
			||||||
 | 
					        logger.error(f"Error connecting to Vault: {str(e)}")
 | 
				
			||||||
 | 
					        return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def main():
 | 
				
			||||||
 | 
					    parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault')
 | 
				
			||||||
 | 
					    parser.add_argument('--token', default=os.environ.get('VAULT_TOKEN'), 
 | 
				
			||||||
 | 
					                       help='Vault authentication token (defaults to VAULT_TOKEN environment variable)')
 | 
				
			||||||
 | 
					    parser.add_argument('--path', required=True, help='Path to the secret in Vault (with optional .key)')
 | 
				
			||||||
 | 
					    parser.add_argument('--vault-addr', default='http://127.0.0.1:8200', help='Vault server address')
 | 
				
			||||||
 | 
					    parser.add_argument('--no-verify', action='store_true', help='Disable TLS verification')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    args = parser.parse_args()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if not args.token:
 | 
				
			||||||
 | 
					        logger.error("No token provided. Please set VAULT_TOKEN environment variable or use --token")
 | 
				
			||||||
 | 
					        sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    secret = get_vault_secret(
 | 
				
			||||||
 | 
					        token=args.token,
 | 
				
			||||||
 | 
					        path=args.path,
 | 
				
			||||||
 | 
					        vault_addr=args.vault_addr,
 | 
				
			||||||
 | 
					        verify=not args.no_verify
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if secret is not None:
 | 
				
			||||||
 | 
					        print(secret)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        logger.error("Failed to retrieve secret")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == '__main__':
 | 
				
			||||||
 | 
					    main()
 | 
				
			||||||
							
								
								
									
										136
									
								
								vault_tool.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										136
									
								
								vault_tool.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,136 @@
 | 
				
			|||||||
 | 
					import sys
 | 
				
			||||||
 | 
					import subprocess
 | 
				
			||||||
 | 
					import logging
 | 
				
			||||||
 | 
					import urllib3
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Tools:
 | 
				
			||||||
 | 
					    def __init__(self):
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Add your custom tools using pure Python code here, make sure to add type hints
 | 
				
			||||||
 | 
					    # Use Sphinx-style docstrings to document your tools, they will be used for generating tools specifications
 | 
				
			||||||
 | 
					    # Please refer to function_calling_filter_pipeline.py file from pipelines project for an example
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Configure logging
 | 
				
			||||||
 | 
					    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 | 
				
			||||||
 | 
					    logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Try to import hvac, install if not present
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        import hvac
 | 
				
			||||||
 | 
					    except ImportError:
 | 
				
			||||||
 | 
					        logger.info("hvac package not found. Attempting to install...")
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            subprocess.check_call([sys.executable, "-m", "pip", "install", "hvac"])
 | 
				
			||||||
 | 
					            import hvac
 | 
				
			||||||
 | 
					            logger.info("hvac package installed successfully")
 | 
				
			||||||
 | 
					        except subprocess.CalledProcessError as e:
 | 
				
			||||||
 | 
					            logger.error(f"Failed to install hvac package: {str(e)}")
 | 
				
			||||||
 | 
					            sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    import argparse
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def get_vault_secret(token, path, vault_addr='http://127.0.0.1:8200', verify=True):
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            # Initialize the Vault client
 | 
				
			||||||
 | 
					            client = hvac.Client(
 | 
				
			||||||
 | 
					                url=vault_addr,
 | 
				
			||||||
 | 
					                token=token,
 | 
				
			||||||
 | 
					                verify=verify
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Check if client is authenticated
 | 
				
			||||||
 | 
					            if not client.is_authenticated():
 | 
				
			||||||
 | 
					                logger.error("Failed to authenticate with Vault")
 | 
				
			||||||
 | 
					                return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Split path to separate the key if it exists
 | 
				
			||||||
 | 
					            path_parts = path.rsplit('.', 1)
 | 
				
			||||||
 | 
					            secret_path = path_parts[0]
 | 
				
			||||||
 | 
					            key = path_parts[1] if len(path_parts) > 1 else 'value'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Try KV v2 first
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                # For KV v2, try with and without /data/ in the path
 | 
				
			||||||
 | 
					                try:
 | 
				
			||||||
 | 
					                    if '/data/' not in secret_path:
 | 
				
			||||||
 | 
					                        v2_path = secret_path.replace('//', '/').strip('/')
 | 
				
			||||||
 | 
					                        mount_point = v2_path.split('/')[0]
 | 
				
			||||||
 | 
					                        v2_path = '/'.join(v2_path.split('/')[1:])
 | 
				
			||||||
 | 
					                    else:
 | 
				
			||||||
 | 
					                        # Remove /data/ for the API call
 | 
				
			||||||
 | 
					                        v2_path = secret_path.replace('/data/', '/')
 | 
				
			||||||
 | 
					                        mount_point = v2_path.split('/')[0]
 | 
				
			||||||
 | 
					                        v2_path = '/'.join(v2_path.split('/')[1:])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    secret = client.secrets.kv.v2.read_secret_version(
 | 
				
			||||||
 | 
					                        path=v2_path,
 | 
				
			||||||
 | 
					                        mount_point=mount_point,
 | 
				
			||||||
 | 
					                        raise_on_deleted_version=False,
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                    if secret and 'data' in secret and 'data' in secret['data']:
 | 
				
			||||||
 | 
					                        secret_data = secret['data']['data']
 | 
				
			||||||
 | 
					                        if key in secret_data:
 | 
				
			||||||
 | 
					                            return secret_data[key]
 | 
				
			||||||
 | 
					                        logger.warning(f"Key '{key}' not found in KV v2 secret")
 | 
				
			||||||
 | 
					                except Exception as e:
 | 
				
			||||||
 | 
					                    logger.debug(f"KV v2 attempt failed: {str(e)}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Try KV v1
 | 
				
			||||||
 | 
					                try:
 | 
				
			||||||
 | 
					                    secret = client.read(secret_path)
 | 
				
			||||||
 | 
					                    if secret and 'data' in secret:
 | 
				
			||||||
 | 
					                        secret_data = secret['data']
 | 
				
			||||||
 | 
					                        if key in secret_data:
 | 
				
			||||||
 | 
					                            return secret_data[key]
 | 
				
			||||||
 | 
					                        logger.warning(f"Key '{key}' not found in KV v1 secret")
 | 
				
			||||||
 | 
					                except Exception as e:
 | 
				
			||||||
 | 
					                    logger.debug(f"KV v1 attempt failed: {str(e)}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                print(f"Secret not found at path: {path}")
 | 
				
			||||||
 | 
					                return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            except Exception as e:
 | 
				
			||||||
 | 
					                print(f"Error reading secret: {str(e)}")
 | 
				
			||||||
 | 
					                return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            print(f"Error connecting to Vault: {str(e)}")
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def vault_access():
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        Query Vault for secrets based on path and optionally a key
 | 
				
			||||||
 | 
					        User needs to provide a path, optionally a key, and the Vault token if not set in the environment.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        parser = argparse.ArgumentParser(description='Retrieve secrets from HashiCorp Vault')
 | 
				
			||||||
 | 
					        parser.add_argument('--token', default=os.environ.get('VAULT_TOKEN'), 
 | 
				
			||||||
 | 
					                        help='Vault authentication token (defaults to VAULT_TOKEN environment variable)')
 | 
				
			||||||
 | 
					        parser.add_argument('--path', required=True, help='Path to the secret in Vault (with optional .key)')
 | 
				
			||||||
 | 
					        parser.add_argument('--vault-addr', default='https://192.168.1.8:8200', help='Vault server address')
 | 
				
			||||||
 | 
					        parser.add_argument('--no-verify', action='store_true', help='Disable TLS verification')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        args = parser.parse_args()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not args.token:
 | 
				
			||||||
 | 
					            print("No token provided. Please set VAULT_TOKEN environment variable or use --token")
 | 
				
			||||||
 | 
					            sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        secret = Tools.get_vault_secret(
 | 
				
			||||||
 | 
					            token=args.token,
 | 
				
			||||||
 | 
					            path=args.path,
 | 
				
			||||||
 | 
					            vault_addr=args.vault_addr,
 | 
				
			||||||
 | 
					            verify=not args.no_verify
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if secret is not None:
 | 
				
			||||||
 | 
					            print(secret)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            print("Failed to retrieve secret")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # if __name__ == '__main__':
 | 
				
			||||||
 | 
					    #     main()
 | 
				
			||||||
		Reference in New Issue
	
	Block a user