Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/interactive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ namespace mode::interactive {

using namespace std::string_literals;


//struct InteractiveQueryProcessor : query::QueryProcessor {
// void process_header(const mg_list *header) override {}
// void process_row(const mg_list *row) override {}
// void process_summary(const mg_map *summary) override {}
//};



int Run(const utils::bolt::Config &bolt_config, const std::string &history, bool no_history,
bool verbose_execution_info, const format::CsvOptions &csv_opts, const format::OutputOptions &output_opts) {
Replxx *replxx_instance = InitAndSetupReplxx();
Expand Down Expand Up @@ -113,12 +122,16 @@ int Run(const utils::bolt::Config &bolt_config, const std::string &history, bool
}

try {
auto ret = query::ExecuteQuery(session.get(), query->query);
if (ret.records.size() > 0) {
//TODO: CSV Processor and CSV do not need to wait for all results
// auto processor = InteractiveQueryProcessor{};
auto ret = query::ExecuteQuery/*Ex*/(session.get(), query->query);

if (!ret.records.empty()) {
// HERE
Output(ret.header, ret.records, output_opts, csv_opts);
}
std::string summary;
if (ret.records.size() == 0) {
if (ret.records.empty()) {
summary = "Empty set";
} else if (ret.records.size() == 1) {
summary = std::to_string(ret.records.size()) + " row in set";
Expand Down
167 changes: 117 additions & 50 deletions src/utils/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,76 @@ std::map<std::string, std::string> ParseNotifications(const mg_value *mg_notific

double ParseFloat(const mg_value *mg_val_float) { return mg_value_float(mg_val_float); }




struct QueryResultProcessor : query::QueryProcessor {
QueryResultProcessor() : start{std::chrono::system_clock::now()} {}

void process_header(mg_list const *header) override {
for (uint32_t i = 0; i < mg_list_size(header); ++i) {
const mg_value *field = mg_list_at(header, i);
if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) {
result.header.emplace_back(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field)));
} else {
std::stringstream field_stream;
utils::PrintValue(field_stream, field);
result.header.push_back(field_stream.str());
}
}
}

void process_row(mg_list const *row) override {
result.records.push_back(mg_memory::MakeCustomUnique<mg_list>(mg_list_copy(row)));
if (!result.records.back()) {
std::cerr << "out of memory";
abort();
}
}

void process_summary(mg_map const *summary) override {
if (summary && mg_map_size(summary) > 0) {
{
std::map<std::string, double> execution_info;
for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) {
if (const mg_value *info = mg_map_at(summary, key); info) {
execution_info.emplace(key, ParseFloat(info));
}
}
if (!execution_info.empty()) {
result.execution_info = execution_info;
}
}

if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) {
result.stats.emplace(ParseStats(mg_stats));
}
if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) {
result.notification.emplace(ParseNotifications(mg_notifications));
}
}
}

auto finish() -> query::QueryResult {
result.wall_time = std::chrono::system_clock::now() - start;
return std::move(result);
}

void process_fatal() override {
std::cout << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << std::endl;
}
void process_query_error() override {
std::cout << "$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" << std::endl;
}

private:

query::QueryResult result;
std::chrono::time_point<std::chrono::system_clock> start;
};



} // namespace

namespace console {
Expand Down Expand Up @@ -850,9 +920,8 @@ void PrintQueryInfo(const Query &query) {
std::cout << "line: " << query.line_number << " index: " << query.index << " query: " << query.query << std::endl;
}

QueryResult ExecuteQuery(mg_session *session, const std::string &query) {
void ExecuteQueryEx(mg_session *session, const std::string &query, QueryProcessor & processor) {
int status = mg_session_run(session, query.c_str(), nullptr, nullptr, nullptr, nullptr);
auto start = std::chrono::system_clock::now();
if (status != 0) {
if (mg_session_status(session) == MG_SESSION_BAD) {
throw utils::ClientFatalException(mg_session_error(session));
Expand Down Expand Up @@ -881,62 +950,55 @@ QueryResult ExecuteQuery(mg_session *session, const std::string &query) {
}
}

QueryResult ret;
mg_result *result;
while ((status = mg_session_fetch(session, &result)) == 1) {
ret.records.push_back(mg_memory::MakeCustomUnique<mg_list>(mg_list_copy(mg_result_row(result))));
if (!ret.records.back()) {
std::cerr << "out of memory";
std::abort();
}
}
if (status != 0) {
if (mg_session_status(session) == MG_SESSION_BAD) {
throw utils::ClientFatalException(mg_session_error(session));
} else {
throw utils::ClientQueryException(mg_session_error(session));
}
}
enum states {
START,
RECEIVING,
DONE,
};

{
const mg_list *header = mg_result_columns(result);
for (uint32_t i = 0; i < mg_list_size(header); ++i) {
const mg_value *field = mg_list_at(header, i);
if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) {
ret.header.push_back(
std::string(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field))));
} else {
std::stringstream field_stream;
utils::PrintValue(field_stream, field);
ret.header.push_back(field_stream.str());
}
auto result_handler = [session, state = states::START, &processor](int status, mg_result *result) mutable {
switch (status) {
case 0:
state = states::DONE;
break;
case 1:
break;
default:
if (mg_session_status(session) == MG_SESSION_BAD) {
processor.process_fatal();
throw utils::ClientFatalException(mg_session_error(session));
} else {
processor.process_query_error();
throw utils::ClientQueryException(mg_session_error(session));
}
}
}

const mg_map *summary = mg_result_summary(result);
if (summary && mg_map_size(summary) > 0) {
{
std::map<std::string, double> execution_info;
for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) {
if (const mg_value *info = mg_map_at(summary, key); info) {
execution_info.emplace(key, ParseFloat(info));
}
switch (state) {
case states::START: {
const mg_list *header = mg_result_columns(result);
processor.process_header(header);
state = states::RECEIVING;
[[fallthrough]];
}
if (!execution_info.empty()) {
ret.execution_info = execution_info;
case states::RECEIVING: {
const mg_list *row = mg_result_row(result);
processor.process_row(row);
return true;
}
case states::DONE: {
const mg_map *summary = mg_result_summary(result);
processor.process_summary(summary);
return false;
}
}
};

if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) {
ret.stats.emplace(ParseStats(mg_stats));
}
if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) {
ret.notification.emplace(ParseNotifications(mg_notifications));
}
}
mg_result *result;

ret.wall_time = std::chrono::system_clock::now() - start;
return ret;
while (true) {
status = mg_session_fetch(session, &result);
if (!result_handler(status, result)) break;
}
}

void PrintBatchesInfo(const std::vector<Batch> &batches) {
Expand Down Expand Up @@ -989,6 +1051,11 @@ BatchResult ExecuteBatch(mg_session *session, const Batch &batch) {
}
return BatchResult{.is_executed = true};
}
QueryResult ExecuteQuery(mg_session *session, const std::string &query) {
auto processor = QueryResultProcessor{};
ExecuteQueryEx(session, query, processor);
return processor.finish();
}

} // namespace query

Expand Down
20 changes: 20 additions & 0 deletions src/utils/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#endif /* _WIN32 */

#include "mgclient.h"

#include <functional>

#include "replxx.h"

#include "query_type.hpp"
Expand Down Expand Up @@ -321,6 +324,23 @@ struct BatchResult {
// The extra part is perserved for the next GetQuery call
std::optional<Query> GetQuery(Replxx *replxx_instance, bool collect_info = false);


//auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function<bool(int, mg_result *)>;

struct QueryProcessor{
virtual void process_header(mg_list const *header) = 0;

virtual void process_row(mg_list const *row) =0;

virtual void process_summary(mg_map const *summary)= 0;

virtual void process_fatal() = 0;
virtual void process_query_error() = 0;

};

void ExecuteQueryEx(mg_session *session, const std::string &query, QueryProcessor & processor);

QueryResult ExecuteQuery(mg_session *session, const std::string &query);
BatchResult ExecuteBatch(mg_session *session, const Batch &batch);

Expand Down