br.py (forked from OCP-on-NERC/python-batchtools)

import sys
import time
import traceback
from pathlib import Path

from helpers import *   # expected to provide get_cmd, pretty_print, oc_delete
from imports import *   # expected to provide oc (openshift client) and OpenShiftPythonException
from build_yaml import build_job_body

# TODO: change pid -> make temp
# (see https://piazza.com/class/me4rjds6oce507/post/23)

def get_pod_status(pod_name: str, namespace: str | None = None) -> str:
    """
    Return the current status.phase of a pod (Pending, Running, Succeeded, Failed).
    """
    if namespace:
        # Scope the lookup to the requested namespace; otherwise use the
        # client's current project.
        with oc.project(namespace):
            pod = oc.selector(f"pod/{pod_name}").object()
    else:
        pod = oc.selector(f"pod/{pod_name}").object()
    return pod.model.status.phase or "Unknown"

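# Minimal usage sketch (pod name and namespace are hypothetical):
#   get_pod_status("example-a100-0-abc12")           # -> "Running"
#   get_pod_status("example-a100-0-abc12", "my-ns")  # -> "Succeeded"
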
def log_job_output(job_name: str, *, wait: int, timeout: int | None) -> None:
    """
    Wait until the job's pod completes (Succeeded/Failed), then print its logs once.
    """
    pods = oc.selector("pod", labels={"job-name": job_name}).objects()
    if not pods:
        print(f"No pods found for job {job_name}")
        return
    pod = pods[0]
    pod_name = pod.model.metadata.name
    if wait:
        start = time.monotonic()
        while True:
            phase = get_pod_status(pod_name)
            if phase in ("Succeeded", "Failed"):
                print(f"Pod {pod_name} finished with phase={phase}")
                break
            if timeout and (time.monotonic() - start) > timeout:
                print(f"Timeout waiting for pod {pod_name} to complete")
                return
            time.sleep(2)  # poll every couple of seconds instead of busy-waiting
    print(pretty_print(pod_name))

def prepare_context_and_getlist(context: int, context_dir: str, jobs_dir: str, output_dir: str, getlist_path: str) -> None:
    if not context:
        return
    ctx = Path(context_dir).resolve()
    out = Path(output_dir).resolve()
    gl = Path(getlist_path).resolve()
    jobs = Path(jobs_dir).resolve()
    if not ctx.is_dir():
        print(f"ERROR: CONTEXT_DIR: {ctx} is not a directory")
        sys.exit(-1)
    if out.exists():
        print(f"ERROR: {out} directory already exists")
        sys.exit(-1)
    try:
        out.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        print("ERROR: Failed to make output dir (already exists)")
        sys.exit(-1)
    except Exception as e:
        print(f"ERROR: Failed to make output dir: {e}")
        sys.exit(-1)
    # If jobs_dir sits directly under context_dir, record its relative path so
    # it can be excluded from the getlist.
    jdir_rel: str | None = None
    if jobs.parent == ctx:
        jdir_rel = f"./{jobs.name}"
    entries: list[str] = []
    # Immediate children only (like find -mindepth 1 -maxdepth 1); "find" would
    # include both files and directories, so do the same here.
    for name in sorted(p.name for p in ctx.iterdir()):
        rel = f"./{name}"
        if jdir_rel and rel == jdir_rel:
            continue
        entries.append(rel)
    # Write the entries to the getlist file, one per line.
    try:
        gl.parent.mkdir(parents=True, exist_ok=True)
        gl.write_text("\n".join(entries) + ("\n" if entries else ""))
    except Exception as e:
        print(f"ERROR: Failed to write getlist at {gl}: {e}")
        sys.exit(-1)

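# Example of the resulting getlist (hypothetical context directory holding
# data/, train.py, and the jobs/ directory directly beneath it):
#   ./data
#   ./train.py
# ./jobs is omitted because it is the jobs directory itself.
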
def br(args) -> int:
    DEFAULT_QUEUES = {
        "v100": "v100-localqueue",
        "a100": "a100-localqueue",
        "h100": "h100-localqueue",
        "none": "dummy-localqueue",
    }
    gpu = args.gpu
    name = args.name
    image = args.image
    job_id = int(args.job_id)
    context = int(args.context)
    max_sec = int(args.max_sec)
    gpu_req = int(args.gpu_numreq)
    gpu_lim = int(args.gpu_numlim)
    wait = int(args.wait)
    timeout = int(args.timeout)
    delete = int(args.job_delete)
    if gpu not in DEFAULT_QUEUES:
        print(f"ERROR: unsupported GPU {gpu}: no queue found")
        return 1
    queue_name = DEFAULT_QUEUES[gpu]
    job_name = f"{name}-{gpu}-{job_id}"
    container_name = f"{job_name}-container"
    file_to_execute = " ".join(args.command).strip()
    pwd = get_cmd("pwd")
    context_directory = pwd
    jobs_directory = f"{pwd}/jobs"
    output_directory = f"{jobs_directory}/{job_name}"
    dev_pod_name = get_cmd("hostname")
    getlist = f"{output_directory}/getlist"
    # Resolve the name of the first container in the current dev pod; bail out
    # if the pod spec carries no containers.
    pod = oc.selector(f"pod/{dev_pod_name}").object()
    containers = getattr(pod.model.spec, "containers", []) or []
    if not containers:
        print(f"ERROR: no containers found in pod {dev_pod_name}")
        return 1
    dev_container_name = containers[0].name
    prepare_context_and_getlist(
        context=context,
        context_dir=context_directory,
        jobs_dir=jobs_directory,
        output_dir=output_directory,
        getlist_path=getlist,
    )
    try:
        # Create the job body using the helper
        job_body = build_job_body(
            job_name=job_name,
            queue_name=queue_name,
            image=image,
            container_name=container_name,
            cmdline=file_to_execute,
            max_sec=max_sec,
            gpu=gpu,
            gpu_req=gpu_req,
            gpu_lim=gpu_lim,
            context=context,
            devpod_name=dev_pod_name,
            devcontainer=dev_container_name,
            context_dir=context_directory,
            jobs_dir=jobs_directory,
            job_workspace=output_directory,
            getlist_path=getlist,
        )
        print(f"Creating job {job_name} in queue {queue_name}...")
        oc.create(job_body)
        print(f"Job {job_name} created successfully.")
        if wait:
            log_job_output(
                job_name=job_name,
                wait=wait,
                timeout=timeout,
            )
    except OpenShiftPythonException as e:
        print("Error occurred while creating job:")
        print(e)
        traceback.print_exc()
        return 1
    if delete:
        oc_delete(job_name)
    return 0
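

# The module defines no entry point; below is a minimal, hypothetical argparse
# wiring for br(). Every flag name and default here is an assumption inferred
# from the attributes br() reads off the args namespace, not part of the
# original tool.
def _build_parser():
    import argparse

    parser = argparse.ArgumentParser(description="Submit a batch job to a GPU local queue.")
    parser.add_argument("--gpu", choices=["v100", "a100", "h100", "none"], default="none")
    parser.add_argument("--name", required=True, help="base name for the job")
    parser.add_argument("--image", required=True, help="container image the job runs")
    parser.add_argument("--job-id", dest="job_id", type=int, default=0)
    parser.add_argument("--context", type=int, default=0,
                        help="1 to ship the current directory as job context")
    parser.add_argument("--max-sec", dest="max_sec", type=int, default=3600)
    parser.add_argument("--gpu-numreq", dest="gpu_numreq", type=int, default=1)
    parser.add_argument("--gpu-numlim", dest="gpu_numlim", type=int, default=1)
    parser.add_argument("--wait", type=int, default=0,
                        help="1 to block until the job's pod finishes")
    parser.add_argument("--timeout", type=int, default=0,
                        help="seconds to wait before giving up (0 = no limit)")
    parser.add_argument("--job-delete", dest="job_delete", type=int, default=0,
                        help="1 to delete the job afterwards")
    parser.add_argument("command", nargs="+", help="command line to execute inside the job")
    return parser


if __name__ == "__main__":
    sys.exit(br(_build_parser().parse_args()))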