-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtypes_.py
More file actions
97 lines (71 loc) · 3.08 KB
/
types_.py
File metadata and controls
97 lines (71 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""Shared data types used across Split Miner phases."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable, Literal
# Closed set of string tags that a Node's ``kind`` field may carry.
# NOTE(review): "xor" / "and" / "or" presumably denote gateway types -- confirm.
NodeKind = Literal["task", "xor", "and", "or", "start", "end"]
@dataclass
class Node:
    """One vertex of a WorkingGraph.

    Plain mutable record; equality and repr are the dataclass-generated
    ones and therefore compare/show all three fields.
    """

    # Identifier of the node; WorkingGraph stores the node under this key.
    id: str
    # One of the NodeKind tags ("task", "xor", "and", "or", "start", "end").
    kind: NodeKind
    # Free-form label; defaults to the empty string when not supplied.
    label: str = ""
@dataclass
class WorkingGraph:
    """Mutable adjacency representation used between phases 4-7.

    Edges are unweighted directed (source_id, target_id) pairs. Multiple
    parallel edges between the same pair of nodes are not supported:
    adding an edge that is already present is a no-op.

    ``out_edges`` and ``in_edges`` mirror each other; ``add_node``,
    ``add_edge``, ``remove_edge`` and ``remove_node`` keep them in sync.
    """

    # Node objects keyed by node id.
    nodes: dict[str, Node] = field(default_factory=dict)
    # node id -> ids of its direct successors, in insertion order.
    out_edges: dict[str, list[str]] = field(default_factory=dict)
    # node id -> ids of its direct predecessors, in insertion order.
    in_edges: dict[str, list[str]] = field(default_factory=dict)
    # Ids of the start / end nodes. They default to the empty string and are
    # never assigned by this class itself.
    start_id: str = ""
    end_id: str = ""
    # Concurrency relation from phase 2 (symmetric pairs of task ids).
    concurrency: set[frozenset[str]] = field(default_factory=set)
    # Self-loops detected in phase 1: task ids that should carry a self-loop
    # in the final BPMN (reattached after split/join discovery).
    self_loops: set[str] = field(default_factory=set)
    # Last number consumed by fresh_id(); shared by all prefixes.
    _id_counter: int = 0

    def fresh_id(self, prefix: str) -> str:
        """Return a new id ``"<prefix>_<n>"`` that is not a key of ``nodes``.

        The counter only ever grows. Candidates that collide with an
        existing node id (for example one registered through an explicit
        ``node_id``) are skipped, so a generated id can never silently
        replace an existing node.
        """
        while True:
            self._id_counter += 1
            candidate = f"{prefix}_{self._id_counter}"
            if candidate not in self.nodes:
                return candidate

    def add_node(self, kind: NodeKind, label: str = "", node_id: str | None = None) -> str:
        """Register a node and return its id.

        When ``node_id`` is omitted an id is generated from ``kind`` via
        ``fresh_id``. Passing an id that already exists replaces the stored
        Node object but keeps that id's existing adjacency lists.
        """
        if node_id is None:
            node_id = self.fresh_id(kind)
        self.nodes[node_id] = Node(id=node_id, kind=kind, label=label)
        # setdefault: do not wipe edges if the id was already known.
        self.out_edges.setdefault(node_id, [])
        self.in_edges.setdefault(node_id, [])
        return node_id

    def add_edge(self, src: str, tgt: str) -> None:
        """Add the directed edge ``src -> tgt`` if it is not already present.

        Raises:
            KeyError: if ``src`` has no entry in ``out_edges`` or ``tgt`` has
                no entry in ``in_edges``. Both endpoints are checked before
                anything is modified, so a failed call leaves the graph
                untouched.
        """
        # Validate up front; otherwise an unknown target would leave a
        # dangling entry in out_edges[src] with no matching in_edges record.
        if src not in self.out_edges:
            raise KeyError(src)
        if tgt not in self.in_edges:
            raise KeyError(tgt)
        successors = self.out_edges[src]
        predecessors = self.in_edges[tgt]
        if tgt not in successors:
            successors.append(tgt)
        if src not in predecessors:
            predecessors.append(src)

    def remove_edge(self, src: str, tgt: str) -> None:
        """Remove the edge ``src -> tgt``; unknown nodes or edges are ignored."""
        if tgt in self.out_edges.get(src, []):
            self.out_edges[src].remove(tgt)
        if src in self.in_edges.get(tgt, []):
            self.in_edges[tgt].remove(src)

    def remove_node(self, node_id: str) -> None:
        """Remove a node together with all its incoming and outgoing edges.

        Unknown ids are ignored. ``start_id``, ``end_id``, ``concurrency``
        and ``self_loops`` are not modified.
        """
        # Iterate over copies: remove_edge mutates the lists being walked.
        for s in list(self.in_edges.get(node_id, [])):
            self.remove_edge(s, node_id)
        for t in list(self.out_edges.get(node_id, [])):
            self.remove_edge(node_id, t)
        self.in_edges.pop(node_id, None)
        self.out_edges.pop(node_id, None)
        self.nodes.pop(node_id, None)

    def successors(self, node_id: str) -> list[str]:
        """Return a copy of the successor ids of ``node_id`` (empty if unknown)."""
        return list(self.out_edges.get(node_id, []))

    def predecessors(self, node_id: str) -> list[str]:
        """Return a copy of the predecessor ids of ``node_id`` (empty if unknown)."""
        return list(self.in_edges.get(node_id, []))

    def edges(self) -> list[tuple[str, str]]:
        """Return every edge as a ``(source_id, target_id)`` tuple."""
        return [(s, t) for s, ts in self.out_edges.items() for t in ts]

    def is_concurrent(self, a: str, b: str) -> bool:
        """Return True if the unordered pair ``{a, b}`` is in ``concurrency``."""
        return frozenset((a, b)) in self.concurrency
# A DFG is just a frequency dict over directed task pairs: each key is a
# 2-tuple of strings and each value is an int.
# NOTE(review): key order is presumably (source, target) -- confirm with the
# code that builds the DFG.
DFG = dict[tuple[str, str], int]
def activities(dfg: DFG, extra: Iterable[str] = ()) -> set[str]:
    """Collect every name that occurs in *dfg*, plus any names in *extra*.

    Args:
        dfg: Mapping whose keys are 2-tuples of names; values are ignored.
        extra: Additional names to include in the result.

    Returns:
        A new set holding both elements of every key of *dfg* together with
        all items of *extra*.
    """
    collected = set(extra)
    for first, second in dfg:
        collected |= {first, second}
    return collected