Skip to content
8 changes: 8 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ jobs:

- name: check out repository
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: 3.11

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
Expand All @@ -31,6 +36,9 @@ jobs:
run: |
python tests/upload_server.py &

- name: Install dependencies
run: pip install .

- name: install pytest
run: pip install pytest

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
target: development
image: tesp-api
environment:
- CONTAINER_TYPE=docker # Set to "docker", "singularity", or "both"
- CONTAINER_TYPE=singularity # Set to "docker", "singularity", or "both"
container_name: tesp-api
privileged: true
ports:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository = "https://github.com/ndopj/tesp-api"

[tool.poetry.dependencies]
python = "^3.10.0"
aio_pika = "^9.5.7"
fastapi = "^0.75.1"
orjson = "^3.6.8"
gunicorn = "^20.1.0"
Expand Down
5 changes: 3 additions & 2 deletions settings.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
db.mongodb_uri = "mongodb://localhost:27017"
pulsar.url = "http://localhost:8913"
pulsar.status.poll_interval = 4
pulsar.status.max_polls = 100
pulsar.status.max_polls = 400
pulsar.client_timeout = 30

logging.level = "DEBUG"
logging.output_json = false
Expand All @@ -14,5 +15,5 @@ basic_auth.password = "password"

[dev-docker]
db.mongodb_uri = "mongodb://tesp-db:27017"
pulsar.url = "http://pulsar_rest:8913"
pulsar.url = "http://172.17.0.1:8913"
logging.output_json = false
25 changes: 14 additions & 11 deletions tesp_api/repository/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,20 @@ def cancel_task(
p_author: Maybe[str],
task_id: ObjectId
) -> Promise:
full_search_query = dict()
full_search_query.update({'_id': task_id})
full_search_query.update(p_author.maybe({}, lambda a: {'author': a}))

return Promise(lambda resolve, reject: resolve(full_search_query)) \
.then(self._tasks.find_one) \
.then(lambda _task: self.update_task(
{'_id': task_id},
{'$set': {'state': TesTaskState.CANCELED}}
)).map(lambda updated_task: updated_task
.map(lambda _updated_task: _updated_task.id))\
search_query = {
'_id': task_id,
'state': {'$in': [
TesTaskState.QUEUED,
TesTaskState.INITIALIZING,
TesTaskState.RUNNING
]}
}
search_query.update(p_author.maybe({}, lambda a: {'author': a}))
update_query = {'$set': {'state': TesTaskState.CANCELED}}

return self.update_task(search_query, update_query)\
.map(lambda updated_task: updated_task
.map(lambda _updated_task: _updated_task.id))\
.catch(handle_data_layer_error)


Expand Down
133 changes: 82 additions & 51 deletions tesp_api/service/event_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from tesp_api.repository.task_repository import task_repository
from tesp_api.service.file_transfer_service import file_transfer_service
from tesp_api.service.error import pulsar_event_handle_error, TaskNotFoundError, TaskExecutorError
from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmpqOperations, DataType
from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmqpOperations, DataType
from tesp_api.repository.model.task import (
TesTaskState,
TesTaskExecutor,
Expand All @@ -29,6 +29,7 @@

CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker")


@local_handler.register(event_name="queued_task")
def handle_queued_task(event: Event) -> None:
"""
Expand All @@ -39,8 +40,9 @@ def handle_queued_task(event: Event) -> None:
match pulsar_service.get_operations():
case PulsarRestOperations() as pulsar_rest_operations:
dispatch_event('queued_task_rest', {**payload, 'pulsar_operations': pulsar_rest_operations})
case PulsarAmpqOperations() as pulsar_ampq_operations:
dispatch_event('queued_task_ampq', {**payload, 'pulsar_operations': pulsar_ampq_operations})
case PulsarAmqpOperations() as pulsar_amqp_operations:
dispatch_event('queued_task_amqp', {**payload, 'pulsar_operations': pulsar_amqp_operations})


@local_handler.register(event_name="queued_task_rest")
async def handle_queued_task_rest(event: Event):
Expand All @@ -53,12 +55,37 @@ async def handle_queued_task_rest(event: Event):

print(f"Queued task rest: {task_id}")

await Promise(lambda resolve, reject: resolve(None))\
.then(lambda nothing: pulsar_operations.setup_job(task_id))\
.map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result}))\
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: pulsar_operations.setup_job(task_id)) \
.map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result})) \
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x) # Invokes promise, potentially from error handler


@local_handler.register(event_name="queued_task_amqp")
async def handle_queued_task_amqp(event: Event):
    """
    Sets up the job in Pulsar via AMQP operations and dispatches an 'initialize_task' event.

    The event is unpacked into ``(event_name, payload)``; the payload must carry
    'task_id' and 'pulsar_operations'. On success the original payload is
    forwarded with an extra 'task_config' key holding the result of
    ``setup_job``. Any exception raised while setting up the job or dispatching
    the follow-up event is handed to ``pulsar_event_handle_error``.
    """
    # Function-scope import keeps this handler self-contained.
    import inspect

    event_name, payload = event
    task_id: ObjectId = payload['task_id']
    pulsar_operations: PulsarAmqpOperations = payload['pulsar_operations']

    print(f"Queued task AMQP: {task_id}")

    try:
        # Setup job via AMQP
        setup_job_result = await pulsar_operations.setup_job(task_id)

        # Dispatch initialize event. dispatch_event is invoked without `await`
        # elsewhere in this module (including from the synchronous
        # handle_queued_task), so awaiting its result unconditionally could
        # raise TypeError after the event was already dispatched and wrongly
        # push a healthy task into the error handler below.
        dispatch_result = dispatch_event('initialize_task', {
            **payload,
            'task_config': setup_job_result
        })
        if inspect.isawaitable(dispatch_result):
            await dispatch_result
    except Exception as error:
        # The error handler's return type is treated as "maybe awaitable",
        # mirroring the guard used in handle_run_task.
        error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
        if inspect.isawaitable(error_handler_result):
            await error_handler_result


@local_handler.register(event_name="initialize_task")
async def handle_initializing_task(event: Event) -> None:
"""
Expand All @@ -71,10 +98,10 @@ async def handle_initializing_task(event: Event) -> None:

# Merged Logic: Using the feature-complete setup_data from the new version
async def setup_data(job_id: ObjectId,
resources: TesTaskResources,
volumes: List[str],
inputs: List[TesTaskInput],
outputs: List[TesTaskOutput]):
resources: TesTaskResources,
volumes: List[str],
inputs: List[TesTaskInput],
outputs: List[TesTaskOutput]):
resource_conf: dict
volume_confs: List[dict] = []
input_confs: List[dict] = []
Expand Down Expand Up @@ -109,28 +136,29 @@ async def setup_data(job_id: ObjectId,
return resource_conf, volume_confs, input_confs, output_confs

print(f"Initializing task: {task_id}")
await Promise(lambda resolve, reject: resolve(None))\
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: task_repository.update_task_state(
task_id,
TesTaskState.QUEUED,
TesTaskState.INITIALIZING
)).map(lambda updated_task: get_else_throw(
updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED))
)).then(lambda updated_task: setup_data(
task_id,
maybe_of(updated_task.resources).maybe(None, lambda x: x),
maybe_of(updated_task.volumes).maybe([], lambda x: x),
maybe_of(updated_task.inputs).maybe([], lambda x: x),
maybe_of(updated_task.outputs).maybe([], lambda x: x)
)).map(lambda res_input_output_confs: dispatch_event('run_task', {
**payload,
'resource_conf': res_input_output_confs[0],
'volume_confs': res_input_output_confs[1],
'input_confs': res_input_output_confs[2],
'output_confs': res_input_output_confs[3]
})).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
task_id,
TesTaskState.QUEUED,
TesTaskState.INITIALIZING
)).map(lambda updated_task: get_else_throw(
updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED))
)).then(lambda updated_task: setup_data(
task_id,
maybe_of(updated_task.resources).maybe(None, lambda x: x),
maybe_of(updated_task.volumes).maybe([], lambda x: x),
maybe_of(updated_task.inputs).maybe([], lambda x: x),
maybe_of(updated_task.outputs).maybe([], lambda x: x)
)).map(lambda res_input_output_confs: dispatch_event('run_task', {
**payload,
'resource_conf': res_input_output_confs[0],
'volume_confs': res_input_output_confs[1],
'input_confs': res_input_output_confs[2],
'output_confs': res_input_output_confs[3]
})).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x)


@local_handler.register(event_name="run_task")
async def handle_run_task(event: Event) -> None:
"""
Expand All @@ -146,8 +174,8 @@ async def handle_run_task(event: Event) -> None:
input_confs: List[dict] = payload['input_confs']
output_confs: List[dict] = payload['output_confs']
pulsar_operations: PulsarRestOperations = payload['pulsar_operations']
run_command_str = None

run_command_str = None
command_start_time = datetime.datetime.now(datetime.timezone.utc)

try:
Expand Down Expand Up @@ -175,7 +203,7 @@ async def handle_run_task(event: Event) -> None:
)

stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", command=[], workdir=Path("/downloads"))

# Stage-in command
stage_in_cmd = ""
stage_in_mount = ""
Expand Down Expand Up @@ -231,27 +259,28 @@ async def handle_run_task(event: Event) -> None:
command_status.get('returncode', -1)
)

current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))
current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))

if current_task_obj.state == TesTaskState.CANCELED:
print(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.")
return
return

if command_status.get('returncode', -1) != 0:
print(f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
print(
f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
await task_repository.update_task_state(task_id, TesTaskState.RUNNING, TesTaskState.EXECUTOR_ERROR)
await pulsar_operations.erase_job(task_id)
return
return

print(f"Task {task_id} completed successfully. Setting state to COMPLETE.")
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda ignored: task_repository.update_task_state(
task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
)) \
task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
)) \
.map(lambda task_after_complete_update: get_else_throw(
task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
)) \
task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
)) \
.then(lambda ignored: pulsar_operations.erase_job(task_id)) \
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x)
Expand All @@ -262,22 +291,24 @@ async def handle_run_task(event: Event) -> None:
await pulsar_operations.kill_job(task_id)
await pulsar_operations.erase_job(task_id)
print(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.")

except Exception as error:
print(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}")

task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED:
print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
return
print(
f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
return

print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.")
error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise):
await error_handler_result

try:
print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
await pulsar_operations.erase_job(task_id)
except Exception as final_erase_error:
print(f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")

# try:
# print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
# await pulsar_operations.erase_job(task_id)
# except Exception as final_erase_error:
# print(
# f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")
Loading
Loading