Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "content_storage.h"
#include "page_cache.h"
#include "writer_queue.h"
#include "wal.h"

/*
* BTree that stores the BTreeNodes, ensures it is balanced
Expand All @@ -23,6 +24,8 @@ class BTree {
ContentStorage<KeyType> content_storage;
PageCache<KeyType> page_cache;
WriterQueue<KeyType> writer_queue;
WALManager<KeyType> wal_manager;
uint64_t current_transaction;

void insertNonFull(std::shared_ptr<Page<KeyType>> root, const KeyType& key, const ValueType& value);
void splitChild(std::shared_ptr<Page<KeyType>> parent, int index, std::shared_ptr<Page<KeyType>> child);
Expand All @@ -40,6 +43,10 @@ class BTree {
ValueType* search(const KeyType& key); // Public search method
void printStorageStats() const;
void flush(); // To flush all pending writes

void beginTransaction();
void commitTransaction();
void abortTransaction();

Page<KeyType> findKey(std::shared_ptr<Page<KeyType>> node, const KeyType& key);

Expand Down
95 changes: 95 additions & 0 deletions include/wal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#pragma once
#include <fstream>
#include <vector>
#include <mutex>
#include <atomic>
#include <chrono>
#include <string>
#include <cstdint>

enum class WALRecordType : uint8_t {
INSERT = 1,
DELETE = 2,
UPDATE = 3,
CHECKPOINT = 4,
COMMIT = 5,
ABORT = 6
};

struct WALRecordHeader {
WALRecordType type;
uint32_t record_size;
uint64_t transaction_id;
uint64_t lsn; // Log sequence number
uint32_t checksum;
std::chrono::steady_clock::time_point timestamp;

WALRecordHeader(WALRecordType t, uint32_t size, uint64_t txn_id, uint64_t sequence_num)
: type(t), record_size(size), transaction_id(txn_id), lsn(sequence_num),
checksum(0), timestamp(std::chrono::steady_clock::now()) {}
};

// WAL record for data operations
template<typename KeyType>
struct WALDataRecord {
WALRecordHeader header;
uint16_t page_id;
KeyType key;
std::vector<uint8_t> old_data; // For rollback
std::vector<uint8_t> new_data; // Redo

WALDataRecord(WALRecordType type, uint64_t txn_id, uint64_t lsn,
uint16_t pid, const KeyType& k)
: header(type, sizeof(WALDataRecord), txn_id, lsn),
page_id(pid), key(k) {}
};

// WAL manager class
template<typename KeyType>
class WALManager {
private:
std::string wal_file_path;
std::ofstream wal_file;
std::mutex wal_mutex;

std::atomic<uint64_t> next_lsn;
std::atomic<uint64_t> next_transaction_id;
std::atomic<uint64_t> last_checkpoint_lsn;

// Need buffer for batching writes
std::vector<uint8_t> write_buffer;
size_t buffer_size_limit;

uint32_t calculateChecksum(const void* data, size_t size);
void flushBuffer();

public:
WALManager(const std::string& wal_path, size_t buffer_limit = 4096);
~WALManager();

uint64_t beginTransaction();
void commitTransaction(uint64_t txn_id);
void abortTransaction(uint64_t txn_id);

// Data operation logging
uint64_t logInsert(uint64_t txn_id, uint16_t page_id, const KeyType& key,
const std::vector<uint8_t>& data);
uint64_t logDelete(uint64_t txn_id, uint16_t page_id, const KeyType& key,
const std::vector<uint8_t>& old_data);
uint64_t logUpdate(uint64_t txn_id, uint16_t page_id, const KeyType& key,
const std::vector<uint8_t>& old_data,
const std::vector<uint8_t>& new_data);

// Checkpoint management
uint64_t writeCheckpoint();
uint64_t getLastCheckpointLSN() const { return last_checkpoint_lsn.load(); }

// Recovery operations
void replay(uint64_t from_lsn = 0);
void truncate(uint64_t up_to_lsn);

// Utility
void sync(); // Force write to disk
uint64_t getCurrentLSN() const { return next_lsn.load(); }
size_t getWALSize() const;
};
10 changes: 5 additions & 5 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ SRCDIR = src
OBJDIR = obj

# Source files (only B-tree related files)
SOURCES = src/Btree.cpp src/main.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp
SOURCES = src/Btree.cpp src/main.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
OBJECTS = $(SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Demo source files
DEMO_SOURCES = src/Btree.cpp src/content_hash_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp
DEMO_SOURCES = src/Btree.cpp src/content_hash_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
DEMO_OBJECTS = $(DEMO_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Content addressable demo
ADDRESSABLE_SOURCES = src/Btree.cpp src/content_addressable_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp
ADDRESSABLE_SOURCES = src/Btree.cpp src/content_addressable_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
ADDRESSABLE_OBJECTS = $(ADDRESSABLE_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Deduplication demo
DEDUP_SOURCES = src/Btree.cpp src/deduplication_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp
DEDUP_SOURCES = src/Btree.cpp src/deduplication_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
DEDUP_OBJECTS = $(DEDUP_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Cache performance demo
CACHE_PERF_SOURCES = src/Btree.cpp src/cache_performance_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp
CACHE_PERF_SOURCES = src/Btree.cpp src/cache_performance_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
CACHE_PERF_OBJECTS = $(CACHE_PERF_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Target executables
Expand Down
57 changes: 56 additions & 1 deletion src/Btree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ template <typename KeyType, typename ValueType>
BTree<KeyType, ValueType>::BTree(int maxKeys)
: maxKeysPerNode(maxKeys),
page_cache(&content_storage, 50), // Cache up to 50 pages, can change latr
writer_queue(&content_storage, &page_cache, 2) { // 2 writer threads
writer_queue(&content_storage, &page_cache, 2), // 2 writer threads
wal_manager("btree.wal", 8192), // 8KB WAL buffer
current_transaction(0) {

writer_queue.start();

// Start first transaction
current_transaction = wal_manager.beginTransaction();

// Initially, the tree is empty, so we create a root node
// and mark it as a leaf (all data starts at the leaf level in B+ Trees)
uint16_t root_id = content_storage.storePage(createPage<KeyType>(true));
Expand All @@ -23,8 +29,13 @@ BTree<KeyType, ValueType>::BTree(int maxKeys)
*/
template <typename KeyType, typename ValueType>
BTree<KeyType, ValueType>::~BTree() {
if (current_transaction != 0) {
wal_manager.commitTransaction(current_transaction);
}

writer_queue.stop();
page_cache.flushAll();
wal_manager.sync();
}

/*
Expand All @@ -36,14 +47,54 @@ void BTree<KeyType, ValueType>::flush() {
page_cache.flushAll();
}

/*
* Transaction Management Methods
*/
template <typename KeyType, typename ValueType>
void BTree<KeyType, ValueType>::beginTransaction() {
if (current_transaction != 0) {
wal_manager.commitTransaction(current_transaction);
}
current_transaction = wal_manager.beginTransaction();
}

template <typename KeyType, typename ValueType>
void BTree<KeyType, ValueType>::commitTransaction() {
if (current_transaction != 0) {
wal_manager.commitTransaction(current_transaction);
current_transaction = 0;
}
}

template <typename KeyType, typename ValueType>
void BTree<KeyType, ValueType>::abortTransaction() {
if (current_transaction != 0) {
wal_manager.abortTransaction(current_transaction);
current_transaction = 0;
}
}

/*
* Placeholder method to insert key value pairs
*/
template <typename KeyType, typename ValueType>
void BTree<KeyType, ValueType>::insert(const KeyType& key, const ValueType& value) {
// Ensure we have an active transaction
if (current_transaction == 0) {
current_transaction = wal_manager.beginTransaction();
}

// Serialize the value for WAL logging
std::vector<uint8_t> serialized_value;
const uint8_t* value_bytes = reinterpret_cast<const uint8_t*>(&value);
serialized_value.assign(value_bytes, value_bytes + sizeof(ValueType));

if (!root) { // If tree is empty, create a new root
uint16_t root_id = content_storage.storePage(createPage<KeyType>(true));
root = page_cache.getPage(root_id);

// Log the insert operation
wal_manager.logInsert(current_transaction, root_id, key, serialized_value);
} else if (root->keys.size() == maxKeysPerNode) { // Check if the root is full
Page<KeyType> new_root_page = createPage<KeyType>(false);
new_root_page.children.push_back(root->header.page_id); // Page ID of the old root
Expand All @@ -54,6 +105,10 @@ void BTree<KeyType, ValueType>::insert(const KeyType& key, const ValueType& valu
writer_queue.enqueueWrite(new_root_page.header.page_id, std::make_shared<Page<KeyType>>(new_root_page));
root = page_cache.getPage(new_root_page.header.page_id);
}

// Log the insert operation for all cases so that we can rollback if needed
wal_manager.logInsert(current_transaction, root->header.page_id, key, serialized_value);

// Now the root is guaranteed to not be empty
insertNonFull(root, key, value); // Insert

Expand Down
Loading