Skip to content

Commit dfad7dd

Browse files
authored
Merge pull request #11 from kernelci/fix-api-lifecycle
api: full claim + finish lifecycle implemented
2 parents e91cd7e + 23d22c5 commit dfad7dd

2 files changed

Lines changed: 399 additions & 10 deletions

File tree

src/kernel_ci_cloud_labs/pull_labs_poller.py

Lines changed: 201 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
1414
Long-lived service (or one-shot job) that:
1515
1. Polls kernelci-api /events for new pull-lab jobs.
16-
2. Fetches each job's PULL_LABS job_definition JSON.
17-
3. Translates it into a pullab_cloud run config and runs the pipeline.
18-
4. Submits per-test results directly to KCIDB.
16+
2. Claims each job node (state=running) so other pollers skip it.
17+
3. Fetches each job's PULL_LABS job_definition JSON.
18+
4. Translates it into a pullab_cloud run config and runs the pipeline.
19+
5. Submits per-test results directly to KCIDB.
20+
6. Marks the job node done in kernelci-api (state=done + result, plus
21+
data.error_code / data.error_msg on an infrastructure failure).
1922
2023
Generic Python only — uses stdlib urllib for HTTP, supports env-var and
2124
config-file configuration, can be invoked as a CLI, a long-running
@@ -35,6 +38,7 @@
3538
import urllib.error
3639
import urllib.parse
3740
import urllib.request
41+
from dataclasses import dataclass
3842
from typing import Any, Callable, Dict, List, Optional, Tuple
3943

4044
from kernel_ci_cloud_labs.kcidb_submit import (
@@ -116,6 +120,27 @@ def _http_get_json(url: str, token: Optional[str] = None, timeout: float = 30.0)
116120
return json.loads(body) if body else None
117121

118122

123+
def _http_put_json(
    url: str,
    payload: Any,
    token: Optional[str] = None,
    timeout: float = 30.0,
) -> Any:
    """Send *payload* as a JSON ``PUT`` to *url* and decode the JSON reply.

    The request always carries ``Content-Type`` and ``Accept`` set to
    ``application/json``; a ``Bearer`` ``Authorization`` header is added
    only when *token* is truthy. The response body is decoded as UTF-8 with
    undecodable bytes replaced.

    Returns the parsed JSON document, or None when the response body is
    empty.

    urllib.error.URLError / HTTPError raised by ``urlopen`` and
    json.JSONDecodeError raised while parsing the reply are not intercepted
    here; neither is anything else raised while reading the response.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            # Only authenticate when a token was actually supplied.
            **({"Authorization": f"Bearer {token}"} if token else {}),
        },
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        text = response.read().decode("utf-8", errors="replace")
    if not text:
        return None
    return json.loads(text)
142+
143+
119144
# ---------------------------------------------------------------------------
120145
# Cursor persistence — generic filesystem backend by default.
121146
# A deployment can swap in a custom CursorStore (e.g. backed by S3) by
@@ -270,6 +295,45 @@ def _extract_test_results(summary: Dict[str, Any]) -> Tuple[List[Dict[str, Any]]
270295
return rows, None
271296

272297

298+
def _node_result_from_rows(test_rows: List[Dict[str, Any]]) -> str:
    """Map the statuses of a job's test rows to a kernelci-api node result.

    The rows' ``"status"`` values are checked in a fixed order of
    precedence: any FAIL/ERROR/MISS gives "fail"; otherwise any PASS/DONE
    gives "pass"; otherwise a SKIP gives "skip". When none of those
    statuses is present (including an empty list) the result is "fail".

    "incomplete" is never returned: the caller decides that for
    infrastructure failures.
    """
    seen = set()
    for row in test_rows:
        seen.add(row.get("status"))

    # Ordered from highest to lowest precedence; the first match wins.
    precedence = (
        ("fail", ("FAIL", "ERROR", "MISS")),
        ("pass", ("PASS", "DONE")),
        ("skip", ("SKIP",)),
    )
    for verdict, triggers in precedence:
        if any(status in seen for status in triggers):
            return verdict
    return "fail"
313+
314+
315+
# Values written to a kernelci-api node's data.error_code (a subset of the
# kernelci ErrorCodes enum in kernelci/api/models.py). Per that enum's
# docstring, error_code is set when an infrastructure error occurs.
#
# "Infrastructure" is the generic catch-all; "invalid_job_params" flags a
# bad job (in this module: a job definition that fails translation).
_ERR_INFRASTRUCTURE = "Infrastructure"
_ERR_INVALID_JOB_PARAMS = "invalid_job_params"
321+
322+
323+
@dataclass
class NodeOutcome:
    """How a job node should be finished in kernelci-api.

    *error_code* / *error_msg* go into the node's ``data`` and are set only
    on an infrastructure failure (result == "incomplete"), matching the
    kernelci-pipeline scheduler convention.
    """

    # Value stored in the node's top-level "result" field.
    result: str
    # kernelci ErrorCodes value for data.error_code; None when the job
    # actually ran (no infrastructure failure).
    error_code: Optional[str] = None
    # Human-readable detail for data.error_msg; only written when
    # error_code is set.
    error_msg: Optional[str] = None
335+
336+
273337
# ---------------------------------------------------------------------------
274338
# Main poller class.
275339
# ---------------------------------------------------------------------------
@@ -443,10 +507,94 @@ def resolve_build_id(self, node: Dict[str, Any]) -> Optional[str]:
443507
return None
444508
return None
445509

510+
# -- Node state updates ---------------------------------------------
511+
512+
def _node_url(self, node_id: str) -> str:
    """Return the kernelci-api URL addressing the node *node_id*.

    Trailing slashes on ``self.api_base_uri`` are dropped first so the
    result never contains a doubled ``/`` before ``node``.
    """
    base = self.api_base_uri.rstrip("/")
    return f"{base}/node/{node_id}"
514+
515+
def _claim_node(self, node: Dict[str, Any]) -> bool:
    """Claim a job node by transitioning it to state=running.

    Re-reads the node first: if it is no longer "available", another
    poller has already taken it, so we skip it. This narrows -- but,
    without an atomic compare-and-set in kernelci-api, cannot fully
    close -- the window for two pollers claiming the same job.

    Every API failure is logged and reported as "not claimed" rather than
    raised. That includes timeouts and dropped connections that occur
    while the response is awaited or read, and a reply that is not a JSON
    object.

    Returns True only if this poller now owns the node.
    """
    # Local import: only the except clauses below need this name.
    import http.client

    # urlopen() wraps only errors raised while *sending* the request in
    # URLError. A timeout or reset while waiting for / reading the response
    # surfaces as a plain OSError (TimeoutError, ConnectionResetError) or an
    # http.client.HTTPException (e.g. IncompleteRead). URLError itself is an
    # OSError subclass, so it is still covered.
    api_errors = (OSError, http.client.HTTPException, json.JSONDecodeError)

    node_id = node.get("id")
    if not node_id:
        logger.warning("Cannot claim node without an id")
        return False
    url = self._node_url(node_id)
    try:
        current = _http_get_json(url, token=self.api_token) or {}
    except api_errors as e:
        logger.error("Could not re-read node %s before claim: %s", node_id, e)
        return False
    if not isinstance(current, dict):
        # A JSON list/string/number cannot be a node; treat it like any
        # other unusable reply instead of failing on .get() below.
        logger.error(
            "Could not re-read node %s before claim: unexpected %s response",
            node_id, type(current).__name__,
        )
        return False
    state = current.get("state")
    if state != "available":
        logger.info("Skipping node %s: already claimed (state=%s)", node_id, state)
        return False
    # The API is sent the full node document back with only state changed.
    current["state"] = "running"
    try:
        _http_put_json(url, current, token=self.api_token)
    except api_errors as e:
        logger.error("Failed to claim node %s (PUT state=running): %s", node_id, e)
        return False
    logger.info("Claimed node %s (state=running)", node_id)
    return True
547+
548+
def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool:
    """Mark a claimed job node done with the given outcome.

    Sets state=done and result; on an infrastructure failure also sets
    data.error_code / data.error_msg (kernelci ErrorCodes values),
    matching the kernelci-pipeline scheduler.

    A failure here is logged but never raised: the job has already been
    processed by the time this runs, and the caller invokes it from a
    ``finally`` block, where an escaping exception would replace the
    job's own return value. Timeouts and dropped connections while the
    response is awaited or read, and a reply that is not a JSON object,
    are therefore handled like any other API error.

    Returns True if the node was updated, False otherwise.
    """
    # Local import: only the except clauses below need this name.
    import http.client

    # urlopen() wraps only errors raised while *sending* the request in
    # URLError. Failures while waiting for / reading the response surface as
    # a plain OSError (TimeoutError, ConnectionResetError) or an
    # http.client.HTTPException. URLError is itself an OSError subclass.
    api_errors = (OSError, http.client.HTTPException, json.JSONDecodeError)

    url = self._node_url(node_id)
    try:
        current = _http_get_json(url, token=self.api_token) or {}
    except api_errors as e:
        logger.error("Could not re-read node %s before finish: %s", node_id, e)
        return False
    if not isinstance(current, dict):
        # A JSON list/string/number cannot be a node document.
        logger.error(
            "Could not re-read node %s before finish: unexpected %s response",
            node_id, type(current).__name__,
        )
        return False
    current["state"] = "done"
    current["result"] = outcome.result
    if outcome.error_code:
        # error_code/error_msg live in node.data (kernelci JobData), not
        # at the top level.
        data = current.get("data") or {}
        data["error_code"] = outcome.error_code
        data["error_msg"] = outcome.error_msg
        current["data"] = data
    try:
        _http_put_json(url, current, token=self.api_token)
    except api_errors as e:
        logger.error(
            "Failed to finish node %s (PUT state=done result=%s): %s",
            node_id, outcome.result, e,
        )
        return False
    logger.info(
        "Finished node %s (state=done, result=%s%s)",
        node_id, outcome.result,
        f", error_code={outcome.error_code}" if outcome.error_code else "",
    )
    return True
586+
446587
# -- Per-event processing -------------------------------------------
447588

448589
def process_event(self, event: Dict[str, Any]) -> bool:
449-
"""Process one event end to end. Returns True on success."""
590+
"""Process one event end to end. Returns True on success.
591+
592+
The job node is claimed (state=running) before any work starts and
593+
finished (state=done + result, plus error_code/error_msg on an
594+
infrastructure failure) afterwards, whatever the outcome. A node we
595+
fail to claim -- already taken, or an API error -- is skipped
596+
without being run or submitted.
597+
"""
450598
node = event.get("node") or {}
451599
node_id = node.get("id")
452600

@@ -463,13 +611,38 @@ def process_event(self, event: Dict[str, Any]) -> bool:
463611
logger.debug("Skipping event %s: no job_definition artifact", node_id)
464612
return True
465613

614+
if not self._claim_node(node):
615+
return True
616+
466617
logger.info("Processing pull-lab job node=%s definition=%s", node_id, jobdef_url)
467618

619+
# We own the node now: it must be finished whatever happens below.
620+
# This default covers an unexpected crash inside _execute_job.
621+
node_outcome = NodeOutcome(
622+
"incomplete", _ERR_INFRASTRUCTURE, "unexpected internal error"
623+
)
624+
try:
625+
ok, node_outcome = self._execute_job(node, node_id, jobdef_url)
626+
return ok
627+
finally:
628+
self._finish_node(node_id, node_outcome)
629+
630+
def _execute_job(
631+
self, node: Dict[str, Any], node_id: str, jobdef_url: str
632+
) -> Tuple[bool, NodeOutcome]:
633+
"""Fetch, translate, run and submit one already-claimed job.
634+
635+
Returns (ok, outcome): *ok* is False on a recoverable failure;
636+
*outcome* is the NodeOutcome passed to _finish_node().
637+
"""
468638
try:
469639
jobdef = _http_get_json(jobdef_url, token=self.api_token)
470640
except (urllib.error.URLError, json.JSONDecodeError) as e:
471641
logger.error("Failed to fetch job_definition for %s: %s", node_id, e)
472-
return False
642+
return False, NodeOutcome(
643+
"incomplete", _ERR_INFRASTRUCTURE,
644+
f"failed to fetch job_definition: {e}",
645+
)
473646

474647
build_id = self.resolve_build_id(node)
475648
if not build_id:
@@ -484,14 +657,22 @@ def process_event(self, event: Dict[str, Any]) -> bool:
484657
run_config = translate_job(jobdef, self.base_config, node_id=node_id)
485658
except ValueError as e:
486659
logger.error("Translation failed for node %s: %s", node_id, e)
487-
return False
660+
return False, NodeOutcome(
661+
"incomplete", _ERR_INVALID_JOB_PARAMS,
662+
f"job translation failed: {e}",
663+
)
488664

665+
infra_error: Optional[NodeOutcome] = None
489666
try:
490667
per_test, log_url = self.job_executor(run_config)
491668
except Exception as e: # pylint: disable=broad-exception-caught
492669
logger.error("Job execution failed for node %s: %s", node_id, e, exc_info=True)
493-
# Submit an ERROR row so KCIDB sees we picked it up. The boot.
494-
# prefix makes the dashboard classify it as a (failed) boot test.
670+
# An executor crash is an infrastructure failure: emit an ERROR
671+
# row so KCIDB sees we picked it up, and mark the node incomplete.
672+
# The boot. prefix has the dashboard classify it as a failed boot.
673+
infra_error = NodeOutcome(
674+
"incomplete", _ERR_INFRASTRUCTURE, f"job execution failed: {e}"
675+
)
495676
per_test = [{"name": "boot.infrastructure", "status": "ERROR"}]
496677
log_url = None
497678

@@ -509,6 +690,12 @@ def process_event(self, event: Dict[str, Any]) -> bool:
509690
for idx, t in enumerate(per_test or [])
510691
]
511692
if not test_rows:
693+
# No per-test results came back -> the outcome is unknown, itself
694+
# an infrastructure failure (-> node result incomplete).
695+
infra_error = NodeOutcome(
696+
"incomplete", _ERR_INFRASTRUCTURE,
697+
"executor returned no per-test results",
698+
)
512699
test_rows = [
513700
build_test_row(
514701
origin=self.kcidb_origin,
@@ -526,6 +713,10 @@ def process_event(self, event: Dict[str, Any]) -> bool:
526713
)
527714
]
528715

716+
# error_code + "incomplete" only on an infrastructure failure; a job
717+
# that actually ran is pass/fail/skip from its tests. Independent of
718+
# whether the KCIDB submission below succeeds.
719+
outcome = infra_error or NodeOutcome(_node_result_from_rows(test_rows))
529720
try:
530721
submit_tests(
531722
self.kcidb_submit_url,
@@ -536,9 +727,9 @@ def process_event(self, event: Dict[str, Any]) -> bool:
536727
)
537728
except urllib.error.URLError as e:
538729
logger.error("KCIDB submit failed for node %s: %s", node_id, e)
539-
return False
730+
return False, outcome
540731
logger.info("Submitted %d test row(s) for node %s", len(test_rows), node_id)
541-
return True
732+
return True, outcome
542733

543734
# -- Loop -----------------------------------------------------------
544735

0 commit comments

Comments
 (0)