-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdfg_builder.py
More file actions
182 lines (146 loc) · 6.28 KB
/
dfg_builder.py
File metadata and controls
182 lines (146 loc) · 6.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""Phase 1 — Directly-Follows Graph plus self-loop and short-loop discovery.
From §3.1 of Augusto et al. (2019):
* |a -> b| directly-follows frequency
* a -> a self-loop iff |a -> a| > 0
* |a <-> b| number of (e_i, e_j, e_k) triples with labels (a, b, a)
* a <-> b short-loop iff |a -> a| = 0 and |b -> b| = 0 and
|a <-> b| + |b <-> a| != 0
"""
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from log_loader import RefinedEvent
from types_ import DFG
START = "__start__"
END = "__end__"
@dataclass
class LoopInfo:
self_loops: set[str]
short_loops: set[frozenset[str]] # unordered pairs {a, b}
short_loop_freq: dict[tuple[str, str], int] # directed |a <-> b|
def preprocess_traces(traces: list[list[str]]) -> list[list[str]]:
"""Wrap every trace with sentinel start/end labels (7PMG guideline G3)."""
return [[START, *t, END] for t in traces if t]
def build_dfg(traces: list[list[str]]) -> tuple[DFG, set[str]]:
"""Build the directly-follows graph from preprocessed traces.
Returns the frequency map and the set of node labels (activities).
"""
dfg: dict[tuple[str, str], int] = defaultdict(int)
labels: set[str] = set()
for trace in traces:
for label in trace:
labels.add(label)
for a, b in zip(trace, trace[1:]):
dfg[(a, b)] += 1
return dict(dfg), labels
def short_loop_frequencies(traces: list[list[str]]) -> dict[tuple[str, str], int]:
"""Compute |a <-> b| for every directed pair (a, b)."""
freq: dict[tuple[str, str], int] = defaultdict(int)
for trace in traces:
for i in range(len(trace) - 2):
a, b, c = trace[i], trace[i + 1], trace[i + 2]
if a == c and a != b:
freq[(a, b)] += 1
return dict(freq)
def discover_loops(dfg: DFG, traces: list[list[str]]) -> LoopInfo:
"""Identify self-loops and short-loops as defined in §3.1."""
self_loops: set[str] = {a for (a, b), f in dfg.items() if a == b and f > 0}
short_freq = short_loop_frequencies(traces)
short_loops: set[frozenset[str]] = set()
for (a, b), f in short_freq.items():
if f == 0:
continue
if a in self_loops or b in self_loops:
continue
if short_freq.get((a, b), 0) + short_freq.get((b, a), 0) == 0:
continue
short_loops.add(frozenset((a, b)))
return LoopInfo(self_loops=self_loops, short_loops=short_loops, short_loop_freq=short_freq)
def strip_self_loops(dfg: DFG) -> DFG:
"""Remove self-loop arcs from the DFG (re-attached at the very end)."""
return {(a, b): f for (a, b), f in dfg.items() if a != b}
# ---------------------------------------------------------------------------
# Split Miner 2.0 — refined DFG (Definition 6)
# ---------------------------------------------------------------------------
def preprocess_refined(
refined_traces: list[list[RefinedEvent]],
) -> list[list[RefinedEvent]]:
"""Wrap every refined trace with sentinel ``start_end`` events.
Two pairs of pseudo-events are added: a single ``(START, "end", ...)``
at the head (so the very first activity's ``start`` becomes a successor
of START in the refined DFG) and a single ``(END, "start", ...)`` at
the tail (so the last activity's ``end`` flows into END). Both
sentinels carry both lifecycle states to satisfy any downstream logic
that may look at them.
"""
out: list[list[RefinedEvent]] = []
for trace in refined_traces:
if not trace:
continue
wrapped: list[RefinedEvent] = [
(START, "start", None),
(START, "end", None),
*trace,
(END, "start", None),
(END, "end", None),
]
out.append(wrapped)
return out
def build_refined_dfg(
refined_traces: list[list[RefinedEvent]],
) -> tuple[DFG, set[str]]:
"""Build the SM 2.0 refined DFG (Definition 6).
``a -> b`` is recorded once per pair (i, j) of events in a single trace
such that:
* event i has label ``a`` and lifecycle ``end``;
* event j has label ``b`` and lifecycle ``start``;
* ``i < j``;
* no event strictly between i and j has lifecycle ``end``.
Self-references (``a == b``) are kept here — they get detected as
self-loops by ``discover_loops_refined`` and stripped before the
pruning phase, exactly like in SM 1.x.
"""
dfg: dict[tuple[str, str], int] = defaultdict(int)
labels: set[str] = set()
for trace in refined_traces:
for _, _, _ in trace:
pass
for i, (a, lc_a, _) in enumerate(trace):
labels.add(a)
if lc_a != "end":
continue
for j in range(i + 1, len(trace)):
b, lc_b, _ = trace[j]
if lc_b == "end":
# Any subsequent start is shadowed by this end ->
# stop scanning after we've absorbed this single end.
break
if lc_b == "start":
dfg[(a, b)] += 1
return dict(dfg), labels
def discover_loops_refined(
dfg: DFG,
refined_traces: list[list[RefinedEvent]],
) -> LoopInfo:
"""Self-loops + short-loops over a refined log.
Self-loops are taken straight from the refined DFG. Short-loops are
detected on the *end-event projection* of each trace, which keeps the
SM 1.x semantics (``a -> b -> a`` pattern in completed activities).
"""
self_loops = {a for (a, b), f in dfg.items() if a == b and f > 0}
# Project to the sequence of end-events to evaluate the |a <-> b|
# triplet pattern in completed activities only.
end_projection: list[list[str]] = []
for trace in refined_traces:
end_projection.append([lbl for lbl, lc, _ in trace if lc == "end"])
short_freq = short_loop_frequencies(end_projection)
short_loops: set[frozenset[str]] = set()
for (a, b), f in short_freq.items():
if f == 0:
continue
if a in self_loops or b in self_loops:
continue
if short_freq.get((a, b), 0) + short_freq.get((b, a), 0) == 0:
continue
short_loops.add(frozenset((a, b)))
return LoopInfo(self_loops=self_loops, short_loops=short_loops, short_loop_freq=short_freq)