Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added btree.wal
Binary file not shown.
Binary file removed btree_test
Binary file not shown.
Binary file removed cache_performance_demo
Binary file not shown.
Binary file removed content_addressable_demo
Binary file not shown.
Binary file removed content_hash_demo
Binary file not shown.
Binary file removed deduplication_demo
Binary file not shown.
4 changes: 4 additions & 0 deletions include/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class BTree {
void beginTransaction();
void commitTransaction();
void abortTransaction();

// Accessors for job scheduler integration
// Hands out a mutable reference to the wal_manager member so external
// components (per the section comment: job scheduler integration) can use it.
// The reference is non-const; callers can mutate the WAL manager through it.
WALManager<KeyType>& getWALManager()
{
    return wal_manager;
}
// Hands out a mutable reference to the page_cache member so external
// components (per the section comment: job scheduler integration) can use it.
// The reference is non-const; callers can mutate the page cache through it.
PageCache<KeyType>& getPageCache()
{
    return page_cache;
}

Page<KeyType> findKey(std::shared_ptr<Page<KeyType>> node, const KeyType& key);

Expand Down
75 changes: 75 additions & 0 deletions include/checkpoint_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once
#include <chrono>
#include <atomic>
#include <string>
#include <memory>
#include "wal.h"
#include "page_cache.h"
#include "job_scheduler.h"

// Declares a checkpoint coordinator built on three collaborators passed to the
// constructor as raw pointers: a WALManager, a PageCache and a JobScheduler.
// Only declarations are visible here; every member function is defined elsewhere.
// NOTE(review): the pointers are stored as plain T*; nothing in this header shows
// who owns the pointees — presumably the caller must keep them alive for as long
// as this object exists. Confirm against the implementation.
template<typename KeyType>
class CheckpointManager {
private:
// Collaborators supplied through the constructor (raw pointers).
WALManager<KeyType>* wal_manager;
PageCache<KeyType>* page_cache;
JobScheduler* job_scheduler;

// Checkpoint configuration
std::chrono::milliseconds checkpoint_interval;
size_t wal_size_threshold; // Trigger checkpoint when WAL exceeds this size
size_t dirty_page_threshold; // Trigger checkpoint when dirty pages exceed this

// Checkpoint tracking (all atomics, so they can be read/written without a mutex)
std::atomic<uint64_t> last_checkpoint_lsn;
// NOTE(review): std::atomic<T> requires T to be trivially copyable. A
// steady_clock::time_point satisfies that on mainstream standard libraries, but
// the atomic is not guaranteed to be lock-free — confirm on the target toolchain.
std::atomic<std::chrono::steady_clock::time_point> last_checkpoint_time;
std::atomic<size_t> checkpoints_completed;
std::atomic<size_t> checkpoints_failed;

// Names for the recurring jobs. These are strings, not numeric job ids;
// presumably they are the `name` keys given to JobScheduler::addRecurringJob /
// removeRecurringJob — confirm in the implementation.
std::string checkpoint_job_name;
std::string cleanup_job_name;

public:
// Stores the three collaborator pointers and the trigger configuration.
// Defaults: interval = 5 minutes, wal_threshold = 1024 * 1024,
// dirty_threshold = 100.
CheckpointManager(WALManager<KeyType>* wal, PageCache<KeyType>* cache,
JobScheduler* scheduler,
std::chrono::milliseconds interval = std::chrono::minutes(5),
size_t wal_threshold = 1024 * 1024, // 1MB
size_t dirty_threshold = 100); // 100 pages

~CheckpointManager();

// Lifecycle
// NOTE(review): presumably start() registers the recurring jobs with the
// scheduler and stop() removes them — not visible from this header.
void start();
void stop();

// Manual checkpoint
// Returns bool; presumably true on success — confirm in the implementation.
bool performCheckpoint();

// Automatic checkpoint triggers
// shouldCheckpoint() is const, i.e. it is a query that does not modify
// (non-mutable) members of this object.
bool shouldCheckpoint() const;
void scheduleCheckpointIfNeeded();

// Configuration
void setCheckpointInterval(std::chrono::milliseconds interval);
void setWALSizeThreshold(size_t threshold);
void setDirtyPageThreshold(size_t threshold);

// Statistics
// Plain value snapshot returned by getStats(). Several fields share names
// with the tracking members above; how success_rate, current_wal_size and
// is_healthy are derived is not visible in this header.
struct CheckpointStats {
size_t total_checkpoints;
size_t failed_checkpoints;
double success_rate;
uint64_t last_checkpoint_lsn;
std::chrono::steady_clock::time_point last_checkpoint_time;
size_t current_wal_size;
bool is_healthy;
};

CheckpointStats getStats() const;
void printStats() const;

private:
// Job functions for scheduler
// Both take no arguments and return bool, which matches the
// std::function<bool()> callable type that JobScheduler accepts.
bool checkpointJobFunc();
bool cleanupJobFunc();
};
167 changes: 167 additions & 0 deletions include/job_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Kind tag carried by every Job (see Job::type).
// Enumerator values are spelled out; they match the implicit 0..3 numbering.
enum class JobType {
    CHECKPOINT    = 0,
    VERSION_PRUNE = 1,
    HEALTH_CHECK  = 2,
    CUSTOM        = 3
};

// Lifecycle state carried by every Job (see Job::status).
// Job's constructor initialises the status to PENDING.
// Enumerator values are spelled out; they match the implicit 0..4 numbering.
enum class JobStatus {
    PENDING   = 0,
    RUNNING   = 1,
    COMPLETED = 2,
    FAILED    = 3,
    CANCELLED = 4
};

// Scheduling priority carried by every Job (see Job::priority).
// The numeric values are load-bearing: Job::operator< compares priorities with
// `<`, so a numerically smaller priority makes a job compare as "less".
// Keep the values ascending from least to most urgent.
enum class JobPriority {
LOW = 0,
NORMAL = 1,
HIGH = 2,
CRITICAL = 3
};

// Base job class
// Base job class
//
// Plain record describing one unit of schedulable work. All fields are public.
class Job {
public:
    uint64_t job_id;                                      // identifier supplied by the creator
    JobType type;                                         // kind tag
    JobPriority priority;                                 // used by operator< below
    JobStatus status;                                     // starts as PENDING
    std::chrono::steady_clock::time_point created_at;     // set to now() at construction
    std::chrono::steady_clock::time_point scheduled_at;   // set to now() at construction; public, may be changed afterwards
    std::chrono::milliseconds timeout;                    // defaults to 5 minutes
    std::function<bool()> execute_func;                   // the work itself; returns bool
    std::string description;                              // free-form text

    // Builds a PENDING job stamped with the current steady_clock time.
    //
    // @param id    value stored in job_id
    // @param t     value stored in type
    // @param p     value stored in priority
    // @param func  callable stored in execute_func; taken by value and moved
    //              into the member so the callable is not copied a second time
    // @param desc  value copied into description
    // @param to    value stored in timeout (default: 5 minutes)
    Job(uint64_t id, JobType t, JobPriority p, std::function<bool()> func,
        const std::string& desc, std::chrono::milliseconds to = std::chrono::minutes(5))
        : job_id(id), type(t), priority(p), status(JobStatus::PENDING),
          created_at(std::chrono::steady_clock::now()),
          scheduled_at(std::chrono::steady_clock::now()),
          timeout(to), execute_func(std::move(func)), description(desc) {}

    // For priority queue ordering
    //
    // In a max-heap such as std::priority_queue, the "greatest" element is
    // served first. A job is "less" when its priority is numerically lower;
    // on equal priority, the job with the LATER scheduled_at is "less", so the
    // earlier-scheduled job is served first.
    //
    // Caveat: a container of std::shared_ptr<Job> only uses this operator if
    // its comparator dereferences the pointers. The default std::less on
    // shared_ptr compares the stored addresses instead.
    bool operator<(const Job& other) const {
        if (priority != other.priority) {
            return priority < other.priority;
        }
        return scheduled_at > other.scheduled_at;
    }
};

// Job scheduler class
class JobScheduler {
private:
// Thread pool
std::vector<std::thread> worker_threads;
size_t num_workers;
std::atomic<bool> running;

// Job queue with priority
std::priority_queue<std::shared_ptr<Job>> job_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
std::condition_variable shutdown_cv;

// Job tracking
std::unordered_map<uint64_t, std::shared_ptr<Job>> active_jobs;
std::unordered_map<uint64_t, std::shared_ptr<Job>> completed_jobs;
mutable std::mutex jobs_mutex;

// Job ID generation
std::atomic<uint64_t> next_job_id;

// Health monitoring
std::atomic<size_t> total_jobs_executed;
std::atomic<size_t> failed_jobs;
std::atomic<size_t> successful_jobs;
std::chrono::steady_clock::time_point last_health_check;

// Recurring job management
struct RecurringJobInfo {
std::chrono::milliseconds interval;
std::chrono::steady_clock::time_point next_execution;
std::function<bool()> job_func;
std::string description;
JobPriority priority;
bool enabled;
};
std::unordered_map<std::string, RecurringJobInfo> recurring_jobs;
std::mutex recurring_jobs_mutex;

// Worker thread functions
void workerThread(int worker_id);
void schedulerThread();

// Job execution
bool executeJob(std::shared_ptr<Job> job);
void handleJobTimeout(std::shared_ptr<Job> job);

// Recurring job management
void scheduleRecurringJobs();

public:
JobScheduler(size_t num_threads = 4);
~JobScheduler();

// Lifecycle
void start();
void stop();
bool isRunning() const { return running.load(); }

// Job submission
uint64_t scheduleJob(JobType type, JobPriority priority,
std::function<bool()> job_func, const std::string& description,
std::chrono::milliseconds delay = std::chrono::milliseconds(0),
std::chrono::milliseconds timeout = std::chrono::minutes(5));

uint64_t scheduleCheckpoint(std::function<bool()> checkpoint_func,
std::chrono::milliseconds delay = std::chrono::milliseconds(0));

uint64_t scheduleVersionPrune(std::function<bool()> prune_func,
std::chrono::milliseconds delay = std::chrono::milliseconds(0));

// Recurring jobs
bool addRecurringJob(const std::string& name, std::chrono::milliseconds interval,
std::function<bool()> job_func, const std::string& description,
JobPriority priority = JobPriority::NORMAL);

bool removeRecurringJob(const std::string& name);
bool enableRecurringJob(const std::string& name, bool enabled);

// Job management
bool cancelJob(uint64_t job_id);
JobStatus getJobStatus(uint64_t job_id);
std::shared_ptr<Job> getJob(uint64_t job_id);

// Health and monitoring
struct SchedulerStats {
size_t pending_jobs;
size_t active_jobs;
size_t total_executed;
size_t successful;
size_t failed;
double success_rate;
size_t worker_threads;
bool is_healthy;
};

SchedulerStats getStats() const;
void printStats() const;
bool isHealthy() const;

// Maintenance
void cleanupCompletedJobs(std::chrono::hours max_age = std::chrono::hours(24));
};
Binary file added job_scheduler_demo
Binary file not shown.
29 changes: 21 additions & 8 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,39 @@ SRCDIR = src
OBJDIR = obj

# Source files (only B-tree related files)
SOURCES = src/Btree.cpp src/main.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
SOURCES = src/Btree.cpp src/main.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
OBJECTS = $(SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Demo source files
DEMO_SOURCES = src/Btree.cpp src/content_hash_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
DEMO_SOURCES = src/Btree.cpp src/content_hash_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
DEMO_OBJECTS = $(DEMO_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Content addressable demo
ADDRESSABLE_SOURCES = src/Btree.cpp src/content_addressable_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
ADDRESSABLE_SOURCES = src/Btree.cpp src/content_addressable_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
ADDRESSABLE_OBJECTS = $(ADDRESSABLE_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Deduplication demo
DEDUP_SOURCES = src/Btree.cpp src/deduplication_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
DEDUP_SOURCES = src/Btree.cpp src/deduplication_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
DEDUP_OBJECTS = $(DEDUP_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Cache performance demo
CACHE_PERF_SOURCES = src/Btree.cpp src/cache_performance_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp
CACHE_PERF_SOURCES = src/Btree.cpp src/cache_performance_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
CACHE_PERF_OBJECTS = $(CACHE_PERF_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Job scheduler demo
JOB_SCHED_SOURCES = src/Btree.cpp src/job_scheduler_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
JOB_SCHED_OBJECTS = $(JOB_SCHED_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Target executables
TARGET = btree_test
DEMO_TARGET = content_hash_demo
ADDRESSABLE_TARGET = content_addressable_demo
DEDUP_TARGET = deduplication_demo
CACHE_PERF_TARGET = cache_performance_demo
JOB_SCHED_TARGET = job_scheduler_demo

# Default target
all: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET)
all: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET) $(JOB_SCHED_TARGET)

# Create object directory if it doesn't exist
$(OBJDIR):
Expand Down Expand Up @@ -61,9 +66,13 @@ $(DEDUP_TARGET): $(DEDUP_OBJECTS)
$(CACHE_PERF_TARGET): $(CACHE_PERF_OBJECTS)
$(CXX) $(CACHE_PERF_OBJECTS) -o $(CACHE_PERF_TARGET)

# Link job scheduler demo executable
$(JOB_SCHED_TARGET): $(JOB_SCHED_OBJECTS)
$(CXX) $(JOB_SCHED_OBJECTS) -o $(JOB_SCHED_TARGET)

# Clean build files
clean:
rm -rf $(OBJDIR) $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET)
rm -rf $(OBJDIR) $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET) $(JOB_SCHED_TARGET)

# Run the test
run: $(TARGET)
Expand All @@ -85,6 +94,10 @@ dedup: $(DEDUP_TARGET)
cache_perf: $(CACHE_PERF_TARGET)
./$(CACHE_PERF_TARGET)

# Run the job scheduler demo
job_sched: $(JOB_SCHED_TARGET)
./$(JOB_SCHED_TARGET)

# Run all tests (for CI/CD compatibility)
tests: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET)
@echo "=== Running Content Hash Demo ==="
Expand All @@ -100,4 +113,4 @@ tests: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET)
@echo -e "insert 1 apple\ninsert 2 banana\nsearch 1\nsearch 2\nquit" | ./$(TARGET) > /dev/null
@echo "All tests passed!"

.PHONY: all clean run demo addressable dedup cache_perf tests
.PHONY: all clean run demo addressable dedup cache_perf job_sched tests
Loading