Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ jobs:
- run: echo "The job is automatically triggered by a ${{github.event_name}} event."
- name: "Install APT packages"
run: >
sudo apt update;
sudo apt update;
sudo apt install libssl-dev librdmacm-dev libibverbs-dev libspdlog-dev -y;
sudo apt install libboost-all-dev ragel python3.10 python3-pip -y
sudo apt install libreadline-dev libboost-all-dev ragel python3 python3-pip -y
- run: g++ --version
- run: cmake --version
- run: lscpu
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/build_use_zerocopy_delta_api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ jobs:
- run: echo "The job is automatically triggered by a ${{github.event_name}} event."
- name: "Install APT packages"
run: >
sudo apt update;
sudo apt update;
sudo apt install libssl-dev librdmacm-dev libibverbs-dev libspdlog-dev -y;
sudo apt install libboost-all-dev ragel python3.10 python3-pip -y
sudo apt install libreadline-dev libboost-all-dev ragel python3 python3-pip -y
- run: g++ --version
- run: cmake --version
- run: lscpu
Expand Down
16 changes: 15 additions & 1 deletion include/cascade/cascade_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,22 @@ class ICascadeStore {
*/
virtual void put_and_forget(const VT& value, bool as_trigger) const = 0;

#ifdef ENABLE_EVALUATION

/**
 * @brief oob_send - out-of-band (OOB) write of a local buffer into memory on a remote node.
 *
 * This default implementation transfers nothing: it logs a warning that names
 * the called function and returns false. Store types that support OOB
 * transfers override it (e.g. VolatileCascadeStore).
 *
 * @param[in] data_addr Local memory address of data to write to remote node
 * @param[in] gpu_addr Remote address to write to
 * @param[in] rkey Access key to the remote memory
 * @param[in] size The size of the remote allocated memory
 *
 * @return false from this default implementation.
 */
virtual bool oob_send(uint64_t data_addr,uint64_t gpu_addr, uint64_t rkey,size_t size) const {
// Unsupported unless overridden: report which function was hit, then signal failure.
dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__);
return false;
}

#ifdef ENABLE_EVALUATION
/**
* @brief A function to evaluate the performance of an internal shard
*
* @param[in] max_payload_size The maximum size of the payload.
Expand Down
25 changes: 25 additions & 0 deletions include/cascade/detail/service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,31 @@ derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::put(
return this->template type_recursive_put<ObjectType,CascadeTypes...>(subgroup_type_index,value,subgroup_index,shard_index,as_trigger);
}

/**
 * Ask the member `node_id` of a SubgroupType subgroup to run its `oob_send`
 * RPC, i.e. to write `size` bytes from its own `data_addr` into the memory at
 * `landing_addr` (protected by `rkey`) with an out-of-band write.
 *
 * The RPC is only issued; its reply is not awaited here, so returning from
 * this function does not mean the data has landed.
 *
 * Only Cascade group members can issue the request. When called on an
 * external client nothing is sent and a warning is logged.
 *
 * @tparam SubgroupType     The subgroup type whose `oob_send` RPC is invoked.
 * @param[in] node_id        The node asked to perform the write.
 * @param[in] subgroup_index Index of the SubgroupType subgroup used for the p2p call.
 * @param[in] data_addr      Address of the source data on `node_id`.
 * @param[in] landing_addr   Address the data is written to.
 * @param[in] rkey           Access key of the memory at `landing_addr`.
 * @param[in] size           Number of bytes to transfer.
 */
template <typename... CascadeTypes>
template <typename SubgroupType>
void ServiceClient<CascadeTypes...>::oob_get_remote(const node_id_t& node_id, uint32_t subgroup_index, uint64_t data_addr, uint64_t landing_addr, uint64_t rkey, size_t size) {
    if (is_external_client()) {
        // Make the no-op visible instead of silently dropping the request.
        dbg_default_warn("{} is only available to Cascade members; no request is sent to node {}.",
                         __PRETTY_FUNCTION__, node_id);
        return;
    }

    auto& subgroup_handle = group_ptr->template get_subgroup<SubgroupType>(subgroup_index);
    // Fire the RPC; the reply is intentionally not collected.
    subgroup_handle.template p2p_send<RPC_NAME(oob_send)>(node_id, data_addr, landing_addr, rkey, size);
    dbg_default_debug("Sent oob_send p2p RPC to node {}", node_id);
}

/**
 * Register the memory region [addr, addr + size) for out-of-band transfers by
 * forwarding to register_oob_memory_ex() on the Derecho group handle.
 *
 * NOTE(review): unlike oob_get_remote(), group_ptr is used here without an
 * is_external_client() check - confirm it is valid for external clients.
 *
 * @param[in] addr Start address of the region.
 * @param[in] size Size of the region.
 * @param[in] attr Memory attributes passed through unchanged.
 */
template <typename... CascadeTypes>
void ServiceClient<CascadeTypes...>::oob_register_mem_ex(void* addr, size_t size, const memory_attribute_t& attr) {
    auto& group = *group_ptr;
    group.register_oob_memory_ex(addr, size, attr);
}

/**
 * Undo an out-of-band memory registration by forwarding to
 * deregister_oob_memory() on the Derecho group handle.
 *
 * NOTE(review): unlike oob_get_remote(), group_ptr is used here without an
 * is_external_client() check - confirm it is valid for external clients.
 *
 * @param[in] addr Address identifying the region; presumably the same address
 *                 that was passed to oob_register_mem_ex() - TODO confirm.
 */
template <typename... CascadeTypes>
void ServiceClient<CascadeTypes...>::oob_deregister_mem(void* addr) {
    auto& group = *group_ptr;
    group.deregister_oob_memory(addr);
}
/**
 * Look up the out-of-band memory key for `addr` by forwarding to
 * get_oob_memory_key() on the Derecho group handle.
 *
 * NOTE(review): unlike oob_get_remote(), group_ptr is used here without an
 * is_external_client() check - confirm it is valid for external clients.
 *
 * @param[in] addr Address inside a registered region (presumably; verify
 *                 against Derecho's get_oob_memory_key()).
 *
 * @return The key reported by the group handle.
 */
template <typename... CascadeTypes>
uint64_t ServiceClient<CascadeTypes...>::oob_rkey(void* addr) {
    const uint64_t memory_key = group_ptr->get_oob_memory_key(addr);
    return memory_key;
}
template <typename... CascadeTypes>
template <typename SubgroupType>
void ServiceClient<CascadeTypes...>::put_and_forget(
Expand Down
17 changes: 17 additions & 0 deletions include/cascade/detail/volatile_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ double internal_perf_put(derecho::Replicated<CascadeType>& subgroup_handle, cons
return (num_messages_sent)*1e9 / (now_ns - start_ns);
}

/**
 * Serve an `oob_send` request: write `size` bytes starting at the local
 * address `data_addr` into the RPC caller's memory at `gpu_addr`, using an
 * out-of-band remote write, and wait for that write before returning.
 *
 * @param[in] data_addr Local address of the data to send.
 * @param[in] gpu_addr  Destination address on the node that issued the RPC.
 * @param[in] rkey      Access key of the destination memory.
 * @param[in] size      Number of bytes to write.
 *
 * @return true once the wait on the write operation has returned.
 */
template <typename KT, typename VT, KT* IK, VT* IV>
bool VolatileCascadeStore<KT, VT, IK, IV>::oob_send(uint64_t data_addr, uint64_t gpu_addr, uint64_t rkey, size_t size) const {
    dbg_default_debug("called oob_send with, data_addr={}, gpu_addr={}, rkey={}, size={}", data_addr, gpu_addr, rkey, size);
    auto& subgroup_handle = group->template get_subgroup<VolatileCascadeStore>(this->subgroup_index);
    // The write targets whoever issued this RPC; look the id up once and reuse it.
    const auto caller_id = group->get_rpc_caller_id();

    // Describe the local source buffer as a single-element scatter/gather list.
    struct iovec iov;
    iov.iov_base = reinterpret_cast<void*>(data_addr);
    iov.iov_len = size;

    subgroup_handle.oob_remote_write(caller_id, &iov, 1, gpu_addr, rkey, size);
    dbg_default_debug("Finished ASYNC oob remote write Derecho call");
    // NOTE(review): 1000 is the timeout handed to Derecho; presumably milliseconds - confirm.
    subgroup_handle.wait_for_oob_op(caller_id, OOB_OP_WRITE, 1000);
    dbg_default_debug("FINISHED OOB REMOTE WRITE");
    return true;
}

template <typename KT, typename VT, KT* IK, VT* IV>
double VolatileCascadeStore<KT, VT, IK, IV>::perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const {
debug_enter_func_with_args("max_payload_size={},duration_sec={}", max_payload_size, duration_sec);
Expand Down
19 changes: 17 additions & 2 deletions include/cascade/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,23 @@ namespace cascade {
*/
template <typename ObjectType>
derecho::rpc::QueryResults<version_tuple> put(const ObjectType& object, bool as_trigger = false);

/**

/**
 * "oob_get_remote" asks a node to write data it holds into an allocated memory region using a one-sided (GPU-direct) RDMA write.
 *
 * @tparam SubgroupType The type of the subgroup that node_id belongs to
 * @param[in] node_id The id of the node that we want to execute the RDMA write
 * @param[in] subgroup_index The index of the subgroup of type SubgroupType
 * @param[in] data_addr The address of the data on the node specified by node_id
 * @param[in] landing_addr The address of the allocated memory region (Starting address where the data will be written to during the one-sided RDMA write)
 * @param[in] rkey The access key for allocated memory
 * @param[in] size The size of the allocated memory region
 */
template <typename SubgroupType>
void oob_get_remote(const node_id_t& node_id, uint32_t subgroup_index, const uint64_t data_addr, uint64_t landing_addr, uint64_t rkey, size_t size);

/**
 * "oob_register_mem_ex" registers a memory region for out-of-band (OOB) transfers.
 *
 * @param[in] addr The start address of the memory region
 * @param[in] size The size of the memory region
 * @param[in] attr The attributes of the memory region
 */
void oob_register_mem_ex(void* addr, size_t size, const memory_attribute_t& attr);

/**
 * "oob_deregister_mem" removes an out-of-band (OOB) memory registration.
 *
 * @param[in] addr The address identifying the memory region (presumably the one given at registration - TODO confirm)
 */
void oob_deregister_mem(void* addr);

/**
 * "oob_rkey" looks up the out-of-band (OOB) memory key for an address.
 *
 * @param[in] addr The address to look up
 *
 * @return The memory key for addr (presumably the rkey a peer needs to access that memory - TODO confirm)
 */
uint64_t oob_rkey(void* addr);
/**
* "put_and_forget" writes an object to a given subgroup/shard, but no return value.
*
* @param[in] object the object to write.
Expand Down
4 changes: 3 additions & 1 deletion include/cascade/volatile_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class VolatileCascadeStore : public ICascadeStore<KT, VT, IK, IV>,
multi_get_size,
get_size,
get_size_by_time,
trigger_put
trigger_put,
oob_send
#ifdef ENABLE_EVALUATION
,
dump_timestamp_log
Expand Down Expand Up @@ -91,6 +92,7 @@ class VolatileCascadeStore : public ICascadeStore<KT, VT, IK, IV>,
#endif // ENABLE_EVALUATION
virtual void trigger_put(const VT& value) const override;
virtual version_tuple put(const VT& value, bool as_trigger) const override;
virtual bool oob_send(uint64_t data_addr, uint64_t gpu_addr, uint64_t rkey, size_t size) const override;
#ifdef ENABLE_EVALUATION
virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override;
#endif // ENABLE_EVALUATION
Expand Down
15 changes: 15 additions & 0 deletions src/service/perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,21 @@ PerfTestServer::~PerfTestServer() {
// PerfTestClient implementation //
//////////////////////////////////////

/**
 * Write the enumerator name of a PutType ("PUT", "PUT_AND_FORGET" or
 * "TRIGGER_PUT") to an output stream.
 *
 * A value matching none of the enumerators handled below writes nothing.
 *
 * @param[in,out] os The stream to write to.
 * @param[in]     pt The put type to print.
 *
 * @return os
 */
std::ostream& operator<<(std::ostream& os, PutType pt) {
    switch(pt) {
    case PutType::PUT:
        return os << "PUT";
    case PutType::PUT_AND_FORGET:
        return os << "PUT_AND_FORGET";
    case PutType::TRIGGER_PUT:
        return os << "TRIGGER_PUT";
    }
    // Not one of the handled enumerators: leave the stream untouched.
    return os;
}

// Construct a perf test client; the `capi` member is initialized from the given ServiceClientAPI.
PerfTestClient::PerfTestClient(ServiceClientAPI& capi):capi(capi) {}

void PerfTestClient::add_or_update_server(const std::string& host, uint16_t port) {
Expand Down
7 changes: 7 additions & 0 deletions src/service/perftest.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <cascade/service_client_api.hpp>
#include <iostream>
#include <limits>
#include <rpc/server.h>
#include <rpc/client.h>
Expand Down Expand Up @@ -176,6 +177,8 @@ enum PutType {
TRIGGER_PUT // trigger put
};

// Stream output for PutType values; returns the stream passed in.
std::ostream& operator<<(std::ostream& os, PutType pt);

class PerfTestClient {
private:
std::map<std::pair<std::string,uint16_t>,std::unique_ptr<::rpc::client>> connections;
Expand Down Expand Up @@ -778,3 +781,7 @@ bool PerfTestClient::perf_get_by_time(uint32_t subgroup_index,
}
}
}

// Formatter boilerplate for the spdlog library: makes PutType usable as a
// "{}" argument in log/format calls by formatting it through its
// std::ostream operator<<.
template <>
struct fmt::formatter<derecho::cascade::PutType> : fmt::ostream_formatter {};
37 changes: 35 additions & 2 deletions src/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,41 @@
namespace derecho {
namespace cascade {

/**
* cpu/gpu list examples:

/**
 * Write the name of a ShardMemberSelectionPolicy to an output stream.
 *
 * InvalidPolicy and any value not matching a listed enumerator are both
 * printed as "InvalidPolicy".
 *
 * @param[in,out] stream The stream to write to.
 * @param[in]     policy The policy to print.
 *
 * @return stream
 */
std::ostream& operator<<(std::ostream& stream, const ShardMemberSelectionPolicy& policy) {
    // Start from the fallback name; the switch replaces it for every valid policy.
    const char* policy_name = "InvalidPolicy";
    switch(policy) {
    case ShardMemberSelectionPolicy::FirstMember:
        policy_name = "FirstMember";
        break;
    case ShardMemberSelectionPolicy::LastMember:
        policy_name = "LastMember";
        break;
    case ShardMemberSelectionPolicy::Random:
        policy_name = "Random";
        break;
    case ShardMemberSelectionPolicy::FixedRandom:
        policy_name = "FixedRandom";
        break;
    case ShardMemberSelectionPolicy::RoundRobin:
        policy_name = "RoundRobin";
        break;
    case ShardMemberSelectionPolicy::KeyHashing:
        policy_name = "KeyHashing";
        break;
    case ShardMemberSelectionPolicy::UserSpecified:
        policy_name = "UserSpecified";
        break;
    case ShardMemberSelectionPolicy::InvalidPolicy:
    default:
        break;
    }
    return stream << policy_name;
}


/**
* cpu/gpu list examples:
* cpu_cores = 0,1,2,3
* cpu_cores = 0,1-5,6,8
* cpu_cores = 0-15
Expand Down
38 changes: 19 additions & 19 deletions src/udl_zoo/python/python_udl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
}
}

private:
private:
/* request type to python thread */
struct python_request_t {
enum {
Expand Down Expand Up @@ -274,7 +274,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
res.sequence_num = req.sequence_num;

dbg_default_trace("{}:{} [PYTHON] Processing request (type:{} sequence:{})",
__FILE__,__LINE__,req.type,req.sequence_num);
__FILE__,__LINE__,fmt::underlying(req.type),req.sequence_num);

switch(req.type) {
case python_request_t::TERMINATE:
Expand Down Expand Up @@ -343,15 +343,15 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
dbg_default_trace("{}:{} calling the handler.", __FILE__,__LINE__);
PyObject* ret = PyObject_Call(req.request.execute_ocdpo.handler_ptr,targs,kwargs);
if (ret == nullptr) {
dbg_default_error("Exception raised in user application. {}:{}",
dbg_default_error("Exception raised in user application. {}:{}",
__FILE__,__LINE__);
PyErr_Print();
res.success = false;
break;
} else {
Py_DECREF(ret);
}

Py_DECREF(kwargs);
#ifdef ENABLE_EVALUATION
Py_DECREF(py_message_id);
Expand All @@ -366,7 +366,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
Py_DECREF(py_pathname);
Py_DECREF(py_sender);
Py_DECREF(targs);

res.success = true;
dbg_default_trace("{}:{} User processing function returned.", __FILE__,__LINE__);
}
Expand Down Expand Up @@ -408,9 +408,9 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
if (conf_ptr->contains(PYUDL_CONF_ENTRY_CLASS)) {
std::string class_name = (*conf_ptr)[PYUDL_CONF_ENTRY_CLASS].get<std::string>();
dbg_default_trace("{}:{} create python handler object from class:{}",__FILE__,__LINE__,class_name);

// we assure py_module will be valid because STEP 2 succeeded.
auto py_module = PythonOCDPO::get_module(module_name.c_str());
auto py_module = PythonOCDPO::get_module(module_name.c_str());
auto entry_class_type = PyObject_GetAttrString(py_module,class_name.c_str());
if (entry_class_type == nullptr || !PyType_Check(entry_class_type)) {
dbg_default_error("Failed loading python udl entry class:{}.{}. {}:{}",
Expand All @@ -419,15 +419,15 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
res.success = false;
break;
}

// test if entry_class_type is a subclass of Type UserDefinedLogical.
if (!PythonOCDPO::is_valid_observer_type(reinterpret_cast<PyTypeObject*>(entry_class_type))) {
dbg_default_error("Error: {} is not a subclass of derecho.cascade.udl.UserDefinedLogic. {}:{}",
class_name, __FILE__, __LINE__);
res.success = false;
break;
}

// create object
std::string conf_str = to_string(*conf_ptr);
auto conf_arg = Py_BuildValue("s",conf_str.c_str());
Expand All @@ -441,7 +441,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
auto pargs = PyTuple_New(1);
PyTuple_SetItem(pargs,0,conf_arg);
python_ocdpo = PyObject_Call(entry_class_type,pargs,nullptr);

Py_DECREF(pargs);
// Py_DECREF(conf_arg); <-- don't do this: conf_arg has been 'stolen' by PyTuple_SetItem()
if (python_ocdpo == nullptr) {
Expand Down Expand Up @@ -473,15 +473,15 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
break;
}
dbg_default_trace("{}:{} ocdpo handler method is created @{:p}", __FILE__,__LINE__,static_cast<void*>(python_ocdpo_handler));

res.ocdpo = std::make_shared<PythonOCDPO>(python_ocdpo,python_ocdpo_handler,dynamic_cast<DefaultCascadeContextType*>(ctxt));
res.success = true;
}
break;
}

dbg_default_trace("{}:{} [PYTHON] Finished processing request (type:{} sequence:{}), response.success={}",
__FILE__,__LINE__,req.type,req.sequence_num,res.success);
__FILE__,__LINE__,fmt::underlying(req.type),req.sequence_num,res.success);

// notification
std::unique_lock res_lock(python_response_mutex);
Expand Down Expand Up @@ -525,7 +525,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
std::unique_lock<std::mutex> req_lock(python_request_mutex);
request.sequence_num = python_request_sequence_number++;
dbg_default_trace("{}:{} posting request (type:{} seq:{})",
__FILE__,__LINE__,request.type,request.sequence_num);
__FILE__,__LINE__,fmt::underlying(request.type),request.sequence_num);
python_request_queue.emplace(request);
req_lock.unlock();
python_request_cv.notify_one();
Expand All @@ -543,7 +543,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
python_response_queue.pop();
res_lock.unlock();
dbg_default_trace("{}:{} request(type:{} seq:{}/{}) is responsed.",
__FILE__,__LINE__, request.type, request.sequence_num, response.sequence_num);
__FILE__,__LINE__, fmt::underlying(request.type), request.sequence_num, response.sequence_num);
return response;
}

Expand Down Expand Up @@ -773,7 +773,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver {
,message_id
#endif
,blob_wrapper);

Py_RETURN_NONE;
}

Expand Down Expand Up @@ -819,14 +819,14 @@ PyModuleDef PythonOCDPO::context_module = {
nullptr, nullptr, nullptr, nullptr
};

/*
* This will only be called once
/*
* This will only be called once
*/
void initialize(ICascadeContext* ctxt) {
// ctxt is not used here; all setup is delegated to PythonOCDPO::initialize().
PythonOCDPO::initialize();
}

/*
/*
* This will be called for each UDL(PythonOCDPO) instance.
*/
std::shared_ptr<OffCriticalDataPathObserver> get_observer(
Expand Down