README.md (2 changes: 1 addition & 1 deletion)
@@ -35,4 +35,4 @@ class Hello(BaseTask):
 ```
 
 ## Documentation
-Read more about Digital.ai Release Python SDK [here](https://digital.ai/)
+Read more about Digital.ai Release Python SDK [here](https://docs.digital.ai/release/docs/category/python-sdk)
digitalai/release/integration/__init__.py (1 change: 1 addition & 0 deletions)
@@ -3,3 +3,4 @@
 from .output_context import OutputContext
 from .exceptions import AbortException
 from .reporting_records import BuildRecord, PlanRecord, ItsmRecord,CodeComplianceRecord, DeploymentRecord
+from .logger import dai_logger
digitalai/release/integration/base_task.py (21 changes: 14 additions & 7 deletions)
@@ -10,14 +10,20 @@
 from .input_context import AutomatedTaskAsUserContext, ReleaseContext
 from .output_context import OutputContext
 from .exceptions import AbortException
-
-logger = logging.getLogger("Digitalai")
+from .logger import dai_logger
 
 
 class BaseTask(ABC):
     """
     An abstract base class representing a task that can be executed.
     """
+    def __init__(self):
+        self.task_id = None
+        self.release_context = None
+        self.release_server_url = None
+        self.input_properties = None
+        self.output_context = None
+
     def execute_task(self) -> None:
         """
         Executes the task by calling the execute method. If an AbortException is raised during execution,
@@ -28,12 +34,13 @@ def execute_task(self) -> None:
             self.output_context = OutputContext(0, "", {}, [])
             self.execute()
         except AbortException:
-            logger.debug("Abort requested")
+            dai_logger.info("Abort requested")
             self.set_exit_code(1)
-            sys.exit(1)
+            self.set_error_message("Abort requested")
+            sys.exit(1)
 
         except Exception as e:
-            logger.error("Unexpected error occurred.", exc_info=True)
+            dai_logger.error("Unexpected error occurred.", exc_info=True)
             self.set_exit_code(1)
             self.set_error_message(str(e))
 
@@ -104,13 +111,13 @@ def add_comment(self, comment: str) -> None:
         """
         Logs a comment of the task.
         """
-        logger.debug(f"##[start: comment]{comment}##[end: comment]")
+        dai_logger.debug(f"##[start: comment]{comment}##[end: comment]")
 
     def set_status_line(self, status_line: str) -> None:
         """
         Set the status of the task.
         """
-        logger.debug(f"##[start: status]{status_line}##[end: status]")
+        dai_logger.debug(f"##[start: status]{status_line}##[end: status]")
 
     def add_reporting_record(self, reporting_record: Any) -> None:
         """
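For context on how these hooks are used downstream: a task plugin subclasses `BaseTask` and implements `execute()`. A minimal sketch, assuming `BaseTask` is importable from the package as in the README example (the class name and messages are illustrative):

```python
from digitalai.release.integration import BaseTask


class Hello(BaseTask):
    def execute(self) -> None:
        # set_status_line() and add_comment() emit the ##[start: ...] markers
        # that the methods above now log through dai_logger.
        self.set_status_line("Saying hello")
        self.add_comment("Hello from the Python SDK")
```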
digitalai/release/integration/k8s.py (16 changes: 15 additions & 1 deletion)
@@ -2,6 +2,8 @@
 
 from kubernetes import client, config
 from kubernetes.client import CoreV1Api
+from kubernetes.config.config_exception import ConfigException
+from .logger import dai_logger
 
 kubernetes_client: CoreV1Api = None
 lock = threading.Lock()
@@ -13,8 +15,20 @@ def get_client():
     if not kubernetes_client:
         with lock:
             if not kubernetes_client:
-                config.load_config()
+                try:
+                    #dai_logger.info("Attempting to load in-cluster config")
+                    config.load_incluster_config()
+                    #dai_logger.info("Successfully loaded in-cluster config")
+                except ConfigException:
+                    #dai_logger.warning("In-cluster config failed, attempting default load_config")
+                    try:
+                        config.load_config()
+                        #dai_logger.info("Successfully loaded config using load_config")
+                    except Exception:
+                        dai_logger.exception("Failed to load any Kubernetes config")
+                        raise RuntimeError("Could not configure Kubernetes client")
                 kubernetes_client = client.CoreV1Api()
+                #dai_logger.info("Kubernetes client created successfully")
 
     return kubernetes_client
 
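The net effect of this change: inside a pod the client is built from the mounted service-account credentials, and local runs fall back to the default kubeconfig lookup. A usage sketch (the secret and namespace names are made up):

```python
from digitalai.release.integration import k8s

# get_client() lazily creates one CoreV1Api behind the module lock;
# repeated calls return the same cached client.
api = k8s.get_client()
secret = api.read_namespaced_secret("my-input-secret", "my-namespace")
```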
digitalai/release/integration/logger.py (20 changes: 20 additions & 0 deletions)
@@ -0,0 +1,20 @@
+import logging
+import sys
+
+# Define the log format (with milliseconds) and date format
+LOG_FORMAT = "%(asctime)s.%(msecs)03d %(levelname)s [%(filename)s:%(lineno)d] - %(message)s"
+DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+# Create a formatter
+_formatter = logging.Formatter(fmt=LOG_FORMAT, datefmt=DATE_FORMAT)
+
+# Create a stream handler (to stdout) and attach the formatter
+_handler = logging.StreamHandler(sys.stdout)
+_handler.setFormatter(_formatter)
+
+# Get your “dai” logger
+dai_logger = logging.getLogger("digital_ai")
+dai_logger.setLevel(logging.DEBUG)
+dai_logger.propagate = False
+if not dai_logger.handlers:
+    dai_logger.addHandler(_handler)
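Modules in the package can now share this pre-configured logger instead of each calling `logging.getLogger`. A minimal usage sketch; the sample output line is illustrative of the format defined above:

```python
from digitalai.release.integration.logger import dai_logger

dai_logger.info("Task started")
# e.g. 2024-01-01 12:00:00.123 INFO [my_task.py:3] - Task started
```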
digitalai/release/integration/logging_config.py (35 changes: 0 additions & 35 deletions)

This file was deleted.

digitalai/release/integration/watcher.py (13 changes: 5 additions & 8 deletions)
@@ -1,31 +1,28 @@
-import logging
 import os
 import threading
 
 from kubernetes import watch
 
+from .logger import dai_logger
 from digitalai.release.integration import k8s
 
-logger = logging.getLogger("Digitalai")
-
-
 def start_input_context_watcher(on_input_context_update_func):
-    logger.debug("Input context watcher started")
+    dai_logger.info("Input context watcher started")
 
     stop = threading.Event()
 
     try:
         start_input_secret_watcher(on_input_context_update_func, stop)
     except Exception:
-        logger.error("Unexpected error occurred.", exc_info=True)
+        dai_logger.error("Unexpected error occurred.", exc_info=True)
         return
 
     # Wait until the watcher is stopped
     stop.wait()
 
 
 def start_input_secret_watcher(on_input_context_update_func, stop):
-    logger.debug("Input secret watcher started")
+    dai_logger.info("Input secret watcher started")
 
     kubernetes_client = k8s.get_client()
     field_selector = "metadata.name=" + os.getenv("INPUT_CONTEXT_SECRET")
@@ -39,7 +36,7 @@ def start_input_secret_watcher(on_input_context_update_func, stop):
 
         # Checking if 'session-key' field has changed
         if old_session_key and old_session_key != new_session_key:
-            logger.info("Detected input context value change")
+            dai_logger.info("Detected input context value change")
             on_input_context_update_func()
 
         # Set old session-key value
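A sketch of wiring up the watcher, assuming the INPUT_CONTEXT_SECRET environment variable names the secret to watch (the callback body is illustrative):

```python
from digitalai.release.integration.logger import dai_logger
from digitalai.release.integration.watcher import start_input_context_watcher


def on_input_context_update():
    # Called when the watched secret's 'session-key' field changes.
    dai_logger.info("Input context changed, reloading task properties")


# Starts the secret watcher, then blocks on the internal stop event.
start_input_context_watcher(on_input_context_update)
```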
digitalai/release/integration/wrapper.py (53 changes: 24 additions & 29 deletions)
@@ -4,7 +4,6 @@
 import base64
 import importlib
 import json
-import logging.config
 import os
 import signal
 import sys
@@ -17,7 +16,7 @@
 from .base_task import BaseTask
 from .input_context import InputContext
 from .job_data_encryptor import AESJobDataEncryptor, NoOpJobDataEncryptor
-from .logging_config import LOGGING_CONFIG
+from .logger import dai_logger
 from .masked_io import MaskedIO
 from .output_context import OutputContext
 
@@ -51,12 +50,6 @@ def get_encryptor():
     return encryptor
 
 
-# Set up logging
-logging.config.dictConfig(LOGGING_CONFIG)
-
-# Get the logger
-logger = logging.getLogger("Digitalai")
-
 # Initialize the global task object
 dai_task_object: BaseTask = None
 
@@ -66,13 +59,13 @@ def abort_handler(signum, frame):
     This function handles the abort request by calling the abort method on the global task object, if it exists.
     If the task object does not exist, it logs a message and exits with a status code of 1.
     """
-    logger.debug("Received SIGTERM to gracefully stop the process")
+    dai_logger.info("Received SIGTERM to gracefully stop the process")
     global dai_task_object
 
     if dai_task_object:
         dai_task_object.abort()
     else:
-        logger.debug("Abort requested")
+        dai_logger.info("Abort requested")
         sys.exit(1)
 

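For reference, the collapsed lines of this file presumably register this handler for SIGTERM; with the standard library, that registration would look like the following sketch:

```python
import signal

# abort_handler is the function defined above in wrapper.py; registering it
# makes a SIGTERM trigger the graceful-abort path instead of a hard kill.
signal.signal(signal.SIGTERM, abort_handler)
```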
@@ -86,15 +79,17 @@ def get_task_details():
     and parsing the JSON data into an InputContext object. Then, set the secrets for the masked standard output
     and error streams, build the task properties from the InputContext object.
     """
-    logger.debug("Preparing for task properties.")
+    dai_logger.info("Preparing for task properties")
     if input_context_file:
-        logger.debug("Reading input context from file")
+        dai_logger.info("Reading input context from file")
         with open(input_context_file) as data_input:
             input_content = data_input.read()
+        #dai_logger.info("Successfully loaded input context from file")
     else:
-        logger.debug("Reading input context from secret")
-        secret = k8s.get_client().read_namespaced_secret(input_context_secret, runner_namespace)
-
+        k8s_client = k8s.get_client()
+        dai_logger.info("Reading input context from secret")
+        secret =k8s_client.read_namespaced_secret(input_context_secret, runner_namespace)
+        #dai_logger.info("Successfully loaded input context from secret")
         global base64_session_key, callback_url
         base64_session_key = base64.b64decode(secret.data["session-key"])
         callback_url = base64.b64decode(secret.data["url"])
@@ -111,7 +106,7 @@ def get_task_details():
         response = requests.get(fetch_url)
         response.raise_for_status()
     except requests.exceptions.RequestException as e:
-        logger.error("Failed to fetch data.", exc_info=True)
+        dai_logger.error("Failed to fetch data.", exc_info=True)
         raise e
 
     if response.status_code != 200:
@@ -122,6 +117,7 @@ def get_task_details():
     input_content = base64.b64decode(input_content)
 
     decrypted_json = get_encryptor().decrypt(input_content)
+    #dai_logger.info("Successfully decrypted input context")
     global input_context
     input_context = InputContext.from_dict(json.loads(decrypted_json))
 
@@ -140,39 +136,38 @@ def update_output_context(output_context: OutputContext):
     dictionary to a JSON string, encrypting the string using the encryptor, and writing the encrypted string
     to the output context file or secret and pushing to callback URL.
     """
-    logger.debug("Creating output context file")
     output_content = json.dumps(output_context.to_dict())
     encrypted_json = get_encryptor().encrypt(output_content)
     try:
         if output_context_file:
-            logger.debug("Writing output context to file")
+            dai_logger.info("Writing output context to file")
             with open(output_context_file, "w") as data_output:
                 data_output.write(encrypted_json)
         if result_secret_key:
-            logger.debug("Writing output context to secret")
+            dai_logger.info("Writing output context to secret")
             if len(encrypted_json) >= size_of_1Mb:
-                logger.warning("Result size exceeds 1Mb and is too big to store in secret")
+                dai_logger.warning("Result size exceeds 1Mb and is too big to store in secret")
            else:
                 namespace, name, key = k8s.split_secret_resource_data(result_secret_key)
                 secret = k8s.get_client().read_namespaced_secret(name, namespace)
                 secret.data[key] = encrypted_json
                 k8s.get_client().replace_namespaced_secret(name, namespace, secret)
         if callback_url:
-            logger.debug("Pushing result using HTTP")
+            dai_logger.info("Pushing result using HTTP")
             url = base64.b64decode(callback_url).decode("UTF-8")
             try:
                 urllib3.PoolManager().request("POST", url, headers={'Content-Type': 'application/json'},
                                               body=encrypted_json)
             except Exception:
                 if should_retry_callback_request(encrypted_json):
-                    logger.error("Cannot finish Callback request.", exc_info=True)
-                    logger.info("Retry flag was set on Callback request, retrying until successful...")
+                    dai_logger.error("Cannot finish Callback request.", exc_info=True)
+                    dai_logger.info("Retry flag was set on Callback request, retrying until successful...")
                     retry_push_result_infinitely(encrypted_json)
                 else:
                     raise
 
     except Exception:
-        logger.error("Unexpected error occurred.", exc_info=True)
+        dai_logger.error("Unexpected error occurred.", exc_info=True)
 
 
 def retry_push_result_infinitely(encrypted_json):
@@ -197,7 +192,7 @@ def retry_push_result_infinitely(encrypted_json):
             response = urllib3.PoolManager().request("POST", url, headers={'Content-Type': 'application/json'}, body=encrypted_json)
             return response
         except Exception as e:
-            logger.warning(f"Cannot finish retried Callback request: {e}. Retrying in {retry_delay} seconds...")
+            dai_logger.warning(f"Cannot finish retried Callback request: {e}. Retrying in {retry_delay} seconds...")
             time.sleep(retry_delay)
             retry_delay = min(retry_delay * backoff_factor, max_backoff)
 
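The retry helper above implements a capped exponential backoff; `retry_delay`, `backoff_factor`, and `max_backoff` come from the collapsed lines of this file. A self-contained sketch of the same pattern, with assumed default values that are not taken from this diff:

```python
import time

import urllib3


def post_with_backoff(url, body, retry_delay=5.0, backoff_factor=2.0, max_backoff=300.0):
    """Retry a JSON POST forever, doubling the wait between attempts up to a cap."""
    while True:
        try:
            return urllib3.PoolManager().request(
                "POST", url, headers={"Content-Type": "application/json"}, body=body)
        except Exception:
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * backoff_factor, max_backoff)
```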
@@ -216,13 +211,13 @@ def execute_task(task_object: BaseTask):
     If an exception is raised during execution, log the error. Finally, update the output context file
     using the output context of the task object.
     """
+    global dai_task_object
     try:
-        global dai_task_object
         dai_task_object = task_object
-        logger.debug("Starting task execution")
+        dai_logger.info("Starting task execution")
         dai_task_object.execute_task()
     except Exception:
-        logger.error("Unexpected error occurred.", exc_info=True)
+        dai_logger.error("Unexpected error occurred.", exc_info=True)
     finally:
         update_output_context(dai_task_object.get_output_context())
 
@@ -261,7 +256,7 @@ def run():
         execute_task(task_obj)
     except Exception as e:
         # Log the error and update the output context file with exit code 1 if an exception is raised
-        logger.error("Unexpected error occurred.", exc_info=True)
+        dai_logger.error("Unexpected error occurred.", exc_info=True)
         update_output_context(OutputContext(1, str(e), {}, []))
     finally:
         if execution_mode == "daemon":