Files
lijiaoqiao/llm-gateway-competitors/litellm-wheel-src/litellm/secret_managers/secret_manager_handler.py
2026-03-26 20:06:14 +08:00

183 lines
6.6 KiB
Python

"""
Secret Manager Handler
Handles retrieving secrets from different secret management systems.
"""
import base64
import os
from typing import Any, Optional
import litellm
from litellm._logging import print_verbose
from litellm.types.secret_managers.main import KeyManagementSystem
def _is_base64(s):
"""Check if a string is valid base64."""
import binascii
try:
return base64.b64encode(base64.b64decode(s)).decode() == s
except binascii.Error:
return False
def get_secret_from_manager(  # noqa: PLR0915
    client: Any,
    key_manager: str,
    secret_name: str,
    key_management_settings: Optional[Any] = None,
) -> Optional[str]:
    """
    Get a secret from the configured secret manager.

    The backend is selected from ``key_manager`` (and, for Azure Key Vault and
    Google KMS, also from the class of ``client``). Secret values are never
    written to the verbose log; only whether a value was returned is logged.

    Args:
        client: The secret manager client instance
        key_manager: The type of key manager (e.g., "azure_key_vault", "google_kms", etc.)
        secret_name: The name/path of the secret to retrieve. For the KMS
            backends and "local" this is the name of an environment variable.
        key_management_settings: Optional settings for the key management system

    Returns:
        The secret value as a string, or None if not found

    Raises:
        ValueError: If the secret cannot be retrieved or required parameters are missing
        Exception: For other errors during secret retrieval
    """
    secret = None

    if (
        key_manager == KeyManagementSystem.AZURE_KEY_VAULT.value
        or type(client).__module__ + "." + type(client).__name__
        == "azure.keyvault.secrets._client.SecretClient"
    ):  # support Azure Secret Client - from azure.keyvault.secrets import SecretClient
        secret = client.get_secret(secret_name).value

    elif (
        key_manager == KeyManagementSystem.GOOGLE_KMS.value
        or client.__class__.__name__ == "KeyManagementServiceClient"
    ):
        # The ciphertext lives in the environment; KMS is only used to decrypt it.
        encrypted_secret: Any = os.getenv(secret_name)
        if encrypted_secret is None:
            raise ValueError(
                "Google KMS requires the encrypted secret to be in the environment!"
            )
        if not _is_base64(encrypted_secret):
            raise ValueError(
                "Google KMS requires the encrypted secret to be encoded in base64"
            )  # fix for this vulnerability https://huntr.com/bounties/ae623c2f-b64b-4245-9ed4-f13a0a5824ce
        ciphertext = base64.b64decode(encrypted_secret)
        response = client.decrypt(
            request={
                "name": litellm._google_kms_resource_name,
                "ciphertext": ciphertext,
            }
        )
        # assumes the original value was encoded with utf-8
        secret = response.plaintext.decode("utf-8")

    elif key_manager == KeyManagementSystem.AWS_KMS.value:
        # Original note: only tokens which start with 'aws_kms/' are checked,
        # to prevent the latency impact of checking all keys. That prefix
        # filtering is not done in this function (presumably by the caller).
        encrypted_value = os.getenv(secret_name, None)
        if encrypted_value is None:
            raise Exception(
                "AWS KMS - Encrypted Value of Key={} is None".format(secret_name)
            )
        # Decode the base64 encoded ciphertext and ask KMS to decrypt it.
        ciphertext_blob = base64.b64decode(encrypted_value)
        response = client.decrypt(CiphertextBlob=ciphertext_blob)
        # Extract and decode the plaintext.
        secret = response["Plaintext"].decode("utf-8")
        if isinstance(secret, str):
            secret = secret.strip()

    elif key_manager == KeyManagementSystem.AWS_SECRET_MANAGER.value:
        from litellm.secret_managers.aws_secret_manager_v2 import (
            AWSSecretsManagerV2,
        )

        # NOTE: any other client type leaves ``secret`` as None (no error).
        if isinstance(client, AWSSecretsManagerV2):
            primary_secret_name = None
            if key_management_settings is not None:
                primary_secret_name = key_management_settings.primary_secret_name
            secret = client.sync_read_secret(
                secret_name=secret_name,
                primary_secret_name=primary_secret_name,
            )
            # Do not log the secret value itself.
            print_verbose(
                f"get_secret_value_response: secret retrieved={secret is not None}"
            )

    elif key_manager == KeyManagementSystem.GOOGLE_SECRET_MANAGER.value:
        try:
            secret = client.get_secret_from_google_secret_manager(secret_name)
            # Do not log the secret value itself.
            print_verbose(
                f"secret from google secret manager: retrieved={secret is not None}"
            )
            if secret is None:
                raise ValueError(
                    f"No secret found in Google Secret Manager for {secret_name}"
                )
        except Exception as e:
            # Log for debugging, then propagate the original exception.
            print_verbose(f"An error occurred - {str(e)}")
            raise

    elif key_manager == KeyManagementSystem.HASHICORP_VAULT.value:
        try:
            secret = client.sync_read_secret(secret_name=secret_name)
            if secret is None:
                raise ValueError(
                    f"No secret found in Hashicorp Secret Manager for {secret_name}"
                )
        except Exception as e:
            print_verbose(f"An error occurred - {str(e)}")
            raise

    elif key_manager == KeyManagementSystem.CYBERARK.value:
        try:
            secret = client.sync_read_secret(secret_name=secret_name)
            if secret is None:
                raise ValueError(
                    f"No secret found in CyberArk Secret Manager for {secret_name}"
                )
        except Exception as e:
            print_verbose(f"An error occurred - {str(e)}")
            raise

    elif key_manager == KeyManagementSystem.CUSTOM.value:
        # Check if client is a CustomSecretManager instance
        from litellm.integrations.custom_secret_manager import CustomSecretManager

        if not isinstance(client, CustomSecretManager):
            raise ValueError(
                f"Custom secret manager client must be an instance of CustomSecretManager, got {type(client).__name__}"
            )
        secret = client.sync_read_secret(
            secret_name=secret_name,
            optional_params=(
                key_management_settings.model_dump()
                if key_management_settings
                else None
            ),
        )
        if secret is None:
            raise ValueError(
                f"No secret found in Custom Secret Manager for {secret_name}"
            )

    elif key_manager == "local":
        # Plain environment lookup; may be None when the variable is unset.
        secret = os.getenv(secret_name)

    else:  # assume the default is infisicial client
        secret = client.get_secret(secret_name).secret_value

    return secret