Skip to content

Commit cf41c71

Browse files
committed
Memory efficiency: progressive eviction of metadata transaction entries
1 parent 424ab3b commit cf41c71

9 files changed

Lines changed: 137 additions & 23 deletions

File tree

ChangeLog

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,24 @@
11
----------------------------------------------------------------------------
22

3+
DTLMod (0.5) not released yet (target: May 2026)
4+
5+
Improvements:
6+
- Memory efficiency: progressive eviction of metadata transaction entries
7+
- Once all subscribers have consumed a transaction, its entries are evicted
8+
from the in-memory Metadata::transaction_infos_ map
9+
- When metadata export is enabled and publishers and subscribers coexist
10+
(file streaming), evicted entries are progressively flushed to
11+
per-variable temporary files; the final metadata file is assembled at
12+
pub_close() from those files and any remaining in-memory entries,
13+
preserving the existing format and transaction count
14+
- When the stream is opened by subscribers only after all publishers have
15+
closed (sequential scenario), memory-only eviction is performed (the
16+
metadata file has already been written by pub_close())
17+
- Memory footprint of the File engine now grows as O(N_pub) instead of
18+
O(N_pub × N_transactions) for long-running concurrent streaming workloads
19+
20+
----------------------------------------------------------------------------
21+
322
DTLMod (0.4) February 16, 2026
423

524
Major improvements:

include/dtlmod/Engine.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class Engine {
5555
std::shared_ptr<Transport> transport_ = nullptr;
5656
std::weak_ptr<Stream> stream_;
5757

58+
bool pub_ever_present_ = false;
59+
5860
ActorRegistry publishers_;
5961
ActorRegistry subscribers_;
6062

@@ -86,6 +88,8 @@ class Engine {
8688
[[nodiscard]] ActorRegistry& get_subscribers() noexcept { return subscribers_; }
8789
[[nodiscard]] const ActorRegistry& get_subscribers() const noexcept { return subscribers_; }
8890

91+
[[nodiscard]] bool pub_ever_present() const noexcept { return pub_ever_present_; }
92+
8993
// Pure virtual methods for derived classes to implement
9094
virtual void create_transport(const Transport::Method& transport_method) = 0;
9195
virtual void begin_pub_transaction() = 0;

include/dtlmod/FileEngine.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class FileEngine : public Engine {
3939

4040
unsigned int current_sub_transaction_id_ = 0;
4141
bool sub_transaction_in_progress_ = false;
42+
unsigned int subs_completed_current_tx_ = 0;
4243

4344
void create_transport(const Transport::Method& transport_method) override;
4445
[[nodiscard]] const std::shared_ptr<sgfs::FileSystem>& get_file_system() const noexcept { return file_system_; }

include/dtlmod/Metadata.hpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#ifndef __DTLMOD_METADATA_HPP__
77
#define __DTLMOD_METADATA_HPP__
88

9+
#include <fstream>
10+
911
#include <fsmod/File.hpp>
1012
#include <simgrid/s4u/Actor.hpp>
1113

@@ -29,6 +31,8 @@ class Metadata {
2931
std::less<>>
3032
transaction_infos_;
3133

34+
unsigned int flushed_count_ = 0; // number of transactions already flushed to the prog file
35+
3236
protected:
3337
const std::map<std::pair<std::vector<size_t>, std::vector<size_t>>, std::pair<std::string, sg4::ActorPtr>,
3438
std::less<>>&
@@ -45,7 +49,12 @@ class Metadata {
4549
{
4650
return transaction_infos_.empty() ? 0 : (transaction_infos_.rbegin())->first;
4751
}
48-
void export_to_file(std::ofstream& ostream) const;
52+
// Write entries for tx_id to out, increment flushed_count_, erase from transaction_infos_
53+
void write_transaction_to_stream(unsigned int tx_id, std::ofstream& out);
54+
// Remove tx_id from transaction_infos_ without writing to file
55+
void evict_transaction(unsigned int tx_id);
56+
// Write all remaining transactions; prog_file_path contains already-flushed entries to prepend
57+
void export_to_file(std::ofstream& ostream, const std::string& prog_file_path = "") const;
4958
};
5059
/// \endcond
5160

include/dtlmod/Stream.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class Stream : public std::enable_shared_from_this<Stream> {
4747
Transport::Method transport_method_ = Transport::Method::Undefined;
4848
bool metadata_export_ = false;
4949
std::string metadata_file_;
50+
std::unordered_map<std::string, std::string> var_prog_file_paths_; // variable name -> prog file path
51+
bool metadata_exported_ = false; // true once export_metadata_to_file() has been called
5052
sg4::MutexPtr mutex_ = sg4::Mutex::create();
5153
Mode access_mode_ = Mode::Publish;
5254

@@ -63,7 +65,8 @@ class Stream : public std::enable_shared_from_this<Stream> {
6365
}
6466
void close() noexcept { engine_ = nullptr; }
6567

66-
void export_metadata_to_file() const;
68+
void export_metadata_to_file();
69+
void flush_and_evict_transaction(unsigned int tx_id);
6770

6871
// Helper methods for Stream::open
6972
void validate_open_parameters(std::string_view name, Mode mode) const;

src/Engine.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ void Engine::close()
9393
/// \cond EXCLUDE_FROM_DOCUMENTATION
9494
void Engine::add_publisher(sg4::ActorPtr actor)
9595
{
96+
pub_ever_present_ = true;
9697
transport_->add_publisher(publishers_.count());
9798
publishers_.add(actor);
9899
}

src/FileEngine.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,22 @@ void FileEngine::end_sub_transaction()
224224
if (auto sub_barrier = get_subscribers().get_or_create_barrier())
225225
XBT_DEBUG("Barrier created for %zu subscribers", get_subscribers().count());
226226

227+
// Evict this transaction's metadata once all subscribers have completed their reads.
228+
// Only applies in the concurrent streaming scenario (pub was registered on this same engine).
229+
if (pub_ever_present()) {
230+
unsigned int tx_to_evict = 0;
231+
{
232+
std::unique_lock lock(*get_subscribers().get_mutex());
233+
if (++subs_completed_current_tx_ == get_subscribers().count()) {
234+
subs_completed_current_tx_ = 0;
235+
tx_to_evict = current_sub_transaction_id_;
236+
}
237+
}
238+
if (tx_to_evict > 0)
239+
if (auto s = get_stream())
240+
s->flush_and_evict_transaction(tx_to_evict);
241+
}
242+
227243
// Mark this transaction as over
228244
sub_transaction_in_progress_ = false;
229245
}

src/Metadata.cpp

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,69 @@ void Metadata::add_transaction(unsigned int id,
1818
transaction_infos_[id][start_and_count] = std::make_pair(location, publisher);
1919
}
2020

21-
void Metadata::export_to_file(std::ofstream& ostream) const
21+
static void write_block_entries(std::ofstream& ostream,
22+
const std::map<std::pair<std::vector<size_t>, std::vector<size_t>>,
23+
std::pair<std::string, sg4::ActorPtr>, std::less<>>& transaction)
24+
{
25+
for (const auto& [block_info, location] : transaction) {
26+
const auto& [block_start, block_count] = block_info;
27+
const auto& [where, actor] = location;
28+
29+
ostream << " " << where.c_str() << ": [";
30+
XBT_DEBUG(" Actor %s wrote:", actor->get_cname());
31+
unsigned long last = block_start.size() - 1;
32+
for (unsigned long i = 0; i < last; i++) {
33+
ostream << block_start[i] << ":" << block_start[i] + block_count[i] << ", ";
34+
XBT_DEBUG(" Dimension %lu : [%zu..%zu]", i + 1, block_start[i], block_start[i] + block_count[i]);
35+
}
36+
ostream << block_start[last] << ":" << block_start[last] + block_count[last] << "]" << std::endl;
37+
XBT_DEBUG(" Dimension %lu : [%zu..%zu]", last + 1, block_start[last], block_start[last] + block_count[last]);
38+
XBT_DEBUG(" in: %s", where.c_str());
39+
}
40+
}
41+
42+
void Metadata::write_transaction_to_stream(unsigned int tx_id, std::ofstream& out)
43+
{
44+
auto it = transaction_infos_.find(tx_id);
45+
if (it == transaction_infos_.end())
46+
return;
47+
XBT_DEBUG(" Transaction %u:", tx_id);
48+
out << " Transaction " << tx_id << ":" << std::endl;
49+
write_block_entries(out, it->second);
50+
flushed_count_++;
51+
transaction_infos_.erase(it);
52+
}
53+
54+
void Metadata::evict_transaction(unsigned int tx_id)
55+
{
56+
transaction_infos_.erase(tx_id);
57+
}
58+
59+
void Metadata::export_to_file(std::ofstream& ostream, const std::string& prog_file_path) const
2260
{
2361
auto var = variable_.lock();
2462
xbt_assert(var, "Metadata::export_to_file called after its Variable has been destroyed");
2563
XBT_DEBUG("Variable %s:", var->get_cname());
26-
ostream << var->get_element_size() << "\t" << var->get_cname() << "\t" << transaction_infos_.size();
64+
unsigned int total = flushed_count_ + static_cast<unsigned int>(transaction_infos_.size());
65+
ostream << var->get_element_size() << "\t" << var->get_cname() << "\t" << total;
2766
ostream << "*{";
2867
auto shape = var->get_shape();
2968
const auto last_index = shape.size() - 1;
3069
for (unsigned int i = 0; i < last_index; i++)
3170
ostream << shape[i] << ",";
3271
ostream << shape[last_index] << "}" << std::endl;
3372

73+
// Copy already-flushed entries from the per-variable prog file (if any)
74+
if (!prog_file_path.empty()) {
75+
std::ifstream prog(prog_file_path, std::ios::binary);
76+
ostream << prog.rdbuf();
77+
}
78+
79+
// Write remaining in-memory entries
3480
for (const auto& [id, transaction] : transaction_infos_) {
3581
XBT_DEBUG(" Transaction %u:", id);
3682
ostream << " Transaction " << id << ":" << std::endl;
37-
for (const auto& [block_info, location] : transaction) {
38-
const auto& [block_start, block_count] = block_info;
39-
const auto& [where, actor] = location;
40-
41-
ostream << " " << where.c_str() << ": [";
42-
XBT_DEBUG(" Actor %s wrote:", actor->get_cname());
43-
unsigned long last = block_start.size() - 1;
44-
for (unsigned long i = 0; i < last; i++) {
45-
ostream << block_start[i] << ":" << block_start[i] + block_count[i] << ", ";
46-
XBT_DEBUG(" Dimension %lu : [%zu..%zu]", i + 1, block_start[i], block_start[i] + block_count[i]);
47-
}
48-
ostream << block_start[last] << ":" << block_start[last] + block_count[last] << "]" << std::endl;
49-
XBT_DEBUG(" Dimension %lu : [%zu..%zu]", last + 1, block_start[last], block_start[last] + block_count[last]);
50-
51-
XBT_DEBUG(" in: %s", where.c_str());
52-
}
83+
write_block_entries(ostream, transaction);
5384
}
5485
}
5586
/// \endcond

src/Stream.cpp

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,43 @@ Stream& Stream::unset_metadata_export() noexcept
146146
return *this;
147147
}
148148

149-
void Stream::export_metadata_to_file() const
149+
void Stream::export_metadata_to_file()
150150
{
151+
metadata_exported_ = true;
151152
if (metadata_export_) {
152153
std::ofstream export_stream(metadata_file_, std::ofstream::out);
153-
for (const auto& [name, v] : variables_)
154-
v->get_metadata()->export_to_file(export_stream);
154+
for (const auto& [name, v] : variables_) {
155+
auto it = var_prog_file_paths_.find(name);
156+
std::string prog = (it != var_prog_file_paths_.end()) ? it->second : "";
157+
v->get_metadata()->export_to_file(export_stream, prog);
158+
}
155159
export_stream.close();
160+
// Remove per-variable prog files now that the final file is written
161+
for (const auto& [name, path] : var_prog_file_paths_)
162+
std::remove(path.c_str());
163+
var_prog_file_paths_.clear();
164+
}
165+
}
166+
167+
void Stream::flush_and_evict_transaction(unsigned int tx_id)
168+
{
169+
if (metadata_exported_) {
170+
// Sequential scenario or post-export: the metadata file is already complete; just free memory
171+
for (const auto& [name, v] : variables_)
172+
v->get_metadata()->evict_transaction(tx_id);
173+
return;
174+
}
175+
if (metadata_export_) {
176+
for (const auto& [name, v] : variables_) {
177+
if (!var_prog_file_paths_.count(name))
178+
var_prog_file_paths_[name] = metadata_file_ + "." + name + ".prog";
179+
std::ofstream out(var_prog_file_paths_.at(name), std::ofstream::app);
180+
v->get_metadata()->write_transaction_to_stream(tx_id, out);
181+
// write_transaction_to_stream also increments flushed_count_ and evicts from transaction_infos_
182+
}
183+
} else {
184+
for (const auto& [name, v] : variables_)
185+
v->get_metadata()->evict_transaction(tx_id);
156186
}
157187
}
158188

0 commit comments

Comments
 (0)