Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cicada/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ CICADA_ALLSRC = $(CICADA_SRCS1) $(wildcard include/*.hh)

ADD_ANALYSIS=1
BACK_OFF=1
INLINE_VERSION_OPT=1
INLINE_VERSION_PROMOTION=1
INLINE_VERSION_OPT=0
INLINE_VERSION_PROMOTION=0
NO_SPINWAIT=1
MASSTREE_USE=1
PARTITION_TABLE=0
REUSE_VERSION=1
Expand All @@ -29,6 +30,7 @@ CFLAGS = -c -pipe -g -O3 -std=c++17 -march=native \
-DBACK_OFF=$(BACK_OFF) \
-DINLINE_VERSION_PROMOTION=$(INLINE_VERSION_PROMOTION) \
-DINLINE_VERSION_OPT=$(INLINE_VERSION_OPT) \
-DNO_SPINWAIT=$(NO_SPINWAIT) \
-DMASSTREE_USE=$(MASSTREE_USE) \
-DPARTITION_TABLE=$(PARTITION_TABLE) \
-DREUSE_VERSION=$(REUSE_VERSION) \
Expand Down
4 changes: 3 additions & 1 deletion cicada/include/cicada_op_element.hh
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ class ReadElement : public OpElement<T> {
using OpElement<T>::OpElement;

Version *later_ver_, *ver_;
bool rmw_;

ReadElement(uint64_t key, T *rcdptr, Version *later_ver, Version *ver)
ReadElement(uint64_t key, T *rcdptr, Version *later_ver, Version *ver, bool rmw)
: OpElement<T>::OpElement(key, rcdptr) {
later_ver_ = later_ver;
ver_ = ver;
rmw_ = rmw;
}

bool operator<(const ReadElement &right) const {
Expand Down
8 changes: 8 additions & 0 deletions cicada/include/time_stamp.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ class TimeStamp {

inline void generateTimeStampFirst(uint8_t tid) {
localClock_ = rdtscp();
#if NO_SPINWAIT
ts_ = ((localClock_ << (sizeof(tid) * 8)) & ~(1ULL<<63))| tid;
#else
ts_ = (localClock_ << (sizeof(tid) * 8)) | tid;
#endif
thid_ = tid;
}

Expand All @@ -34,6 +38,10 @@ class TimeStamp {
localClock_ += elapsedTime;
localClock_ += clockBoost_;

#if NO_SPINWAIT
ts_ = ((localClock_ << (sizeof(tid) * 8)) & ~(1ULL<<63))| tid;
#else
ts_ = (localClock_ << (sizeof(tid) * 8)) | tid;
#endif
}
};
32 changes: 31 additions & 1 deletion cicada/include/transaction.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#define CONTINUING_COMMIT_THRESHOLD 5

#define RTS_NOT_EXTENDABLE (1ULL<<63)

enum class TransactionStatus : uint8_t {
invalid,
inflight,
Expand Down Expand Up @@ -159,7 +161,7 @@ class TxExecutor {
twrite(key);
if ((*pro_set_.begin()).ronly_) {
(*pro_set_.begin()).ronly_ = false;
read_set_.emplace_back(key, tuple, later_ver, ver);
read_set_.emplace_back(key, tuple, later_ver, ver, false);
}
}
}
Expand Down Expand Up @@ -255,6 +257,34 @@ class TxExecutor {
}
}

bool readTimestampUpdateWithValidation() {
uint64_t expected, new_rts;
for (auto itr = read_set_.begin(); itr != read_set_.end(); ++itr) {
expected = (*itr).ver_->ldAcqRts();
if ((*itr).rmw_) {
for (;;) {
if (expected & RTS_NOT_EXTENDABLE ||
(expected & ~RTS_NOT_EXTENDABLE) > this->wts_.ts_) return false;
new_rts = this->wts_.ts_ | RTS_NOT_EXTENDABLE;
if ((*itr).ver_->rts_.compare_exchange_strong(expected, new_rts,
memory_order_acq_rel,
memory_order_acquire))
break;
}
} else {
for (;;) {
if ((expected & ~RTS_NOT_EXTENDABLE) > this->wts_.ts_) break;
if (expected & RTS_NOT_EXTENDABLE) return false;
if ((*itr).ver_->rts_.compare_exchange_strong(expected, this->wts_.ts_,
memory_order_acq_rel,
memory_order_acquire))
break;
}
}
}
return true;
}

/**
* @brief Search xxx set
* @detail Search element of local set corresponding to given key.
Expand Down
60 changes: 59 additions & 1 deletion cicada/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ void TxExecutor::tread(const uint64_t key) {
later_ver = ver;
ver = ver->ldAcqNext();
}
#if NO_SPINWAIT
Version *next_ver;
while (ver->status_.load(memory_order_acquire) != VersionStatus::committed) {
next_ver = ver->ldAcqNext();
/*
* TODO: Is this kind of effort effective?
if (next_ver->status_.load(memory_order_acquire) == VersionStatus::committed &&
next_ver->ldAcqRts() & RTS_NOT_EXTENDABLE) {
while (ver->status_.load(memory_order_acquire) == VersionStatus::pending) {
}
if (ver->status_.load(memory_order_acquire) == VersionStatus::aborted) {
ver = next_ver;
break;
}
}
*/
ver = next_ver;
}
#else
while (ver->status_.load(memory_order_acquire) != VersionStatus::committed) {
/**
* Wait for the result of the pending version in the view.
Expand All @@ -144,6 +163,7 @@ void TxExecutor::tread(const uint64_t key) {
ver = ver->ldAcqNext();
}
}
#endif
#endif

/**
Expand All @@ -155,7 +175,7 @@ void TxExecutor::tread(const uint64_t key) {
* If read-only tx, not track or validate read_set_
*/
if ((*this->pro_set_.begin()).ronly_ == false) {
read_set_.emplace_back(key, tuple, later_ver, ver);
read_set_.emplace_back(key, tuple, later_ver, ver, false);
}

#if INLINE_VERSION_OPT
Expand All @@ -174,7 +194,9 @@ void TxExecutor::tread(const uint64_t key) {
cres_->local_read_latency_ += rdtscp() - start;
#endif

#if INLINE_VERSION_PROMOTION
END_TREAD:
#endif

return;
}
Expand Down Expand Up @@ -267,6 +289,7 @@ void TxExecutor::twrite(const uint64_t key) {

/**
* Constraint from new to old.
* TODO: Is this only necessary for RMW case, isn't it?
*/
if ((ver->ldAcqRts() > this->wts_.ts_) &&
(ver->ldAcqStatus() == VersionStatus::committed)) {
Expand All @@ -277,6 +300,16 @@ void TxExecutor::twrite(const uint64_t key) {
goto FINISH_TWRITE;
}

#if NO_SPINWAIT
if (re) {
re->rmw_ = true;
} else {
/*
* TODO: Psedo RMW can be relaxed more.
*/
read_set_.emplace_back(key, tuple, later_ver, ver, true);
}
#endif
Version *new_ver;
new_ver = newVersionGeneration(tuple);
write_set_.emplace_back(key, tuple, later_ver, new_ver, rmw);
Expand Down Expand Up @@ -353,6 +386,14 @@ bool TxExecutor::validation() {
(*itr).finish_version_install_ = true;
}

#if NO_SPINWAIT
/**
* Read timestamp update with validation
*/
result = readTimestampUpdateWithValidation();
if (!result)
goto FINISH_VALIDATION;
#else
/**
* Read timestamp update
*/
Expand Down Expand Up @@ -410,6 +451,7 @@ bool TxExecutor::validation() {
goto FINISH_VALIDATION;
}
}
#endif

FINISH_VALIDATION:
#if ADD_ANALYSIS
Expand Down Expand Up @@ -566,6 +608,22 @@ void TxExecutor::earlyAbort() {
* @return void
*/
void TxExecutor::abort() {
#if NO_SPINWAIT
// Roll back RTS_NOT_EXTENDABLE flags
for (auto itr = read_set_.begin(); itr != read_set_.end(); ++itr) {
if ((*itr).rmw_) {
uint64_t expected, new_rts;
expected = (*itr).ver_->ldAcqRts();
for (;;) {
new_rts = expected & ~RTS_NOT_EXTENDABLE;
if ((*itr).ver_->rts_.compare_exchange_strong(expected, new_rts,
memory_order_acq_rel,
memory_order_acquire))
break;
}
}
}
#endif
writeSetClean();
read_set_.clear();

Expand Down