Skip to content
This repository was archived by the owner on Jan 26, 2026. It is now read-only.

Commit f68fab8

Browse files
committed
CW initial commit of controller-worker (CW) mode
1 parent c1aef79 commit f68fab8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1361
-673
lines changed

ddptensor/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
UINT32 as uint32,
2626
UINT16 as uint16,
2727
UINT8 as uint8,
28+
BOOL as bool,
29+
init,
2830
fini,
2931
sync
3032
)
@@ -55,7 +57,7 @@
5557
)
5658
elif func == "full":
5759
exec(
58-
f"{func} = lambda shape, val, dtype: dtensor(_cdt.Creator.full(_cdt.{FUNC}, shape, val, dtype))"
60+
f"{func} = lambda shape, val, dtype: dtensor(_cdt.Creator.full(shape, val, dtype))"
5961
)
6062
elif func == "arange":
6163
exec(

ddptensor/ddptensor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,4 @@ def __getitem__(self, *args):
5454
return dtensor(self._t.__getitem__(*args))
5555

5656
def __setitem__(self, key, value):
57-
x = self._t.__setitem__(key, value._t) # if isinstance(value, dtensor) else value)
57+
self._t = self._t.__setitem__(key, value._t) # if isinstance(value, dtensor) else value)

scripts/code_gen.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
namespace py = pybind11;
1818
""")
1919

20-
prev = "0"
20+
prev = 0
2121
for cat, lst in api.api_categories.items():
2222
print(f"enum {cat}Id : int {{")
2323
for x in lst:
@@ -28,7 +28,6 @@
2828
print("};\n")
2929

3030
print("static void def_enums(py::module_ & m)\n{")
31-
3231
for cat, lst in api.api_categories.items():
3332
print(f' py::enum_<{cat}Id>(m, "{cat}Id")')
3433
for x in lst:

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def build_cmake(self, ext):
2929
extdir.parent.mkdir(parents=True, exist_ok=True)
3030

3131
# example of cmake args
32-
config = 'Debug' if self.debug else 'Release' # 'RelWithDebInfo' #'Release'
32+
config = 'Debug' # if self.debug else 'Release' # 'RelWithDebInfo' #'Release'
3333
cmake_args = [
3434
'-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + str(extdir.parent.absolute()),
3535
'-DCMAKE_BUILD_TYPE=' + config

src/Creator.cpp

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "ddptensor/TypeDispatch.hpp"
33
#include "ddptensor/x.hpp"
44
#include "ddptensor/Deferred.hpp"
5+
#include "ddptensor/Factory.hpp"
56

67
namespace x {
78

@@ -15,7 +16,7 @@ namespace x {
1516
static ptr_type op(CreatorId c, const shape_type & shp)
1617
{
1718
PVSlice pvslice(shp);
18-
shape_type shape(std::move(pvslice.shape_of_rank()));
19+
shape_type shape(std::move(pvslice.tile_shape()));
1920
switch(c) {
2021
case EMPTY:
2122
return operatorx<T>::mk_tx(std::move(pvslice), std::move(xt::empty<T>(std::move(shape))));
@@ -28,14 +29,19 @@ namespace x {
2829
};
2930
};
3031

31-
template<typename V>
32-
static ptr_type op(CreatorId c, const shape_type & shp, const V & v)
32+
static ptr_type op(CreatorId c, const shape_type & shp, PyScalar v)
3333
{
34+
T val;
35+
if constexpr (std::is_integral<T>::value) val = static_cast<T>(v._int);
36+
else if constexpr (std::is_floating_point<T>::value) val = static_cast<T>(v._float);
3437
if(c == FULL) {
38+
if(VPROD(shp) <= 1) {
39+
return operatorx<T>::mk_tx(val, REPLICATED);
40+
}
3541
PVSlice pvslice(shp);
36-
shape_type shape(std::move(pvslice.shape_of_rank()));
42+
shape_type shape(std::move(pvslice.tile_shape()));
3743
auto a = xt::empty<T>(std::move(shape));
38-
a.fill(to_native<T>(v));
44+
a.fill(val);
3945
return operatorx<T>::mk_tx(std::move(pvslice), std::move(a));
4046
}
4147
throw std::runtime_error("Unknown creator");
@@ -47,24 +53,39 @@ namespace x {
4753
auto lslc = pvslice.slice_of_rank();
4854
const auto & l1dslc = lslc.dim(0);
4955
auto a = xt::arange<T>(start + l1dslc._start*step, start + l1dslc._end * step, l1dslc._step);
50-
return operatorx<T>::mk_tx(std::move(pvslice), std::move(a));
56+
auto r = operatorx<T>::mk_tx(std::move(pvslice), std::move(a));
57+
return r;
5158
}
5259
}; // class creatorx
5360
} // namespace x
5461

5562
struct DeferredFromShape : public Deferred
5663
{
57-
CreatorId _op;
5864
shape_type _shape;
5965
DTypeId _dtype;
66+
CreatorId _op;
6067

68+
DeferredFromShape() = default;
6169
DeferredFromShape(CreatorId op, const shape_type & shape, DTypeId dtype)
62-
: _op(op), _shape(shape), _dtype(dtype)
70+
: _shape(shape), _dtype(dtype), _op(op)
6371
{}
6472

6573
void run()
6674
{
67-
set_value(TypeDispatch<x::Creator>(_dtype, _op, _shape));
75+
set_value(std::move(TypeDispatch<x::Creator>(_dtype, _op, _shape)));
76+
}
77+
78+
FactoryId factory() const
79+
{
80+
return F_FROMSHAPE;
81+
}
82+
83+
template<typename S>
84+
void serialize(S & ser)
85+
{
86+
ser.template container<sizeof(shape_type::value_type)>(_shape, 8);
87+
ser.template value<sizeof(_dtype)>(_dtype);
88+
ser.template value<sizeof(_op)>(_op);
6889
}
6990
};
7091

@@ -76,30 +97,46 @@ tensor_i::future_type Creator::create_from_shape(CreatorId op, const shape_type
7697
struct DeferredFull : public Deferred
7798
{
7899
shape_type _shape;
79-
const py::object & _val;
100+
PyScalar _val;
80101
DTypeId _dtype;
81102

82-
DeferredFull(const shape_type & shape, const py::object & val, DTypeId dtype)
103+
DeferredFull() = default;
104+
DeferredFull(const shape_type & shape, PyScalar val, DTypeId dtype)
83105
: _shape(shape), _val(val), _dtype(dtype)
84106
{}
85107

86108
void run()
87109
{
88110
auto op = FULL;
89-
set_value(TypeDispatch<x::Creator>(_dtype, op, _shape, _val));
111+
set_value(std::move(TypeDispatch<x::Creator>(_dtype, op, _shape, _val)));
112+
}
113+
114+
FactoryId factory() const
115+
{
116+
return F_FULL;
117+
}
118+
119+
template<typename S>
120+
void serialize(S & ser)
121+
{
122+
ser.template container<sizeof(shape_type::value_type)>(_shape, 8);
123+
ser.template value<sizeof(_val)>(_val._int);
124+
ser.template value<sizeof(_dtype)>(_dtype);
90125
}
91126
};
92127

93128
tensor_i::future_type Creator::full(const shape_type & shape, const py::object & val, DTypeId dtype)
94129
{
95-
return defer<DeferredFull>(shape, val, dtype);
130+
auto v = mk_scalar(val, dtype);
131+
return defer<DeferredFull>(shape, v, dtype);
96132
}
97133

98134
struct DeferredArange : public Deferred
99135
{
100136
uint64_t _start, _end, _step;
101137
DTypeId _dtype;
102138

139+
DeferredArange() = default;
103140
DeferredArange(uint64_t start, uint64_t end, uint64_t step, DTypeId dtype)
104141
: _start(start), _end(end), _step(step), _dtype(dtype)
105142
{}
@@ -108,9 +145,39 @@ struct DeferredArange : public Deferred
108145
{
109146
set_value(std::move(TypeDispatch<x::Creator>(_dtype, _start, _end, _step)));
110147
};
148+
149+
FactoryId factory() const
150+
{
151+
return F_ARANGE;
152+
}
153+
154+
template<typename S>
155+
void serialize(S & ser)
156+
{
157+
ser.template value<sizeof(_start)>(_start);
158+
ser.template value<sizeof(_end)>(_end);
159+
ser.template value<sizeof(_step)>(_step);
160+
ser.template value<sizeof(_dtype)>(_dtype);
161+
}
111162
};
112163

113164
tensor_i::future_type Creator::arange(uint64_t start, uint64_t end, uint64_t step, DTypeId dtype)
114165
{
115166
return defer<DeferredArange>(start, end, step, dtype);
116167
}
168+
169+
tensor_i::future_type Creator::mk_future(const py::object & b)
170+
{
171+
if(py::isinstance<tensor_i::future_type>(b)) {
172+
return b.cast<tensor_i::future_type>();
173+
} else if(py::isinstance<py::float_>(b)) {
174+
return Creator::full({1}, b, FLOAT64);
175+
} else if(py::isinstance<py::int_>(b)) {
176+
return Creator::full({1}, b, INT64);
177+
}
178+
throw std::runtime_error("Invalid right operand to elementwise binary operation");
179+
};
180+
181+
FACTORY_INIT(DeferredFromShape, F_FROMSHAPE);
182+
FACTORY_INIT(DeferredFull, F_FULL);
183+
FACTORY_INIT(DeferredArange, F_ARANGE);

src/Deferred.cpp

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,33 @@
1-
#include "include/ddptensor/Deferred.hpp"
21
#include <oneapi/tbb/concurrent_queue.h>
2+
#include "include/ddptensor/Deferred.hpp"
3+
#include "include/ddptensor/Transceiver.hpp"
4+
#include "include/ddptensor/Mediator.hpp"
5+
#include "include/ddptensor/Registry.hpp"
36

47
static tbb::concurrent_bounded_queue<Deferred::ptr_type> _deferred;
58

6-
Deferred::future_type Deferred::defer(Deferred::ptr_type && d)
9+
Deferred::future_type Deferred::get_future()
10+
{
11+
return {std::move(tensor_i::promise_type::get_future()), _guid};
12+
}
13+
14+
void Deferred::set_value(tensor_i::ptr_type && v)
715
{
16+
if(_guid != Registry::NOGUID) {
17+
Registry::put(_guid, v);
18+
}
19+
tensor_i::promise_type::set_value(std::forward<tensor_i::ptr_type>(v));
20+
}
21+
22+
Deferred::future_type Deferred::defer(Deferred::ptr_type && d, bool is_global)
23+
{
24+
if(is_global) {
25+
if(is_cw() && theTransceiver->rank() == 0) theMediator->to_workers(d);
26+
if(d) d->_guid = Registry::get_guid();
27+
}
828
auto f = d ? d->get_future() : Deferred::future_type();
929
_deferred.push(std::move(d));
1030
return f;
11-
/* auto aa = Deferred::undefer_next();
12-
aa->run();
13-
return aa->get_future(); */
1431
}
1532

1633
Deferred::ptr_type Deferred::undefer_next()
@@ -25,7 +42,7 @@ void process_promises()
2542
while(true) {
2643
Deferred::ptr_type d;
2744
_deferred.pop(d);
28-
// auto d = std::move(Deferred::undefer_next());
45+
std::cerr << "Executing something" << std::endl;
2946
if(d) d->run();
3047
else break;
3148
d.reset();
@@ -34,6 +51,7 @@ void process_promises()
3451

3552
void sync()
3653
{
54+
// FIXME this does not wait for the last deferred to complete
3755
while(!_deferred.empty()) {
3856
std::this_thread::sleep_for(std::chrono::milliseconds(1));
3957
}

src/EWBinOp.cpp

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
#include "ddptensor/LinAlgOp.hpp"
33
#include "ddptensor/TypeDispatch.hpp"
44
#include "ddptensor/x.hpp"
5+
#include "ddptensor/Factory.hpp"
6+
#include "ddptensor/Registry.hpp"
7+
#include "ddptensor/Creator.hpp"
58

69
namespace x {
710

@@ -135,27 +138,43 @@ namespace x {
135138

136139
struct DeferredEWBinOp : public Deferred
137140
{
138-
tensor_i::future_type _a;
139-
tensor_i::future_type _b;
141+
id_type _a;
142+
id_type _b;
140143
EWBinOpId _op;
141144

145+
DeferredEWBinOp() = default;
142146
DeferredEWBinOp(EWBinOpId op, const tensor_i::future_type & a, const tensor_i::future_type & b)
143-
: _a(a), _b(b), _op(op)
147+
: _a(a.id()), _b(b.id()), _op(op)
144148
{}
145149

146150
void run()
147151
{
148-
const auto a = std::move(_a.get());
149-
const auto b = std::move(_b.get());
152+
const auto a = std::move(Registry::get(_a));
153+
const auto b = std::move(Registry::get(_b));
150154
set_value(std::move(TypeDispatch<x::EWBinOp>(a, b, _op)));
151155
}
156+
157+
FactoryId factory() const
158+
{
159+
return F_EWBINOP;
160+
}
161+
162+
template<typename S>
163+
void serialize(S & ser)
164+
{
165+
ser.template value<sizeof(_a)>(_a);
166+
ser.template value<sizeof(_b)>(_b);
167+
ser.template value<sizeof(_op)>(_op);
168+
}
152169
};
153170

154171
tensor_i::future_type EWBinOp::op(EWBinOpId op, const tensor_i::future_type & a, const py::object & b)
155172
{
156-
auto bb = x::mk_ftx(b);
173+
auto bb = Creator::mk_future(b);
157174
if(op == __MATMUL__) {
158175
return LinAlgOp::vecdot(a, bb, 0);
159176
}
160177
return defer<DeferredEWBinOp>(op, a, bb);
161178
}
179+
180+
FACTORY_INIT(DeferredEWBinOp, F_EWBINOP);

src/EWUnyOp.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "ddptensor/EWUnyOp.hpp"
22
#include "ddptensor/TypeDispatch.hpp"
33
#include "ddptensor/x.hpp"
4+
#include "ddptensor/Factory.hpp"
45

56
namespace x {
67

@@ -109,22 +110,37 @@ namespace x {
109110
} //namespace x
110111

111112
struct DeferredEWUnyOp : public Deferred
112-
{
113-
tensor_i::future_type _a;
113+
{
114+
id_type _a;
114115
EWUnyOpId _op;
115116

117+
DeferredEWUnyOp() = default;
116118
DeferredEWUnyOp(EWUnyOpId op, const tensor_i::future_type & a)
117-
: _a(a), _op(op)
119+
: _a(a.id()), _op(op)
118120
{}
119121

120122
void run()
121123
{
122-
const auto a = std::move(_a.get());
124+
const auto a = std::move(Registry::get(_a));
123125
set_value(std::move(TypeDispatch<x::EWUnyOp>(a, _op)));
124126
}
127+
128+
FactoryId factory() const
129+
{
130+
return F_EWUNYOP;
131+
}
132+
133+
template<typename S>
134+
void serialize(S & ser)
135+
{
136+
ser.template value<sizeof(_a)>(_a);
137+
ser.template value<sizeof(_op)>(_op);
138+
}
125139
};
126140

127141
tensor_i::future_type EWUnyOp::op(EWUnyOpId op, const tensor_i::future_type & a)
128142
{
129143
return defer<DeferredEWUnyOp>(op, a);
130144
}
145+
146+
FACTORY_INIT(DeferredEWUnyOp, F_EWUNYOP);

0 commit comments

Comments
 (0)