-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsplits.py
More file actions
162 lines (142 loc) · 4.9 KB
/
splits.py
File metadata and controls
162 lines (142 loc) · 4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""Phase 5 — Discover split gateways
For every task ``t`` with more than one outgoing edge we discover a hierarchy
of XOR / AND gateways that captures the concurrency and exclusion relations
between the *d-successors* of ``t``. Cover and future sets are tracked
per d-successor and per newly inserted gateway; the iteration stops once
``|S| == 1`` or no further split can be inferred. A fallback OR-split is
produced when the hierarchy cannot be decomposed (rare, but possible in the
wild on noisy real-life logs).
"""
from __future__ import annotations
from typing import Iterable
from types_ import WorkingGraph
def _initial_cover_future(
wg: WorkingGraph, d_successors: list[str]
) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
"""Initial C[s] = {s}; F[s] = {s' in d-successors | s' || s}.
At the start of Algorithm 5 every d-successor is a task (gateways from
other tasks' processing do not appear here — see commentary in the paper).
"""
cover: dict[str, set[str]] = {}
future: dict[str, set[str]] = {}
for s in d_successors:
cover[s] = {s}
future[s] = {
other for other in d_successors if other != s and wg.is_concurrent(s, other)
}
return cover, future
def _try_xor_split(
wg: WorkingGraph,
t: str,
s_set: list[str],
cover: dict[str, set[str]],
future: dict[str, set[str]],
) -> str | None:
"""Algorithm 6. Returns the new gateway id if one was inserted."""
for s1 in s_set:
group: set[str] = set()
c_union: set[str] = set(cover[s1])
for s2 in s_set:
if s2 == s1:
continue
if future[s1] == future[s2]:
group.add(s2)
c_union |= cover[s2]
if group:
group.add(s1)
g = wg.add_node("xor", label="xor")
for s in group:
wg.add_edge(g, s)
s_set.remove(s)
s_set.append(g)
cover[g] = c_union
future[g] = set(future[s1])
return g
return None
def _try_and_split(
wg: WorkingGraph,
t: str,
s_set: list[str],
cover: dict[str, set[str]],
future: dict[str, set[str]],
) -> str | None:
"""Algorithm 7. Returns the new gateway id if one was inserted."""
for s1 in s_set:
group: set[str] = set()
c_union: set[str] = set(cover[s1])
f_inter: set[str] = set(future[s1])
cf_s1 = cover[s1] | future[s1]
for s2 in s_set:
if s2 == s1:
continue
cf_s2 = cover[s2] | future[s2]
if cf_s1 == cf_s2:
group.add(s2)
c_union |= cover[s2]
f_inter &= future[s2]
if group:
group.add(s1)
g = wg.add_node("and", label="and")
for s in group:
wg.add_edge(g, s)
s_set.remove(s)
s_set.append(g)
cover[g] = c_union
future[g] = f_inter
return g
return None
def _fallback_or_split(
wg: WorkingGraph,
t: str,
s_set: list[str],
cover: dict[str, set[str]],
future: dict[str, set[str]],
) -> str:
"""Catch-all OR-split used when XOR/AND iteration cannot reduce S."""
g = wg.add_node("or", label="or")
c_union: set[str] = set()
for s in list(s_set):
wg.add_edge(g, s)
c_union |= cover[s]
s_set.clear()
s_set.append(g)
cover[g] = c_union
future[g] = set()
return g
def discover_splits(wg: WorkingGraph) -> None:
    """Run Algorithm 5 in place: add split gateways after every fan-out node.

    Only nodes whose ``kind`` is ``"task"`` or ``"start"`` are considered.
    Each such node with more than one successor is handed to ``_split_one``.
    """
    # Take the candidate ids up front: gateways added while splitting must
    # not themselves be visited as split sources.
    candidates = [
        node_id
        for node_id, node in list(wg.nodes.items())
        if node.kind in ("task", "start")
    ]
    for node_id in candidates:
        if len(wg.successors(node_id)) > 1:
            _split_one(wg, node_id)
def _split_one(wg: WorkingGraph, t: str) -> None:
    """Replace the outgoing edges of ``t`` with a hierarchy of split gateways.

    The direct successors of ``t`` are the initial elements of ``S``.
    1. All edges ``t -> s`` are removed.
    2. ``S`` is repeatedly reduced by inserting an XOR gateway (tried first)
       or an AND gateway.
    3. If neither applies while more than one element remains, all remaining
       elements are placed under a single OR gateway.
    4. ``t`` is reconnected to the single remaining root.

    With zero successors nothing is added. With one successor the edge
    ``t -> s`` is removed and re-added.
    """
    # Snapshot once: edges out of t are removed below, and if
    # ``wg.successors`` returns a live view (not visible here — TODO confirm
    # against WorkingGraph) any later len()/iteration over it would see an
    # empty collection.
    d_succs = list(wg.successors(t))
    cover, future = _initial_cover_future(wg, d_succs)
    s_set: list[str] = list(d_succs)

    # Remove outgoing edges of t — they will be re-added through gateways.
    for s in d_succs:
        wg.remove_edge(t, s)

    # Every successful XOR/AND step replaces >= 2 elements with one gateway,
    # so at most len(d_succs) - 1 steps are needed. The generous bound only
    # guards against a helper that fails to make progress.
    max_iter = 4 * len(d_succs) + 8
    for _ in range(max_iter):
        if len(s_set) <= 1:
            break
        # Try XOR first; AND is only attempted when XOR found nothing.
        if (
            _try_xor_split(wg, t, s_set, cover, future) is None
            and _try_and_split(wg, t, s_set, cover, future) is None
        ):
            break  # no further reduction possible

    if len(s_set) > 1:
        # Cannot reduce further (or the bound was hit) — fall back to an
        # OR-split. A lone remaining root is never wrapped in an OR gateway.
        _fallback_or_split(wg, t, s_set, cover, future)

    # Re-connect t to the (now single) remaining root.
    if s_set:
        wg.add_edge(t, s_set[0])