-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathBlockingQueue.tla
More file actions
127 lines (99 loc) · 4.97 KB
/
BlockingQueue.tla
File metadata and controls
127 lines (99 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
--------------------------- MODULE BlockingQueue ---------------------------
EXTENDS Naturals, Sequences, FiniteSets
CONSTANTS Producers, (* the (nonempty) set of producers *)
Consumers, (* the (nonempty) set of consumers *)
BufCapacity (* the maximum number of messages in the bounded buffer *)
ASSUME Assumption ==
/\ Producers # {} (* at least one producer *)
/\ Consumers # {} (* at least one consumer *)
/\ Producers \intersect Consumers = {} (* no thread is both consumer and producer *)
/\ BufCapacity \in (Nat \ {0}) (* buffer capacity is at least 1 *)
-----------------------------------------------------------------------------
VARIABLES buffer, waitSet
vars == <<buffer, waitSet>>
RunningThreads == (Producers \cup Consumers) \ waitSet
NotifyOther(Others) ==
IF waitSet \cap Others # {}
THEN \E t \in waitSet \cap Others : waitSet' = waitSet \ {t}
ELSE UNCHANGED waitSet
(* @see java.lang.Object#wait *)
Wait(t) == /\ waitSet' = waitSet \cup {t}
/\ UNCHANGED <<buffer>>
-----------------------------------------------------------------------------
Put(t, d) ==
/\ t \notin waitSet
/\ \/ /\ Len(buffer) < BufCapacity
/\ buffer' = Append(buffer, d)
/\ NotifyOther(Consumers)
\/ /\ Len(buffer) = BufCapacity
/\ Wait(t)
Get(t) ==
/\ t \notin waitSet
/\ \/ /\ buffer # <<>>
/\ buffer' = Tail(buffer)
/\ NotifyOther(Producers)
\/ /\ buffer = <<>>
/\ Wait(t)
-----------------------------------------------------------------------------
(* Initially, the buffer is empty and no thread is waiting. *)
Init == /\ buffer = <<>>
/\ waitSet = {}
(* Then, pick a thread out of all running threads and have it do its thing. *)
Next == \/ \E p \in Producers: Put(p, p) \* Add some data to buffer
\/ \E c \in Consumers: Get(c)
-----------------------------------------------------------------------------
(* TLA+ is untyped, thus lets verify the range of some values in each state. *)
TypeInv == /\ buffer \in Seq(Producers)
/\ Len(buffer) \in 0..BufCapacity
/\ waitSet \in SUBSET (Producers \cup Consumers)
(* No Deadlock *)
Invariant == waitSet # (Producers \cup Consumers)
-----------------------------------------------------------------------------
MySeq(P) == UNION {[1..n -> P] : n \in 0..BufCapacity}
Spec == Init /\ [][Next]_vars
\* The naive thing to do is to check if the conjunct of TypeInv /\ Invariant
\* is inductive.
IInv == /\ TypeInv!2
/\ TypeInv!3
/\ Invariant
\* When the buffer is empty, a consumer will be added to the waitSet.
\* However, this does not crate a deadlock, because at least one producer
\* will not be in the waitSet.
/\ buffer = <<>> => \E p \in Producers : p \notin waitSet
\* Vice versa, when buffer is full, a producer will be added to waitSet,
\* but at least one consumer won't be in waitSet.
/\ Len(buffer) = BufCapacity => \E c \in Consumers : c \notin waitSet
MCIInv == TypeInv!1 /\ IInv
-----------------------------------------------------------------------------
PutEnabled == \A p \in Producers : ENABLED Put(p, p)
FairSpec ==
/\ Spec
\* Assert that producers take steps should their Put action be (continuously)
\* enabled. This is the basic case of fairness that rules out stuttering, i.e.,
\* assert global progress.
/\ \A t \in Producers:
WF_vars(Put(t,t))
\* Stipulates that Get actions (consumers!) will eventually notify *all*
\* waiting producers. In other words, given repeated Get actions (we don't
\* care which consumer, thus, existential quantification), all waiting
\* producers will eventually be notified. Because Get actions are not
\* continuously enabled (the buffer might be empty), weak fairness is not
\* strong enough. Obviously, no real system scheduler implements such an
\* inefficient "policy".
\* This fairness constraint was initially proposed by Leslie Lamport, although
\* with the minor typo "in" instead of "notin", which happens to hold for
\* configurations with at most two producers.
/\ \A t \in Producers:
SF_vars(\E self \in Consumers: Get(self) /\ t \notin waitSet')
\* See notes above (except swap "producer" with "consumer").
/\ \A t \in Consumers:
WF_vars(Get(t))
/\ \A t \in Consumers:
SF_vars(\E self \in Producers: Put(self, self) /\ t \notin waitSet')
(* All producers will continuously be serviced. For this to be violated, *)
(* ASSUME Cardinality(Producers) > 1 has to hold (a single producer cannot *)
(* starve itself). *)
Starvation ==
/\ \A p \in Producers: []<>(<<Put(p, p)>>_vars)
/\ \A c \in Consumers: []<>(<<Get(c)>>_vars)
=============================================================================