Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions infisical_sdk/api_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,55 @@ class MachineIdentityLoginResponse(BaseModel):
expiresIn: int
accessTokenMaxTTL: int
tokenType: str


@dataclass
class BaseKey(BaseModel):
createdAt: str
id: str
name: str
orgId: str
updatedAt: str
description: Optional[str] = None
isDisabled: Optional[bool] = field(default=False)
isReserved: Optional[bool] = field(default=True)
projectId: Optional[str] = None
slug: Optional[str] = None


@dataclass
class ListKey(BaseKey):
encryptionAlgorithm: str = "aes-256-gcm"
version: int = 1

@dataclass
class ListKeysResponse(BaseModel):
keys: List[ListKey]
totalCount: int

@classmethod
def from_dict(cls, data: Dict) -> 'ListKeysResponse':
return cls(
keys=[ListKey.from_dict(key) for key in data['keys']],
totalCount=data['totalCount']
)


@dataclass
class SingleKeyResponse(BaseModel):
key: BaseKey

@classmethod
def from_dict(cls, data: Dict) -> 'SingleKeyResponse':
return cls(
key=BaseKey.from_dict(data['key'])
)

@dataclass
class EncryptDataResponse(BaseModel):
ciphertext: str


@dataclass
class DecryptDataResponse(BaseModel):
plaintext: str
171 changes: 171 additions & 0 deletions infisical_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
from .infisical_requests import InfisicalRequests
from .api_types import ListSecretsResponse, MachineIdentityLoginResponse
from .api_types import SingleSecretResponse, BaseSecret
from .api_types import (
BaseKey,
ListKeysResponse,
SingleKeyResponse,
EncryptDataResponse,
DecryptDataResponse,
)


class InfisicalSDKClient:
Expand All @@ -25,6 +32,7 @@ def __init__(self, host: str, token: str = None):

self.auth = Auth(self)
self.secrets = V3RawSecrets(self)
self.keys = V1Keys(self)

def set_token(self, token: str):
"""
Expand Down Expand Up @@ -343,3 +351,166 @@ def delete_secret_by_name(
)

return result.data.secret

class V1Keys:
def __init__(self, client: InfisicalSDKClient):
"""
Initializes the KeysAPI class.

Args:
client: An instance of the API client.
"""
self.client = client

def list_keys(
self,
project_id: str,
offset: int = 0,
limit: int = 100,
order_by: str = "name",
order_direction: str = "asc",
search: str = None,
) -> ListKeysResponse:
"""
List KMS keys for a given project.

Args:
project_id: The project ID to list keys from.
offset: The offset to start from.
limit: The number of keys to return.
order_by: The column to order keys by.
order_direction: The direction to order keys (asc/desc).
search: A text string to filter key names.

Returns:
A dictionary containing the list of keys and the total count.
"""
params = {
"projectId": project_id,
"offset": offset,
"limit": limit,
"orderBy": order_by,
"orderDirection": order_direction,
}
if search:
params["search"] = search

response = self.client.api.get(
path="/api/v1/kms/keys", model=ListKeysResponse, params=params
)
return response.data

def create_key(
self,
name: str,
project_id: str,
description: str = None,
encryption_algorithm: str = "aes-256-gcm",
) -> BaseKey:
"""
Create a new KMS key.

Args:
name: The name of the key.
project_id: The project ID to create the key in.
description: An optional description of the key.
encryption_algorithm: The encryption algorithm to use.

Returns:
A dictionary containing the created key details.
"""
body = {
"name": name,
"projectId": project_id,
"description": description,
"encryptionAlgorithm": encryption_algorithm,
}
response = self.client.api.post(
path="/api/v1/kms/keys", model=SingleKeyResponse, json=body
)
return response.data.key

def update_key(
self,
key_id: str,
name: str = None,
description: str = None,
is_disabled: bool = None,
) -> BaseKey:
"""
Update a KMS key.

Args:
key_id: The ID of the key to update.
name: The updated name of the key.
description: The updated description of the key.
is_disabled: Flag to enable or disable the key.

Returns:
A dictionary containing the updated key details.
"""
body = {}
if name:
body["name"] = name
if description:
body["description"] = description
if is_disabled is not None:
body["isDisabled"] = is_disabled

response = self.client.api.patch(
path=f"/api/v1/kms/keys/{key_id}", model=SingleKeyResponse, json=body
)
return response.data.key

def delete_key(self, key_id: str) -> BaseKey:
"""
Delete a KMS key.

Args:
key_id: The ID of the key to delete.

Returns:
A dictionary containing the details of the deleted key.
"""
response = self.client.api.delete(
path=f"/api/v1/kms/keys/{key_id}", model=SingleKeyResponse
)
return response.data.key

def encrypt_data(self, key_id: str, plaintext: str) -> EncryptDataResponse:
"""
Encrypt data using a KMS key.

Args:
key_id: The ID of the key to encrypt the data with.
plaintext: The plaintext to be encrypted (base64 encoded).

Returns:
A dictionary containing the ciphertext.
"""
body = {"plaintext": plaintext}
response = self.client.api.post(
path=f"/api/v1/kms/keys/{key_id}/encrypt",
model=EncryptDataResponse,
json=body,
)
return response.data

def decrypt_data(self, key_id: str, ciphertext: str) -> DecryptDataResponse:
"""
Decrypt data using a KMS key.

Args:
key_id: The ID of the key to decrypt the data with.
ciphertext: The ciphertext to be decrypted (base64 encoded).

Returns:
A dictionary containing the plaintext.
"""
body = {"ciphertext": ciphertext}
response = self.client.api.post(
path=f"/api/v1/kms/keys/{key_id}/decrypt",
model=DecryptDataResponse,
json=body,
)
return response.data