-
Notifications
You must be signed in to change notification settings - Fork 16.7k
Introduce parent task spans and nest worker and trigger spans under them #63839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1e6f518
1546540
edc298e
065882b
18590df
714102d
6709732
c1cce26
1745c7b
f624632
0d3fe47
43c9913
747724b
3ed122b
8bd301b
9c41724
943ff31
c89c5f8
5908945
506d1cb
069071d
bd46602
6647f62
5ae2291
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,8 @@ | |
| import attrs | ||
| import dill | ||
| import uuid6 | ||
| from opentelemetry import trace | ||
| from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator | ||
| from sqlalchemy import ( | ||
| JSON, | ||
| Float, | ||
|
|
@@ -67,6 +69,7 @@ | |
| from airflow import settings | ||
| from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager | ||
| from airflow._shared.observability.metrics.stats import Stats | ||
| from airflow._shared.observability.traces import new_dagrun_trace_carrier | ||
| from airflow._shared.timezones import timezone | ||
| from airflow.assets.manager import asset_manager | ||
| from airflow.configuration import conf | ||
|
|
@@ -102,7 +105,7 @@ | |
| TR = TaskReschedule | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
| tracer = trace.get_tracer(__name__) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from datetime import datetime | ||
|
|
@@ -382,7 +385,7 @@ def clear_task_instances( | |
| for instance in tis: | ||
| run_ids_by_dag_id[instance.dag_id].add(instance.run_id) | ||
|
|
||
| drs = session.scalars( | ||
| drs: Iterable[DagRun] = session.scalars( | ||
| select(DagRun).where( | ||
| or_( | ||
| *( | ||
|
|
@@ -397,6 +400,7 @@ def clear_task_instances( | |
| # Always update clear_number and queued_at when clearing tasks, regardless of state | ||
| dr.clear_number += 1 | ||
| dr.queued_at = timezone.utcnow() | ||
| dr.context_carrier = new_dagrun_trace_carrier() | ||
|
|
||
| _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session) | ||
|
|
||
|
|
@@ -425,6 +429,8 @@ def clear_task_instances( | |
| if dag_run_state == DagRunState.QUEUED: | ||
| dr.last_scheduling_decision = None | ||
| dr.start_date = None | ||
| for ti in tis: | ||
| ti.context_carrier = _make_task_carrier(ti.dag_run.context_carrier) | ||
| session.flush() | ||
|
|
||
|
|
||
|
|
@@ -486,6 +492,17 @@ def uuid7() -> UUID: | |
| return uuid6.uuid7() | ||
|
|
||
|
|
||
| def _make_task_carrier(dag_run_context_carrier): | ||
| parent_context = ( | ||
| TraceContextTextMapPropagator().extract(dag_run_context_carrier) if dag_run_context_carrier else None | ||
| ) | ||
| span = tracer.start_span("notused", context=parent_context) # intentionally never closed | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This will be the new hidden parent for the task span?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe that's why the task span doesn't appear as a child of the dag_run span.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This just generates a trace ID and a span ID for the TI parent span. The issue was that when I actually emit the task span, I need to supply the DR span context as the context for the task parent span. You can see that in the last commit now.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @dstandish Why does it have to happen here? Can't we just start the parent task span and then access its IDs and persist them to the DB? The parent task span is created under core. I think there is DB access there.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I create it when we create the task — I guess, what's your concern? |
||
| new_ctx = trace.set_span_in_context(span) | ||
| carrier: dict[str, str] = {} | ||
| TraceContextTextMapPropagator().inject(carrier, context=new_ctx) | ||
| return carrier | ||
|
|
||
|
|
||
| class TaskInstance(Base, LoggingMixin, BaseWorkload): | ||
| """ | ||
| Task instances store the state of a task instance. | ||
|
|
@@ -679,7 +696,7 @@ def stats_tags(self) -> dict[str, str]: | |
|
|
||
| @staticmethod | ||
| def insert_mapping( | ||
dstandish marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| run_id: str, task: Operator, map_index: int, dag_version_id: UUID | None | ||
| run_id: str, task: Operator, map_index: int, *, dag_version_id: UUID | None, dag_run: DagRun | ||
| ) -> dict[str, Any]: | ||
| """ | ||
| Insert mapping. | ||
|
|
@@ -689,6 +706,7 @@ def insert_mapping( | |
| priority_weight = task.weight_rule.get_weight( | ||
| TaskInstance(task=task, run_id=run_id, map_index=map_index, dag_version_id=dag_version_id) | ||
| ) | ||
| context_carrier = _make_task_carrier(dag_run.context_carrier) | ||
|
|
||
| return { | ||
| "dag_id": task.dag_id, | ||
|
|
@@ -710,6 +728,7 @@ def insert_mapping( | |
| "map_index": map_index, | ||
| "_task_display_property_value": task.task_display_name, | ||
| "dag_version_id": dag_version_id, | ||
| "context_carrier": context_carrier, | ||
| } | ||
|
|
||
| @reconstructor | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.