11// SPDX-License-Identifier: BSD-3-Clause
22
3+ /*
4+ Creation/destruction of Deferreds.
5+ Implementation of worker loop processing deferred objects.
6+ This worker loop is executed in a separate thread until the system
7+ gets shut down.
8+ */
9+
310#include " include/ddptensor/Deferred.hpp"
411#include " include/ddptensor/Mediator.hpp"
512#include " include/ddptensor/Registry.hpp"
1421#include < iostream>
1522#include < unordered_set>
1623
24+ // thread-safe FIFO queue holding deferred objects
1725static tbb::concurrent_bounded_queue<Runable::ptr_type> _deferred;
1826
27+ // add a deferred object to the queue
1928void push_runable (Runable::ptr_type &&r) { _deferred.push (std::move (r)); }
2029
30+ // if needed, object/promise is broadcasted to worker processes
31+ // (for controller/worker mode)
2132void _dist (const Runable *p) {
2233 if (getTransceiver ()->is_cw () && getTransceiver ()->rank () == 0 )
2334 getMediator ()->to_workers (p);
2435}
2536
37+ // create a enriched future
2638Deferred::future_type Deferred::get_future () {
2739 return {std::move (promise_type::get_future ().share ()), _guid, _dtype, _rank,
2840 _balanced};
2941}
3042
43+ // defer a tensor-producing computation by adding it to the queue.
44+ // return a future for the resulting tensor.
45+ // set is_global to false if result is a local temporary which does not need a
46+ // guid
3147Deferred::future_type defer_tensor (Runable::ptr_type &&_d, bool is_global) {
3248 Deferred *d = dynamic_cast <Deferred *>(_d.get ());
3349 if (!d)
@@ -42,6 +58,7 @@ Deferred::future_type defer_tensor(Runable::ptr_type &&_d, bool is_global) {
4258 return f;
4359}
4460
61+ // defer a global tensor producer
4562void Deferred::defer (Runable::ptr_type &&p) {
4663 defer_tensor (std::move (p), true );
4764}
@@ -50,6 +67,15 @@ void Runable::defer(Runable::ptr_type &&p) { push_runable(std::move(p)); }
5067
5168void Runable::fini () { _deferred.clear (); }
5269
70+ // process promises as they arrive through calls to defer
71+ // This is run in a separate thread until shutdon is requested.
72+ // Shutdown is indicated by a Deferred object which evaluates to false.
73+ // The loop repeatedly creates MLIR functions for jit-compilation by letting
74+ // Deferred objects add their MLIR code until an object can not produce MLIR
75+ // but wants immediate execution (indicated by generate_mlir returning true).
76+ // When execution is needed, the function signature (input args, return
77+ // statement) is finalized, the function gets compiled and executed. The loop
78+ // completes by calling run() on the requesting object.
5379void process_promises () {
5480 bool done = false ;
5581 jit::JIT jit;
0 commit comments