2828from _tilebox .grpc .error import InternalServerError
2929from tilebox .datasets .sync .dataset import DatasetClient
3030from tilebox .workflows .cache import JobCache
31- from tilebox .workflows .data import ComputedTask , Idling , NextTaskToRun , ProgressBar , Task , TaskLease
31+ from tilebox .workflows .data import ComputedTask , Idling , NextTaskToRun , ProgressIndicator , Task , TaskLease
3232from tilebox .workflows .interceptors import Interceptor , InterceptorType
3333from tilebox .workflows .observability .logging import get_logger
3434from tilebox .workflows .observability .tracing import WorkflowTracer
5656_FALLBACK_JITTER_INTERVAL = timedelta (seconds = 5 )
5757
5858# Maximum number of progress bars per task, mirroring the limit on the server side
59- _MAX_TASK_PROGRESS_BARS = 1000
59+ _MAX_TASK_PROGRESS_INDICATORS = 1000
6060
6161WrappedFnReturnT = TypeVar ("WrappedFnReturnT" )
6262
@@ -217,7 +217,7 @@ def _external_interrupt_handler(self, signum: int, frame: FrameType | None) -> N
217217 if self ._task is not None :
218218 progress = []
219219 if self ._context is not None :
220- progress = _mutable_progress_bars_to_progress_updates (self ._context ._progress_bars ) # noqa: SLF001
220+ progress = _finalize_mutable_progress_trackers (self ._context ._progress_indicators ) # noqa: SLF001
221221 self ._service .task_failed (
222222 self ._task ,
223223 RunnerShutdown ("Task was interrupted" ),
@@ -431,7 +431,7 @@ def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | Id
431431
432432 task_failed_retry = _retry_backoff (self ._service .task_failed , stop = shutdown_context .stop_if_shutting_down ())
433433 cancel_job = True
434- progress_updates = _mutable_progress_bars_to_progress_updates (context ._progress_bars ) # noqa: SLF001
434+ progress_updates = _finalize_mutable_progress_trackers (context ._progress_indicators ) # noqa: SLF001
435435 task_failed_retry (task , e , cancel_job , progress_updates )
436436
437437 return None
@@ -493,7 +493,7 @@ def _try_execute(
493493 task .to_submission (self .tasks_to_run .cluster_slug )
494494 for task in context ._sub_tasks # noqa: SLF001
495495 ],
496- progress_updates = _mutable_progress_bars_to_progress_updates (context ._progress_bars ), # noqa: SLF001
496+ progress_updates = _finalize_mutable_progress_trackers (context ._progress_indicators ), # noqa: SLF001
497497 )
498498
499499 next_task_retry = _retry_backoff (self ._service .next_task , stop = shutdown_context .stop_if_shutting_down ())
@@ -513,7 +513,7 @@ def __init__(self, runner: TaskRunner, task: Task, job_cache: JobCache) -> None:
513513 self .current_task = task
514514 self .job_cache = job_cache
515515 self ._sub_tasks : list [FutureTask ] = []
516- self ._progress_bars : dict [str | None , ProgressUpdate ] = {}
516+ self ._progress_indicators : dict [str | None , ProgressUpdate ] = {}
517517
518518 def submit_subtask (
519519 self ,
@@ -548,20 +548,20 @@ def submit_batch(
548548 )
549549 return self .submit_subtasks (tasks , cluster , max_retries )
550550
551- def progress (self , label : str | None ) -> ProgressUpdate :
551+ def progress (self , label : str | None = None ) -> ProgressUpdate :
552552 if label == "" :
553553 label = None
554554
555- if label in self ._progress_bars :
556- return self ._progress_bars [label ]
555+ if label in self ._progress_indicators :
556+ return self ._progress_indicators [label ]
557557
558558 # this is our server side limit to prevent mistakes / abuse, so let's not allow to go beyond that already
559559 # client side
560- if len (self ._progress_bars ) > _MAX_TASK_PROGRESS_BARS :
561- raise ValueError (f"Cannot create more than { _MAX_TASK_PROGRESS_BARS } progress bars per task." )
560+ if len (self ._progress_indicators ) > _MAX_TASK_PROGRESS_INDICATORS :
561+ raise ValueError (f"Cannot create more than { _MAX_TASK_PROGRESS_INDICATORS } progress indicators per task." )
562562
563563 progress_bar = ProgressUpdate (label )
564- self ._progress_bars [label ] = progress_bar
564+ self ._progress_indicators [label ] = progress_bar
565565 return progress_bar
566566
567567 @property
@@ -577,8 +577,10 @@ def _dataset(self, dataset_id: str) -> DatasetClient:
577577 return client .dataset (dataset_id )
578578
579579
580- def _mutable_progress_bars_to_progress_updates (progress_bars : dict [str | None , ProgressUpdate ]) -> list [ProgressBar ]:
581- return [ProgressBar (label , bar ._total , bar ._done ) for label , bar in progress_bars .items ()] # noqa: SLF001
580+ def _finalize_mutable_progress_trackers (
581+ progress_bars : dict [str | None , ProgressUpdate ],
582+ ) -> list [ProgressIndicator ]:
583+ return [ProgressIndicator (label , bar ._total , bar ._done ) for label , bar in progress_bars .items ()] # noqa: SLF001
582584
583585
584586def _execute (task : TaskInstance , context : ExecutionContext , additional_interceptors : list [Interceptor ]) -> None :
0 commit comments