Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ jobs:
--junitxml=artifacts/tests/results.xml \
--cov=./backends \
--cov=./cli \
--cov=./objects \
--cov=./sdk \
--cov=./clients \
--cov=./jupyter \
--cov=./objects \
--cov=./sdk \
--cov=./transformers \
--cov-report=term \
--cov-report=xml:artifacts/tests/coverage.xml \
--cov-report=html:artifacts/tests/coverage.html \
Expand Down
6 changes: 6 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach using Process Id",
"type": "python",
"request": "attach",
"processId": "${command:pickProcess}"
},
{
"name": "Python: Current File",
"type": "python",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ def _execute_steps_workflow(context: df.DurableOrchestrationContext):
# TODO: Add unit test to resume from non-zero ID
id = input.get("idin", 0)

# TODO Determine which ones can be executed in parallel and construct the appropriate DAG
# executions = []
# for step in steps:
# execution = context.call_activity(EXECUTE_STEP_ACTIVITY_NAME, step)
# executions.append(execution)
# results = yield context.task_all(executions)

# Execute all steps one after the other
# Note: Assuming that the list of steps is in order of required (sequential) execution
results = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ async def start_steps_workflow(req: func.HttpRequest, starter: str) -> func.Http
stats = {}
response_payload = {
"result": result,
"stats": stats
"stats": stats,
"HTTP_params": req.params,
}
return http_utils.generate_response(response_payload, status_code)

Expand Down
10 changes: 6 additions & 4 deletions clients/durable_functions_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@ def __init__(
user: str,
start_state_id: int = 0
):
if backend_host.endswith('/'):
backend_host = backend_host[-1]
self.backend_host = backend_host
self.user = user
self.next_state_id = start_state_id
self.connect_timeout_sec = 10
self.read_timeout_sec = 60
self.status_query_timeout_sec = 10
self.retry_interval_sec = 1
self.session = requests.Session()
self.workflow_url = f"{self.backend_host}/api/orchestrators/{EXECUTE_WORKFLOW_ACTIVITY_NAME}"
self.set_workflow_url(backend_host)

def __del__(self):
self.session.close()

def set_workflow_url(self, backend_host: str):
if backend_host.endswith('/'):
backend_host = backend_host[-1]
self.workflow_url = f"{backend_host}/api/orchestrators/{EXECUTE_WORKFLOW_ACTIVITY_NAME}"

def execute_notebook(self, notebook_path: str) -> List[dict]:
"""
Execute a given notebook and return the output of each Step.
Expand Down
1 change: 1 addition & 0 deletions jupyter/kernel/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

from clients.durable_functions_client import DurableFunctionsClient
from objects.step import Step
from backends.common.serialization_utils import deserialize_obj
42 changes: 42 additions & 0 deletions jupyter/kernel/kernel_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging


class Colors:
"""Class with a few pre-defined ANSI colors for cleaner output.
The list was extracted from:
https://gist.github.com/rene-d/9e584a7dd2935d0f461904b9f2950007
"""
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
ENDC = '\033[0m'
LIGHT_GRAY = "\033[0;37m"
DARK_GRAY = "\033[1;30m"
BOLD = '\033[1m'
UNDERLINE = '\033[4m'


class KernelLogger:
def __init__(self, prefix=''):
self.prefix = prefix
self.log_level = logging.DEBUG

def log(self, msg, msg_log_level=logging.INFO):
"""Log to the output which is visible to the user.
"""
if msg_log_level >= self.log_level:
if msg_log_level == logging.DEBUG:
print(self._get_log_start(), f"{Colors.LIGHT_GRAY} {msg} {Colors.ENDC}")
elif msg_log_level == logging.WARNING:
print(self._get_log_start(), f"{Colors.YELLOW} {msg} {Colors.ENDC}")
elif msg_log_level == logging.ERROR:
print(self._get_log_start(), f"{Colors.RED} {msg} {Colors.ENDC}")
else:
print(self._get_log_start(), msg)

def _get_log_start(self):
"""Get the initial part of the log.
"""
return f"{Colors.BLUE} {Colors.BOLD} {self.prefix} {Colors.ENDC} {Colors.ENDC}"
86 changes: 86 additions & 0 deletions jupyter/kernel/magics/config_magic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from metakernel import Magic


class ConfigMagic(Magic):
def line_config(self, line):
self._handle_magic(line)

def cell_config(self, line):
self._handle_magic(line)

def _print_error(self, line, exception):
self.kernel.Error(f"Config failed: {line} - {exception}")

def _handle_magic(self, line):
try:
self._handle_magic_core(line)
except Exception as exception:
self._print_error(line, exception)

def _handle_magic_core(self, line):
tokens = line.split(' ')
command = tokens[0]
if command == "debug":
arg = tokens[1]
if arg == "enable":
self.kernel.debug_mode = True
elif arg == "disable":
self.kernel.debug_mode = False
else:
raise Exception(f"Unknown arg: {arg}")
elif command == "user":
arg = tokens[1]
if arg == "set":
user = tokens[2]
self.kernel.set_user(user)
elif arg == "get":
self.kernel.logger.log(f"Current user is: {self.kernel.get_user()}")
else:
raise Exception(f"Unknown arg: {arg}")
elif command == "state":
arg = tokens[1]
if arg == "set":
state_str = tokens[2]
state = int(state_str)
self.kernel.set_next_state_id(state)
elif arg == "get":
self.kernel.logger.log(f"Next state ID is: {self.kernel.get_next_state_id()}")
else:
raise Exception(f"Unknown arg: {arg}")
elif command == "host":
arg = tokens[1]
if arg == "set":
host = tokens[2]
self.kernel.set_backend_host(host)
elif arg == "get":
self.kernel.logger.log(f"Backend URL: {self.kernel.get_backend_host()}")
else:
raise Exception(f"Unknown arg: {arg}")
else:
raise Exception(f"Invalid command: {command}")


def register_magics(kernel):
kernel.register_magics(ConfigMagic)


def register_ipython_magics():
from metakernel import IPythonKernel
from metakernel.utils import add_docs
from IPython.core.magic import register_line_magic, register_cell_magic
kernel = IPythonKernel()
magic = ConfigMagic(kernel)
# Make magics callable:
kernel.line_magics["config"] = magic
kernel.cell_magics["config"] = magic

@register_line_magic
@add_docs(magic.line_config.__doc__)
def config(line):
kernel.call_magic("%config " + line)

@register_cell_magic
@add_docs(magic.cell_config.__doc__)
def config(line, cell):
magic.code = cell
magic.cell_config(line)
103 changes: 90 additions & 13 deletions jupyter/kernel/same_kernel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from __future__ import print_function
from context import Step
from clients.durable_functions_client import DurableFunctionsClient
from context import deserialize_obj
from context import DurableFunctionsClient
from kernel_logger import KernelLogger
from metakernel import MetaKernel
from IPython.core.interactiveshell import InteractiveShell
from six import reraise
import logging
import sys


Expand All @@ -27,18 +32,43 @@ class SAMEKernel(MetaKernel):
"language": "python",
"name": "same_kernel"
}
_interactive_shell = None
_same_client = None
_logger = None
_debug_mode = True

@property
def debug_mode(self):
return self._debug_mode

@debug_mode.setter
def debug_mode(self, value):
self._debug_mode = value

@property
def interactive_shell(self):
if self._interactive_shell:
return self._interactive_shell
self._interactive_shell = InteractiveShell()
return self._interactive_shell

@property
def same_client(self):
if self._same_client:
return self._same_client
backend_host = "http://localhost:7071"
user = "gochaudh"
user = "default"
start_state_id = 0
self._same_client = DurableFunctionsClient(backend_host, user, start_state_id)
return self._same_client

@property
def logger(self):
if self._logger:
return self._logger
self._logger = KernelLogger(prefix='SAME')
return self._logger

def get_usage(self):
return ("This is the SAME Python Kernel")

Expand All @@ -57,24 +87,50 @@ def get_variable(self, name):
return python_magic.env.get(name, None)

def do_execute_direct(self, code):
try:
self._do_execute_direct_core(code)
except:
self.interactive_shell.showtraceback()

def _do_execute_direct_core(self, code):
code_stripped = code.strip()
step : Step = Step(code=code_stripped)
steps = [step]

outputs = self.same_client.execute_steps(steps)
self.logger.log(outputs, logging.DEBUG)

assert len(outputs) == 1
output = outputs[0]
result = output["result"]
stdout = result["stdout"]
stderr = result["stderr"]
exec_result = result["exec_result"]
if stdout and stdout != "":
print(stdout)
if exec_result and exec_result != "":
print(exec_result)
if stderr and stderr != "":
print(stderr, file=sys.stderr)
# TODO: Do this in DEBUG mode only.
print(result)

status = result["status"]
if status == "success":
stdout = result["stdout"]
stderr = result["stderr"]
exec_result = result["exec_result"]
if stdout and stdout != "":
self.Print(stdout)
if exec_result and exec_result != "":
self.Print(exec_result)
if stderr and stderr != "":
self.Error(stderr, file=sys.stderr)
elif status == "fail":
reason = result["reason"]
info = result["info"]
if reason == "exception":
exception_base64 = info["exception"]
exception = deserialize_obj(exception_base64)
if type(exception) is tuple:
# This comes from sys.exc_info()
exception_tuple = exception
exception_class = exception_tuple[0]
exception_value = exception_tuple[1]
exception_traceback = exception_tuple[2]
# TODO clean the stack trace
reraise(exception_class, exception_value, exception_traceback)
else:
raise exception
return exec_result

def get_completions(self, info):
Expand All @@ -85,6 +141,27 @@ def get_kernel_help_on(self, info, level=0, none_on_fail=False):
python_magic = self.line_magics['python']
return python_magic.get_help_on(info, level, none_on_fail)

def set_user(self, user: str):
self.same_client.user = user
self.logger.log(f"Set user to: {user}")

def set_next_state_id(self, id: int):
self.same_client.next_state_id = id
self.logger.log(f"Set next state ID to: {id}")

def set_backend_host(self, backend_host: str):
self.same_client.set_workflow_url(backend_host)
self.logger.log(f"Set backend host to: {backend_host}")

def get_user(self):
return self.same_client.user

def get_next_state_id(self):
return self.same_client.next_state_id

def get_backend_host(self):
return self.same_client.workflow_url


if __name__ == '__main__':
SAMEKernel.run_as_main()
13 changes: 7 additions & 6 deletions objects/step.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations
from .json_serializable_object import JSONSerializableObject
from typing import Optional
from uuid import uuid4


Expand All @@ -13,20 +14,20 @@ def __init__(
name: str = "same_step_unset",
cache_value: str = "P0D",
environment_name: str = "default",
tags: list = [],
tags: Optional[list] = None,
index: int = -1,
code: str = "",
parameters: list = [],
packages_to_install: list = []
parameters: Optional[list] = None,
packages_to_install: Optional[list] = None
):
self.name = name
self.cache_value = cache_value
self.environment_name = environment_name
self.tags = tags
self.tags = tags if tags is not None else []
self.index = index
self.code = code
self.parameters = parameters
self.packages_to_install = packages_to_install
self.parameters = parameters if parameters is not None else []
self.packages_to_install = packages_to_install if packages_to_install is not None else []

@property
def name(self):
Expand Down
Loading