Skip to content
This repository was archived by the owner on Jan 26, 2026. It is now read-only.

Commit ee0bb9e

Browse files
committed
async execution in separate thread
1 parent c8e106b commit ee0bb9e

File tree

15 files changed

+59
-44
lines changed

15 files changed

+59
-44
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ find_package(pybind11 CONFIG)
2020
find_package(MPI REQUIRED)
2121
#find_package(OpenMP)
2222

23-
set(MKL_LIBRARIES -L$ENV{MKLROOT}/lib -lmkl_intel_lp64 -lmkl_intel_thread -lmkl_core -liomp5 -lpthread -lrt -ldl -lm)
23+
set(MKL_LIBRARIES -L$ENV{MKLROOT}/lib -lmkl_intel_lp64 -lmkl_intel_thread -lmkl_core -liomp5 -ltbb -lpthread -lrt -ldl -lm)
2424
#set(CMAKE_INSTALL_RPATH $ENV{MKLROOT}/lib)
2525
# Use -fPIC even if statically compiled
2626
set(CMAKE_POSITION_INDEPENDENT_CODE ON)

ddptensor/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
UINT32 as uint32,
2626
UINT16 as uint16,
2727
UINT8 as uint8,
28-
fini
28+
fini,
29+
sync
2930
)
3031
from .ddptensor import dtensor
3132
from os import getenv

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def build_cmake(self, ext):
2929
extdir.parent.mkdir(parents=True, exist_ok=True)
3030

3131
# example of cmake args
32-
config = 'Debug'# if self.debug else 'Release'
32+
config = 'Debug' if self.debug else 'Release' #'RelWithDebInfo'
3333
cmake_args = [
3434
'-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + str(extdir.parent.absolute()),
3535
'-DCMAKE_BUILD_TYPE=' + config

src/Creator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ namespace x {
4343

4444
static ptr_type op(uint64_t start, uint64_t end, uint64_t step)
4545
{
46-
PVSlice pvslice({Slice(start, end, step).size()});
46+
PVSlice pvslice({static_cast<uint64_t>(Slice(start, end, step).size())});
4747
auto lslc = pvslice.slice_of_rank();
4848
const auto & l1dslc = lslc.dim(0);
4949
auto a = xt::arange<T>(start + l1dslc._start*step, start + l1dslc._end * step, l1dslc._step);

src/Deferred.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,40 @@
11
#include "include/ddptensor/Deferred.hpp"
2-
#include <queue>
2+
#include <oneapi/tbb/concurrent_queue.h>
33

4-
static std::queue<Deferred::ptr_type> _deferred;
4+
static tbb::concurrent_bounded_queue<Deferred::ptr_type> _deferred;
55

66
Deferred::future_type Deferred::defer(Deferred::ptr_type && d)
77
{
8-
//auto f = d->get_future();
8+
auto f = d ? d->get_future() : Deferred::future_type();
99
_deferred.push(std::move(d));
10-
// return f;
11-
auto aa = Deferred::undefer_next();
10+
return f;
11+
/* auto aa = Deferred::undefer_next();
1212
aa->run();
13-
return aa->get_future();
13+
return aa->get_future(); */
1414
}
1515

1616
Deferred::ptr_type Deferred::undefer_next()
1717
{
18-
auto r = std::move(_deferred.front());
19-
_deferred.pop();
18+
Deferred::ptr_type r;
19+
_deferred.pop(r);
2020
return r;
2121
}
22+
23+
void process_promises()
24+
{
25+
while(true) {
26+
Deferred::ptr_type d;
27+
_deferred.pop(d);
28+
// auto d = std::move(Deferred::undefer_next());
29+
if(d) d->run();
30+
else break;
31+
d.reset();
32+
}
33+
}
34+
35+
void sync()
36+
{
37+
while(!_deferred.empty()) {
38+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
39+
}
40+
}

src/MPIMediator.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ uint64_t MPIMediator::register_array(tensor_i::ptr_type ary)
5353
return s_last_id;
5454
}
5555

56-
uint64_t MPIMediator::unregister_array(uint64_t id)
56+
void MPIMediator::unregister_array(uint64_t id)
5757
{
58+
locker _l(ak_mutex);
5859
s_ak.erase(id);
5960
}
6061

@@ -72,7 +73,6 @@ void MPIMediator::pull(rank_type from, const tensor_i & ary, const NDSlice & sli
7273
ser.adapter().flush();
7374

7475
auto sz = slice.size() * ary.item_size();
75-
std::cerr << "alsdkjf " << sz << " " << buff.size() << " " << rbuff << std::endl;
7676
MPI_Irecv(rbuff, sz, MPI_CHAR, from, PUSH_TAG, comm, &request[1]);
7777
MPI_Isend(buff.data(), buff.size(), MPI_CHAR, from, PULL_TAG, comm, &request[0]);
7878
auto error_code = MPI_Waitall(2, &request[0], &status[0]);
@@ -123,12 +123,16 @@ void MPIMediator::listen()
123123
MPI_Irecv(buff.data(), buff.size(), MPI_CHAR, MPI_ANY_SOURCE, PULL_TAG, comm, &request_in);
124124

125125
// Now find the array in question and send back its bufferized slice
126-
locker _l(ak_mutex);
127-
auto x = s_ak.find(id);
128-
if(x == s_ak.end()) throw(std::runtime_error("Encountered pull request for unknown tensor."));
126+
array_keeper_type::iterator x;
127+
tensor_i::ptr_type ptr;
128+
{
129+
locker _l(ak_mutex);
130+
x = s_ak.find(id);
131+
if(x == s_ak.end()) throw(std::runtime_error("Encountered pull request for unknown tensor."));
132+
ptr = x->second.lock();
133+
}
129134
// Wait for previous answer to complete so that we can re-use the buffer
130135
MPI_Wait(&request_out, MPI_STATUS_IGNORE);
131-
auto ptr = x->second.lock();
132136
ptr->bufferize(slice, rbuff);
133137
if(slice.size() * ptr->item_size() != rbuff.size()) throw(std::runtime_error("Got unexpected buffer size."));
134138
MPI_Isend(rbuff.data(), rbuff.size(), MPI_CHAR, requester, PUSH_TAG, comm, &request_out);

src/Random.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ struct DeferredRandomOp : public Deferred
3636
switch(_dtype) {
3737
case FLOAT64:
3838
set_value(x::Rand<double>::op(_shape, _lower, _upper));
39+
return;
3940
case FLOAT32:
4041
set_value(x::Rand<float>::op(_shape, _lower, _upper));
42+
return;
4143
}
4244
throw std::runtime_error("rand: dtype must be a floating point type");
4345
}

src/SetGetItem.cpp

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,45 +32,33 @@ namespace x {
3232
// (DPTensorX<T> & dest, const NDSlice & dest_slice, const DPTensorX<U> & val, const NDSlice & val_slice)
3333
{
3434
// const PVSlice & org_slice = dest.slice();
35-
std::cerr << "_set_slice " << org_slice << " " << dest_slice << " " << val->slice() << " " << val_slice << std::endl;
3635
auto nd = org_slice.ndims();
37-
// if(dest.owner() == REPLICATED && nd > 0)
38-
// std::cerr << "Warning: __setitem__ on replicated data updates local tile only" << std::endl;
3936
if(nd != dest_slice.ndims())
4037
throw std::runtime_error("Index dimensionality must match array dimensionality");
4138
if(val_slice.size() != dest_slice.size())
4239
throw std::runtime_error("Input and output slices must be of same size");
4340

4441
// Use given slice to create a global view into orig array
4542
PVSlice g_slc_view(org_slice, dest_slice);
46-
std::cerr << "g_slice: " << g_slc_view.slice() << std::endl;
4743
// Create a view into val
4844
PVSlice needed_val_view(val->slice(), val_slice);
49-
std::cerr << "needed_val_view: " << needed_val_view.slice() << " (was " << val->slice().slice() << ")" << std::endl;
5045

5146
// we can now compute which ranks actually hold which piece of the data from val that we need locally
5247
for(rank_type i=0; i<theTransceiver->nranks(); ++i ) {
5348
// get local view into val
5449
PVSlice val_local_view(val->slice(), i);
55-
std::cerr << i << " val_local_view: " << val_local_view.slice() << std::endl;
5650
NDSlice curr_needed_val_slice = needed_val_view.slice_of_rank(i);
57-
std::cerr << i << " curr_needed_val_slice: " << curr_needed_val_slice << std::endl;
5851
NDSlice curr_local_val_slice = val_local_view.map_slice(curr_needed_val_slice);
59-
std::cerr << i << " curr_local_val_slice: " << curr_local_val_slice << std::endl;
6052
NDSlice curr_needed_norm_slice = needed_val_view.map_slice(curr_needed_val_slice);
61-
std::cerr << i << " curr_needed_norm_slice: " << curr_needed_norm_slice << std::endl;
6253
PVSlice my_curr_needed_view = PVSlice(g_slc_view, curr_needed_norm_slice);
63-
std::cerr << i << " my_curr_needed_slice: " << my_curr_needed_view.slice() << std::endl;
6454
NDSlice my_curr_local_slice = my_curr_needed_view.local_slice_of_rank(theTransceiver->rank());
65-
std::cerr << i << " my_curr_local_slice: " << my_curr_local_slice << std::endl;
55+
6656
if(curr_needed_norm_slice.size()) {
6757
py::tuple tpl = _make_tuple(my_curr_local_slice); //my_curr_view.slice());
6858
if(i == theTransceiver->rank()) {
6959
// copy locally
70-
std::cerr << "local copy\n";
7160
auto to_v = xt::strided_view(dest/*.xarray()*/, to_xt(my_curr_local_slice));
7261
auto from_v = xt::strided_view(val->xarray(), to_xt(curr_local_val_slice));
73-
std::cerr << "to: " << to_v << std::endl << "from: " << from_v << std::endl;
7462
to_v = from_v;
7563
} else {
7664
// pull slice directly into new array
@@ -92,13 +80,9 @@ namespace x {
9280
{
9381
// Use given slice to create a global view into orig array
9482
PVSlice g_slc_view(a_ptr->slice(), slice);
95-
std::cerr << "g_slice: " << g_slc_view.slice() << std::endl;
9683
NDSlice my_slice = g_slc_view.slice_of_rank();
97-
std::cerr << "my_slice: " << my_slice << std::endl;
9884
NDSlice my_norm_slice = g_slc_view.map_slice(my_slice);
99-
std::cerr << "my_norm_slice: " << my_norm_slice << std::endl;
10085
NDSlice my_rel_slice = a_ptr->slice().map_slice(my_slice);
101-
std::cerr << "my_rel_slice: " << my_rel_slice << std::endl;
10286

10387
theTransceiver->barrier();
10488
_set_slice<A>(a_ptr->xarray(), a_ptr->slice(),

src/ddptensor.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ using namespace pybind11::literals; // to bring _a
2121

2222
#include "ddptensor/MPITransceiver.hpp"
2323
#include "ddptensor/MPIMediator.hpp"
24+
#include "ddptensor/Deferred.hpp"
2425
#include "ddptensor/Creator.hpp"
2526
#include "ddptensor/IEWBinOp.hpp"
2627
#include "ddptensor/EWBinOp.hpp"
@@ -41,10 +42,14 @@ rank_type myrank()
4142

4243
Transceiver * theTransceiver = nullptr;
4344
Mediator * theMediator = nullptr;
45+
std::thread * pprocessor;
4446

4547
// users currently need to call fini to make MPI terminate gracefully
4648
void fini()
4749
{
50+
Deferred::defer(nullptr);
51+
pprocessor->join();
52+
delete pprocessor;
4853
delete theMediator;
4954
theMediator = nullptr;
5055
delete theTransceiver;
@@ -56,12 +61,14 @@ void fini()
5661
PYBIND11_MODULE(_ddptensor, m) {
5762
theTransceiver = new MPITransceiver();
5863
theMediator = new MPIMediator();
64+
pprocessor = new std::thread(process_promises);
5965

6066
m.doc() = "A partitioned and distributed tensor";
6167

6268
def_enums(m);
6369

6470
m.def("fini", &fini)
71+
.def("sync", &sync)
6572
.def("myrank", &myrank)
6673
.def("_get_slice", &GetItem::get_slice)
6774
.def("_get_local", &GetItem::get_local);

src/include/ddptensor/Deferred.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,6 @@ struct UnDeferred : public Deferred
3737
{
3838
}
3939
};
40+
41+
extern void process_promises();
42+
extern void sync();

0 commit comments

Comments (0)