Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions example/LockFreeQueue.chpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Created by Garvit Dewan -
* https://github.com/dgarvit/epoch-based-manager/blob/master/src/LockFreeQueue.chpl
*
* Lock-Free Queue that uses ABA feature of LocalAtomicObject
* Based on Michael Scott Queue
*/
module LockFreeQueue {

use LocalAtomics;

class node {
type eltType;
var val : eltType;
var next : LocalAtomicObject(unmanaged node(eltType));

proc init(val : ?eltType) {
this.eltType = eltType;
this.val = val;
}

proc init(type eltType) {
this.eltType = eltType;
}
}

class LockFreeQueue {
type objType;
var _head : LocalAtomicObject(unmanaged node(objType));
var _tail : LocalAtomicObject(unmanaged node(objType));

proc init(type objType) {
this.objType = objType;
this.complete();
var _node = new unmanaged node(objType);
_head.write(_node);
_tail.write(_node);
}

proc enqueue(newObj : objType) {
var n = new unmanaged node(newObj);
while (true) {
var curr_tail = _tail.readABA();
var next = curr_tail.next.readABA();
if (next.getObject() == nil) {
if (curr_tail.next.compareExchangeABA(next, n)) {
_tail.compareExchangeABA(curr_tail, n);
break;
}
}
else {
_tail.compareExchangeABA(curr_tail, next.getObject());
}
}
}

proc dequeue() : objType {
while (true) {
var curr_head = _head.readABA();
var curr_tail = _tail.readABA();
var next = curr_head.next.readABA();
if (curr_head.getObject() == curr_tail.getObject()) {
if (next.getObject() == nil) then
return nil;
_tail.compareExchangeABA(curr_tail, next.getObject());
}
else {
if (_head.compareExchangeABA(curr_head, next.getObject())) then
return next.getObject().val;
}
}
return nil;
}

iter these() : objType {
var ptr = _head.read().next.read();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the queue is empty?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When queue is empty, it will hold a dummy node, which will have a next, which will be a LocalAtomicObject, which will point to a nil value.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, nothing will happen, because, if ptr is nil. then the function will finish executing.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I forgot about the dummy node! That should work then.

while (ptr != nil) {
yield ptr.val;
ptr = ptr.next.read();
}
}

proc peek() : objType {
var actual_head = _head.read().next.read();
if (actual_head != nil) then
return actual_head.val;
return nil;
}

proc deinit() {
var ptr = _head.read();
while (ptr != nil) {
_head = ptr.next;
delete ptr.val;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, don't assume the user will have classes. If they want to have their classes managed, they should use a shared or owned type.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. Alright.

delete ptr;
ptr = _head.read();
}
}
}
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a main function here that tests it, similar to what is done in LockFreeStack.chpl:

proc main() {
var a = new LockFreeStack(int);
forall i in 1..1024 do a.push(i);
a.push(1025..2048);
writeln(+ reduce a);
}