Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion JobRunner/Callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def start_callback(self):
job_params = self.load_prov(job_params_file)

try:
self.jr = JobRunner(self.conf, port=self.port)
self.jr = JobRunner(self.conf, port=self.port, server_mode=True)
self.jr.callback(job_params=job_params, app_name=self._app_name)
except Exception as e:
print("An unhandled error was encountered")
Expand Down
43 changes: 34 additions & 9 deletions JobRunner/JobRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import socket
import threading
from time import sleep as _sleep, time as _time
import traceback

from clients.authclient import KBaseAuth
from clients.CatalogClient import Catalog
Expand Down Expand Up @@ -36,9 +37,16 @@ class JobRunner(object):
to support subjobs and provenenace calls.
"""

def __init__(self, config: Config, port=None):
def __init__(self, config: Config, port=None, server_mode: bool = False):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add info about what "Server Mode" means in the comment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

"""
inputs: config dictionary, EE2 URL, Job id, Token, Admin Token
Create the job runner.

config - the job runner config.
port - the port on which the callback server should run.
server_mode - True indicates the JobRunner will run in callback server only mode,
where the callback server is expected to keep running until explicitly shut down.
In this case certain errors will be returned via the callback server API rather
than stopping the job runner.
"""

self.ee2 = None
Expand All @@ -50,6 +58,7 @@ def __init__(self, config: Config, port=None):
self.bypass_token = os.environ.get("BYPASS_TOKEN", True)
self.admin_token = config.admin_token
self.config = config
self.server_mode = server_mode
# self.config = self._init_config(config, job_id, ee2_url)

self.hostname = config.hostname
Expand Down Expand Up @@ -195,13 +204,29 @@ def _watch(self, config: dict) -> dict:
self.logger.error("Too many subtasks")
self._cancel()
return {"error": "Canceled or unexpected error"}
if req[2].get("method").startswith("special."):
self._submit_special(
config=config, job_id=req[1], job_params=req[2]
)
else:
self._submit(config=config, job_id=req[1], job_params=req[2])
ct += 1
try:
if req[2].get("method").startswith("special."):
self._submit_special(
config=config, job_id=req[1], job_params=req[2]
)
else:
self._submit(config=config, job_id=req[1], job_params=req[2])
ct += 1
except Exception as e:
if self.server_mode:
err = {"error": {
"code": -32601,
"name": "CallbackServerError",
"message": str(e),
"error": traceback.format_exc()
}}
self.callback_queue.put(["output", req[1], err])
else:
# This case doesn't seem to be tested. Looking at the jobrunner
# tests, the tests that might have been able to be modified to test
# it are all marked as "online" which are currently disabled.
# For now, since it's so simple, leaving it untested.
raise # maintain prior behavior for job runner mode and throw
elif req[0] == "set_provenance":
# Ok, we're syncing provenance in 2 different places by sending messages
# on 2 different queues. I think there may be design issues here
Expand Down
2 changes: 1 addition & 1 deletion JobRunner/callback_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ async def _process_rpc(app, data, token):
"error": exception_message,
"code": "123",
"message": exception_message,
"name": "CallbackServerError",
"name": "CallbackServerError",
}
outputs[job_id] = {
"result": exception_message,
Expand Down
31 changes: 28 additions & 3 deletions test/test_callback_server_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,9 @@ def test_submit_job_async(callback_ports):
port = callback_ports[0]

resp = _post(port, {
"method": "njs_sdk_test_1._run_submit",
"method": "njs_sdk_test_2._run_submit",
"params": [{"id": "godiloveasynchrony"}],
"service_ver": "beta",
})
j = resp.json()
job_id = j["result"][0]
Expand All @@ -285,9 +286,9 @@ def test_submit_job_async(callback_ports):
res = resp.json()
assert res == {"result": [{
"result": [{
"hash": "366eb8cead445aa3e842cbc619082a075b0da322",
"hash": "9d6b868bc0bfdb61c79cf2569ff7b9abffd4c67f",
"id": "godiloveasynchrony",
"name": "njs_sdk_test_1"
"name": "njs_sdk_test_2"
}],
"finished": 1,
"id": "callback",
Expand Down Expand Up @@ -352,6 +353,30 @@ def test_submit_fail_module_lookup_service_ver_sync(callback_ports):
}}


def test_submit_job_fail_too_old_image(callback_ports):
# This image was built such a long time ago modern versions of docker refuse to run it.
# The error message thrown by the MethodRunner doesn't actually reflect this. The image
# exists but docker refuses to pull it
port = callback_ports[0]

resp = _post(port, {
"method": "HelloService.say_hello",
"params": ["I'm not your buddy, pal"],
})
j = resp.json()
assert "JobRunner/JobRunner/DockerRunner.py" in j["error"]["error"]
del j["error"]["error"]
assert j == {
"error": {
"code": -32601,
"message": "Couldn't find image for "
+ "dockerhub-ci.kbase.us/kbase:helloservice.25528a3b917ab4f40bc7aba45b08e581e33d985a",
"name": "CallbackServerError"
},
"finished": 1,
}


def test_submit_fail_max_jobs_limit(callback_ports):
port = callback_ports[0]

Expand Down
Loading