Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions include/health_monitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#pragma once
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "job_scheduler.h"

/// Identifies a component whose health is tracked; used as the key type of
/// the per-component maps inside HealthMonitor.
/// Enumerators carry their implicit values (0..6, in declaration order).
enum class ComponentType {
    WAL_MANAGER,
    PAGE_CACHE,
    WRITER_QUEUE,
    JOB_SCHEDULER,
    VERSION_MANAGER,
    CHECKPOINT_MANAGER,
    BTREE_ENGINE,
};

/// Health level of a metric, a component, or the system as a whole.
/// Declared in the order HEALTHY, WARNING, CRITICAL, FAILED, so the implicit
/// underlying values run 0..3.
enum class HealthStatus {
    HEALTHY,
    WARNING,
    CRITICAL,
    FAILED,
};

/// One named measurement plus the two thresholds stored alongside it.
/// This struct only holds the data; how `value` is compared against the
/// thresholds is decided by code outside this definition.
struct HealthMetric {
    std::string name;
    double value = 0.0;  // no sample recorded yet
    double warning_threshold;
    double critical_threshold;
    std::chrono::steady_clock::time_point last_updated;
    HealthStatus status = HealthStatus::HEALTHY;  // every metric starts out healthy

    /// @param n            metric name (copied)
    /// @param warn_thresh  stored as `warning_threshold`
    /// @param crit_thresh  stored as `critical_threshold`
    /// `last_updated` is stamped with the construction time.
    HealthMetric(const std::string& n, double warn_thresh, double crit_thresh)
        : name(n),
          warning_threshold(warn_thresh),
          critical_threshold(crit_thresh),
          last_updated(std::chrono::steady_clock::now()) {}
};

/// Health record for a single component: its identity, current status, the
/// metrics attached to it, and failure bookkeeping.
struct ComponentHealth {
    ComponentType type;
    std::string name;
    HealthStatus status = HealthStatus::HEALTHY;  // a new component starts healthy
    std::vector<std::shared_ptr<HealthMetric>> metrics;  // initially empty
    std::chrono::steady_clock::time_point last_check;
    std::string last_error;  // initially empty
    size_t consecutive_failures = 0;

    /// @param t  component kind
    /// @param n  human-readable component name (copied)
    /// `last_check` is stamped with the construction time.
    ComponentHealth(ComponentType t, const std::string& n)
        : type(t),
          name(n),
          last_check(std::chrono::steady_clock::now()) {}
};

/// Central registry of component health.
///
/// Holds one ComponentHealth record per ComponentType, optional per-component
/// recovery callbacks, recovery counters, and an optional alert callback.
/// Only declarations are visible in this header; the behaviour of each member
/// function is defined in the implementation file.
///
/// The class contains a std::mutex and std::atomic members and is therefore
/// not copyable; the copy operations are deleted explicitly to make that
/// visible at the declaration.
class HealthMonitor {
private:
    // Component health tracking
    std::unordered_map<ComponentType, std::shared_ptr<ComponentHealth>> components;
    mutable std::mutex health_mutex;  // mutable so const query methods can lock it

    // Health check scheduling
    // NOTE(review): raw pointer handed to the constructor; ownership is not
    // visible in this header -- presumably non-owning, confirm in the .cpp.
    JobScheduler* job_scheduler;
    std::string health_check_job_name;
    std::chrono::milliseconds check_interval;

    // Recovery actions
    std::unordered_map<ComponentType, std::function<bool()>> recovery_actions;
    std::atomic<size_t> recovery_attempts;
    std::atomic<size_t> successful_recoveries;

    // System-wide health
    std::atomic<HealthStatus> overall_health;
    std::chrono::steady_clock::time_point last_health_change;

    // Alerting
    std::function<void(ComponentType, HealthStatus, const std::string&)> alert_callback;

    // Configuration
    size_t max_consecutive_failures;
    std::chrono::minutes recovery_cooldown;
    std::unordered_map<ComponentType, std::chrono::steady_clock::time_point> last_recovery_attempt;

    // Health check functions
    bool performHealthCheck();
    void checkComponent(std::shared_ptr<ComponentHealth> component);
    void updateOverallHealth();
    bool shouldAttemptRecovery(ComponentType type) const;
    void attemptRecovery(ComponentType type);

public:
    /// @param scheduler  scheduler used for the periodic health-check job
    /// @param interval   health-check period; defaults to 30 seconds
    ///
    /// `explicit` because the defaulted second parameter makes this callable
    /// with a single argument, which would otherwise allow a JobScheduler* to
    /// convert silently into a HealthMonitor.
    explicit HealthMonitor(JobScheduler* scheduler,
                           std::chrono::milliseconds interval = std::chrono::seconds(30));
    ~HealthMonitor();

    // Non-copyable (already implied by the mutex/atomic members).
    HealthMonitor(const HealthMonitor&) = delete;
    HealthMonitor& operator=(const HealthMonitor&) = delete;

    // Lifecycle
    void start();
    void stop();

    // Component registration
    void registerComponent(ComponentType type, const std::string& name);
    void addMetric(ComponentType type, const std::string& metric_name,
                   double warning_threshold, double critical_threshold);
    void registerRecoveryAction(ComponentType type, std::function<bool()> recovery_func);

    // Metric updates
    void updateMetric(ComponentType type, const std::string& metric_name, double value);
    void reportError(ComponentType type, const std::string& error_message);
    void reportRecovery(ComponentType type);

    // Health status queries
    HealthStatus getComponentHealth(ComponentType type) const;
    HealthStatus getOverallHealth() const;
    bool isSystemHealthy() const;
    std::vector<ComponentType> getUnhealthyComponents() const;

    // Statistics
    /// Plain snapshot of monitor-wide counters returned by getStats().
    struct HealthStats {
        size_t total_components;
        size_t healthy_components;
        size_t warning_components;
        size_t critical_components;
        size_t failed_components;
        size_t recovery_attempts;
        size_t successful_recoveries;
        double recovery_success_rate;
        HealthStatus overall_status;
        std::chrono::steady_clock::time_point last_health_change;
    };

    HealthStats getStats() const;
    void printHealthReport() const;

    // Configuration
    void setAlertCallback(std::function<void(ComponentType, HealthStatus, const std::string&)> callback);
    void setMaxConsecutiveFailures(size_t max_failures);
    void setRecoveryCooldown(std::chrono::minutes cooldown);

private:
    // Job scheduler function
    bool healthCheckJobFunc();

    // Utility functions
    std::string componentTypeToString(ComponentType type) const;
    std::string healthStatusToString(HealthStatus status) const;
};
112 changes: 112 additions & 0 deletions include/version_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#pragma once
#include <unordered_map>
#include <vector>
#include <mutex>
#include <atomic>
#include <chrono>
#include <memory>
#include "page_manager.h"

// Identifier and timestamp types used by the MVCC structures in this header.
// TransactionId is a 64-bit unsigned integer; VersionedRecord::deleted_by uses
// the value 0 to mean "not deleted".
// Timestamp is a std::chrono::steady_clock time point (monotonic clock, not
// wall-clock time).
// NOTE(review): uint64_t/uint8_t come from <cstdint>, which this header does
// not include directly -- presumably it arrives via page_manager.h; confirm.
using TransactionId = uint64_t;
using Timestamp = std::chrono::steady_clock::time_point;

/// One stored version of a record, tagged with the transaction that created
/// it and, once removed, the transaction that deleted it.
template <typename KeyType>
struct VersionedRecord {
    KeyType key;
    std::vector<uint8_t> data;
    TransactionId created_by;
    TransactionId deleted_by = 0;  // 0 if not deleted
    Timestamp created_at;
    Timestamp deleted_at;          // left default-constructed by the constructor
    bool is_deleted = false;

    /// @param k       record key (copied)
    /// @param d       payload bytes (copied)
    /// @param txn_id  id of the creating transaction
    /// `created_at` is stamped with the construction time.
    VersionedRecord(const KeyType& k, const std::vector<uint8_t>& d, TransactionId txn_id)
        : key(k),
          data(d),
          created_by(txn_id),
          created_at(std::chrono::steady_clock::now()) {}
};

/// Bookkeeping for a single transaction: its id, start/commit timestamps,
/// committed/aborted flags, and the keys it has read and written.
template<typename KeyType>
struct Transaction {
    TransactionId id;
    Timestamp start_time;
    Timestamp commit_time;           // left default-constructed by the constructor
    bool is_committed;
    bool is_aborted;
    std::vector<KeyType> read_set;   // starts empty
    std::vector<KeyType> write_set;  // starts empty

    /// Creates a transaction that is neither committed nor aborted, with
    /// `start_time` stamped at construction.
    ///
    /// @param txn_id  identifier assigned to this transaction
    ///
    /// `explicit` so a bare integer id cannot convert silently into a whole
    /// Transaction object (e.g. when passed to a function taking Transaction).
    explicit Transaction(TransactionId txn_id)
        : id(txn_id), start_time(std::chrono::steady_clock::now()),
          is_committed(false), is_aborted(false) {}
};

// Multi-version store keyed by KeyType.
// Holds a per-key list of VersionedRecord objects plus tables of active and
// committed transactions, and exposes transaction, read/write, cleanup and
// statistics entry points. Only declarations are visible in this header; the
// behaviour of each member function is defined elsewhere.
// NOTE(review): two mutexes are declared below; which data each one guards and
// the order they are acquired in is not visible here -- confirm in the
// implementation.
template<typename KeyType>
class VersionManager {
private:
// Version storage: key -> list of versions (newest first)
std::unordered_map<KeyType, std::vector<std::shared_ptr<VersionedRecord<KeyType>>>> versions;

// Active transactions
std::unordered_map<TransactionId, std::shared_ptr<Transaction<KeyType>>> active_transactions;
// Committed transactions, kept in a separate table from the active ones
std::unordered_map<TransactionId, std::shared_ptr<Transaction<KeyType>>> committed_transactions;

// Transaction ID generation
std::atomic<TransactionId> next_transaction_id;

// Version cleanup tracking
std::atomic<size_t> total_versions;
std::atomic<size_t> cleaned_versions;
Timestamp last_cleanup;

// Synchronization
// Both mutexes are mutable so that const member functions can lock them.
mutable std::mutex versions_mutex;
mutable std::mutex transactions_mutex;

// Configuration
std::chrono::hours version_retention_period;
size_t max_versions_per_key;

// Helper methods
bool isVisible(const std::shared_ptr<VersionedRecord<KeyType>>& version, TransactionId reader_txn) const;
std::shared_ptr<VersionedRecord<KeyType>> findVisibleVersion(const KeyType& key, TransactionId reader_txn) const;

public:
// Defaults: 24-hour retention period, 100 versions per key.
// NOTE(review): every parameter is defaulted, so this also serves as the
// default constructor and as an implicit conversion from std::chrono::hours;
// consider whether `explicit` is wanted.
VersionManager(std::chrono::hours retention = std::chrono::hours(24), size_t max_versions = 100);
~VersionManager();

// Transaction management
TransactionId beginTransaction();
bool commitTransaction(TransactionId txn_id);
bool abortTransaction(TransactionId txn_id);
bool isTransactionActive(TransactionId txn_id) const;

// MVCC operations
// Each operation takes the id of the transaction on whose behalf it runs.
bool insert(TransactionId txn_id, const KeyType& key, const std::vector<uint8_t>& data);
bool update(TransactionId txn_id, const KeyType& key, const std::vector<uint8_t>& new_data);
bool remove(TransactionId txn_id, const KeyType& key);
std::shared_ptr<VersionedRecord<KeyType>> read(TransactionId txn_id, const KeyType& key);

// Version cleanup
// The size_t results are presumably the number of items removed -- confirm
// against the implementation.
size_t cleanupOldVersions();
size_t cleanupAbortedTransactions();
bool canCleanupVersion(const std::shared_ptr<VersionedRecord<KeyType>>& version) const;

// Statistics and monitoring
// Plain snapshot of counters returned by getStats().
struct VersionStats {
size_t total_versions;
size_t active_transactions;
size_t committed_transactions;
size_t versions_per_key_avg;
size_t cleaned_versions;
double cleanup_efficiency;
std::chrono::steady_clock::time_point last_cleanup_time;
};

VersionStats getStats() const;
void printStats() const;

// Configuration
void setRetentionPeriod(std::chrono::hours period);
void setMaxVersionsPerKey(size_t max_versions);
};
Binary file removed job_scheduler_demo
Binary file not shown.
31 changes: 22 additions & 9 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,44 @@ SRCDIR = src
OBJDIR = obj

# Source files (only B-tree related files)
SOURCES = src/Btree.cpp src/main.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
SOURCES = src/Btree.cpp src/main.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp src/version_manager.cpp src/health_monitor.cpp
OBJECTS = $(SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Demo source files
DEMO_SOURCES = src/Btree.cpp src/content_hash_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
DEMO_SOURCES = src/Btree.cpp src/content_hash_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp src/version_manager.cpp src/health_monitor.cpp
DEMO_OBJECTS = $(DEMO_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Content addressable demo
ADDRESSABLE_SOURCES = src/Btree.cpp src/content_addressable_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
ADDRESSABLE_SOURCES = src/Btree.cpp src/content_addressable_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp src/version_manager.cpp src/health_monitor.cpp
ADDRESSABLE_OBJECTS = $(ADDRESSABLE_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Deduplication demo
DEDUP_SOURCES = src/Btree.cpp src/deduplication_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
DEDUP_SOURCES = src/Btree.cpp src/deduplication_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp src/version_manager.cpp src/health_monitor.cpp
DEDUP_OBJECTS = $(DEDUP_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Cache performance demo
CACHE_PERF_SOURCES = src/Btree.cpp src/cache_performance_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
CACHE_PERF_SOURCES = src/Btree.cpp src/cache_performance_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp src/version_manager.cpp src/health_monitor.cpp
CACHE_PERF_OBJECTS = $(CACHE_PERF_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Job scheduler demo
JOB_SCHED_SOURCES = src/Btree.cpp src/job_scheduler_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp
JOB_SCHED_SOURCES = src/Btree.cpp src/job_scheduler_demo.cpp src/page_manager.cpp src/page_cache.cpp src/writer_queue.cpp src/wal.cpp src/job_scheduler.cpp src/checkpoint_manager.cpp src/version_manager.cpp src/health_monitor.cpp
JOB_SCHED_OBJECTS = $(JOB_SCHED_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# MVCC and Health demo
MVCC_HEALTH_SOURCES = src/mvcc_health_demo.cpp src/page_manager.cpp src/version_manager.cpp src/health_monitor.cpp src/job_scheduler.cpp
MVCC_HEALTH_OBJECTS = $(MVCC_HEALTH_SOURCES:$(SRCDIR)/%.cpp=$(OBJDIR)/%.o)

# Target executables
TARGET = btree_test
DEMO_TARGET = content_hash_demo
ADDRESSABLE_TARGET = content_addressable_demo
DEDUP_TARGET = deduplication_demo
CACHE_PERF_TARGET = cache_performance_demo
JOB_SCHED_TARGET = job_scheduler_demo
MVCC_HEALTH_TARGET = mvcc_health_demo

# Default target
all: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET) $(JOB_SCHED_TARGET)
all: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET) $(JOB_SCHED_TARGET) $(MVCC_HEALTH_TARGET)

# Create object directory if it doesn't exist
$(OBJDIR):
Expand Down Expand Up @@ -70,9 +75,13 @@ $(CACHE_PERF_TARGET): $(CACHE_PERF_OBJECTS)
$(JOB_SCHED_TARGET): $(JOB_SCHED_OBJECTS)
$(CXX) $(JOB_SCHED_OBJECTS) -o $(JOB_SCHED_TARGET)

# Link MVCC and health demo executable
$(MVCC_HEALTH_TARGET): $(MVCC_HEALTH_OBJECTS)
$(CXX) $(MVCC_HEALTH_OBJECTS) -o $(MVCC_HEALTH_TARGET)

# Clean build files
clean:
rm -rf $(OBJDIR) $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET) $(JOB_SCHED_TARGET)
rm -rf $(OBJDIR) $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET) $(CACHE_PERF_TARGET) $(JOB_SCHED_TARGET) $(MVCC_HEALTH_TARGET)

# Run the test
run: $(TARGET)
Expand All @@ -98,6 +107,10 @@ cache_perf: $(CACHE_PERF_TARGET)
job_sched: $(JOB_SCHED_TARGET)
./$(JOB_SCHED_TARGET)

# Run the MVCC and health demo
mvcc_health: $(MVCC_HEALTH_TARGET)
./$(MVCC_HEALTH_TARGET)

# Run all tests (for CI/CD compatibility)
tests: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET)
@echo "=== Running Content Hash Demo ==="
Expand All @@ -113,4 +126,4 @@ tests: $(TARGET) $(DEMO_TARGET) $(ADDRESSABLE_TARGET) $(DEDUP_TARGET)
@echo -e "insert 1 apple\ninsert 2 banana\nsearch 1\nsearch 2\nquit" | ./$(TARGET) > /dev/null
@echo "All tests passed!"

.PHONY: all clean run demo addressable dedup cache_perf job_sched tests
.PHONY: all clean run demo addressable dedup cache_perf job_sched mvcc_health tests
Binary file added mvcc_health_demo
Binary file not shown.
Loading