-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_loader.py
More file actions
116 lines (97 loc) · 4.12 KB
/
log_loader.py
File metadata and controls
116 lines (97 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Load an event log into a list of activity-label traces.
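Wraps PM4Py's XES reader. ``load_traces`` projects every trace down to its
sequence of ``concept:name`` values, since most of Split Miner only cares
about labels; ``load_traces_v2`` also keeps each event's lifecycle and
timestamp for Split Miner 2.0.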
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import pm4py
# A single refined event: ``(label, lifecycle, timestamp)``, where lifecycle
# is normalized to "start" or "end". ``load_traces_v2`` turns a "complete"
# event (or one without a lifecycle attribute) into an instantaneous activity
# by emitting *both* a start and an end at the same timestamp; "end", "close",
# "ate_abort" and "abort_activity" become a single end, and events with any
# other lifecycle are dropped. The timestamp is whatever ``time:timestamp``
# held (``None`` when absent), hence ``Any``.
RefinedEvent = tuple[str, str, Any]
def load_traces(path: str | Path) -> list[list[str]]:
    """Read an XES (or ``.xes.gz``) log and return one label sequence per trace.

    Each trace is projected onto the ``concept:name`` values of its events, in
    the order the events appear in the log. Events without a usable
    ``concept:name`` (attribute missing, or present but ``None``) are skipped,
    the same way ``load_traces_v2`` skips them, and traces that end up with no
    labels are dropped.

    Args:
        path: Location of the log file; converted to ``str`` before being
            handed to ``pm4py.read_xes``.

    Returns:
        One list of activity labels (each coerced with ``str``) per non-empty
        trace.
    """
    path = str(path)
    log = pm4py.read_xes(path)
    # pm4py.read_xes returns a DataFrame in recent versions; normalize to an
    # EventLog so we can iterate trace-by-trace regardless of the input shape.
    # The conversion stays best-effort (the raw result is used if it fails),
    # but the failure is reported instead of being silently swallowed, since
    # iterating an unconverted result can yield an empty trace list.
    try:
        log = pm4py.convert_to_event_log(log)
    except Exception as exc:
        import warnings

        warnings.warn(
            f"could not convert {path!r} to an EventLog ({exc!r}); "
            "iterating the raw pm4py.read_xes result instead",
            RuntimeWarning,
            stacklevel=2,
        )

    traces: list[list[str]] = []
    for trace in log:
        labels: list[str] = []
        for event in trace:
            if "concept:name" not in event:
                continue
            label = event["concept:name"]
            # A None label would otherwise be stringified to the bogus
            # activity name "None".
            if label is None:
                continue
            labels.append(str(label))
        if labels:
            traces.append(labels)
    return traces
def load_traces_v2(path: str | Path) -> list[list[RefinedEvent]]:
    """Read an XES log preserving lifecycle info (Split Miner 2.0).

    Each trace is returned as a list of ``(label, lifecycle, timestamp)``
    tuples ordered by timestamp, where ``lifecycle`` is ``"start"`` or
    ``"end"``. The lifecycle is read from ``lifecycle:transition``
    (case-insensitively, ignoring surrounding whitespace) and normalized:

    * ``start`` -> one ``"start"`` event;
    * ``complete``, or no ``lifecycle:transition`` attribute at all -> an
      instantaneous activity: a ``"start"`` immediately followed by an
      ``"end"``, both carrying the event's timestamp, so lifecycle-less logs
      still present a well-formed start/end pair per event;
    * ``end``, ``close``, ``ate_abort``, ``abort_activity`` -> one ``"end"``;
    * anything else (suspend, resume, ...) -> dropped, because SM 2.0 is
      defined only over {start, end}.

    Events without a ``concept:name`` are skipped and traces that end up
    empty are dropped. Events without a ``time:timestamp`` carry ``None`` and
    are ordered before all timestamped events of the trace.

    Args:
        path: Location of the log file; converted to ``str`` before being
            handed to ``pm4py.read_xes``.

    Returns:
        One list of refined events per non-empty trace.
    """
    path = str(path)
    log = pm4py.read_xes(path)
    # Best-effort normalization to an EventLog (recent pm4py versions return
    # a DataFrame). On failure the raw result is used, but the failure is
    # reported rather than silently swallowed.
    try:
        log = pm4py.convert_to_event_log(log)
    except Exception as exc:
        import warnings

        warnings.warn(
            f"could not convert {path!r} to an EventLog ({exc!r}); "
            "iterating the raw pm4py.read_xes result instead",
            RuntimeWarning,
            stacklevel=2,
        )

    end_like = ("end", "close", "ate_abort", "abort_activity")

    traces: list[list[RefinedEvent]] = []
    for trace in log:
        events: list[RefinedEvent] = []
        for ev in trace:
            label = ev.get("concept:name")
            if label is None:
                continue
            label = str(label)
            ts = ev.get("time:timestamp")
            lifecycle = str(ev.get("lifecycle:transition", "complete")).strip().lower()
            if lifecycle == "start":
                events.append((label, "start", ts))
            elif lifecycle == "complete":
                # Instantaneous event: synthesize a start right before the end.
                events.append((label, "start", ts))
                events.append((label, "end", ts))
            elif lifecycle in end_like:
                events.append((label, "end", ts))
            # Any other lifecycle (suspend/resume/etc.) is intentionally dropped.

        # list.sort is stable, so events sharing a timestamp keep their log
        # order and a synthesized start always precedes its matching end.
        # The leading boolean orders timestamp-less events first and ensures
        # the placeholder is never compared against a real timestamp, which
        # would raise TypeError (e.g. int vs datetime).
        events.sort(key=lambda e: (e[2] is not None, 0 if e[2] is None else e[2]))
        if events:
            traces.append(events)
    return traces
def has_lifecycle_info(refined_traces: list[list[RefinedEvent]]) -> bool:
    """Return ``True`` iff at least one trace has an explicit ``start`` event.

    ``load_traces_v2`` expands every ``complete`` event into a ``start``
    immediately followed by an ``end`` with the same label and timestamp.
    Such zero-duration pairs carry no lifecycle information (SM 2.0's
    overlap-based concurrency cannot fire on them), so they are not counted:
    a ``start`` only qualifies when the next event of its trace is *not* the
    matching same-timestamp ``end``. Without this, every non-empty
    complete-only log would be reported as having lifecycle info.

    Args:
        refined_traces: Traces of ``(label, lifecycle, timestamp)`` events.

    Returns:
        ``True`` if some trace contains a ``start`` that is not part of a
        zero-duration start/end pair, ``False`` otherwise (including for an
        empty log).
    """
    for trace in refined_traces:
        last = len(trace) - 1
        for pos, (label, lifecycle, ts) in enumerate(trace):
            if lifecycle != "start":
                continue
            if pos == last:
                # A start with nothing after it cannot be a synthesized pair.
                return True
            if tuple(trace[pos + 1]) != (label, "end", ts):
                return True
    return False
# The running example log from the original SM paper: ten trace variants,
# each occurring ten times. Every trace is built as its own list, so mutating
# one trace never affects its duplicates (``[trace] * 10`` would repeat
# references to a single shared list).
PAPER_EXAMPLE_LOG: list[list[str]] = [
    list(variant)
    for variant in (
        "abcgeh",
        "abcfgh",
        "abdgeh",
        "abdegh",
        "abecgh",
        "abedgh",
        "acbegh",
        "acbfgh",
        "adbegh",
        "adbfgh",
    )
    for _ in range(10)
]