Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
uint32_t new_segid = mapping->at(segid);
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable {
signal::set_signal_task_id(_load_id);
auto self = shared_from_this();
auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable {
signal::set_signal_task_id(self->_load_id);
g_load_stream_flush_running_threads << -1;
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
auto st =
self->_load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
if (!st.ok() && !config::is_cloud_mode()) {
auto res = ExecEnv::get_tablet(_id);
auto res = ExecEnv::get_tablet(self->_id);
TabletSharedPtr tablet =
res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
if (tablet) {
Expand All @@ -163,7 +165,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
{ file_type = static_cast<FileType>(-1); });
if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
st = _load_stream_writer->close_writer(new_segid, file_type);
st = self->_load_stream_writer->close_writer(new_segid, file_type);
} else {
st = Status::InternalError(
"appent data failed, file type error, file type = {}, "
Expand All @@ -174,8 +176,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
_status.update(st);
LOG(WARNING) << "write data failed " << st << ", " << *this;
self->_status.update(st);
LOG(WARNING) << "write data failed " << st << ", " << *self;
}
};
auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
Expand Down Expand Up @@ -247,14 +249,15 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

auto add_segment_func = [this, new_segid, stat]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat);
auto self = shared_from_this();
auto add_segment_func = [self, new_segid, stat]() {
signal::set_signal_task_id(self->_load_id);
auto st = self->_load_stream_writer->add_segment(new_segid, stat);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
_status.update(st);
LOG(INFO) << "add segment failed " << *this;
self->_status.update(st);
LOG(INFO) << "add segment failed " << *self;
}
};
Status st = Status::OK();
Expand All @@ -274,8 +277,9 @@ Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto st = Status::OK();
auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
auto self = shared_from_this();
auto func = [self, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(self->_load_id);
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class OlapTableSchemaParam;
// origin_segid(index) -> new_segid(value in vector)
using SegIdMapping = std::vector<uint32_t>;
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
class TabletStream {
class TabletStream : public std::enable_shared_from_this<TabletStream> {
public:
TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);
Expand Down
Loading