Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
//=== Bug Fixes

//=== Regressions
== {es} version 9.4.0

=== Enhancements

* Better handling of invalid JSON state documents. (See {ml-pull}2895[#2895].)

== {es} version 9.3.0

=== Enhancements
Expand Down
3 changes: 1 addition & 2 deletions include/core/CJsonStateRestoreTraverser.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ class CORE_EXPORT CJsonStateRestoreTraverser : public CStateRestoreTraverser {
bool s_HaveCompleteToken{false};
};

//! JSON reader istream wrapper
// core::CBoostJsonUnbufferedIStreamWrapper m_ReadStream;
//! JSON stream to read from
std::istream& m_ReadStream;

//! JSON reader
Expand Down
24 changes: 15 additions & 9 deletions lib/api/CFieldDataCategorizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ bool CFieldDataCategorizer::restoreState(core::CDataSearcher& restoreSearcher,

LOG_DEBUG(<< "Restore categorizer state");

auto handleCorruptRestore = [this](const std::string& message) {
LOG_ERROR(<< message);
// This situation is fatal in terms of the categorizer we attempted to restore,
// but returning false here can throw the system into a repeated cycle
// of failure. It's better to reset the categorizer and re-categorize from
// scratch.
this->resetAfterCorruptRestore();
return true;
};

try {
// Restore from Elasticsearch compressed data.
// (To restore from uncompressed data for testing, comment the next line
Expand Down Expand Up @@ -310,17 +320,13 @@ bool CFieldDataCategorizer::restoreState(core::CDataSearcher& restoreSearcher,
core::CJsonStateRestoreTraverser traverser(*strm);

if (this->acceptRestoreTraverser(traverser) == false) {
LOG_ERROR(<< "JSON restore failed");
return false;
// We used to return false here. Putting it at odds with the exception handling case (below).
// We now follow the same logic for both failure branches.
return handleCorruptRestore("JSON restore failed");
}
LOG_DEBUG(<< "JSON restore complete");
} catch (std::exception& e) {
LOG_ERROR(<< "Failed to restore state! " << e.what());
// This is fatal in terms of the categorizer we attempted to restore,
// but returning false here can throw the system into a repeated cycle
// of failure. It's better to reset the categorizer and re-categorize from
// scratch.
this->resetAfterCorruptRestore();
return true;
return handleCorruptRestore("Failed to restore state! " + std::string(e.what()));
}

return true;
Expand Down
52 changes: 50 additions & 2 deletions lib/api/unittest/CFieldDataCategorizerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <set>
#include <sstream>
#include <tuple>
#include <vector>

BOOST_AUTO_TEST_SUITE(CFieldDataCategorizerTest)

Expand Down Expand Up @@ -210,6 +211,53 @@ std::string setupPerPartitionStopOnWarnTest(bool stopOnWarnAtInit,
}
}

BOOST_AUTO_TEST_CASE(testRestoreFromBadState) {
    // Trace logging makes the restore-failure diagnostics visible in the test log.
    core::CLogger::instance().setLoggingLevel(core::CLogger::E_Trace);

    model::CLimits limits;
    CAnomalyJobConfig config;
    BOOST_TEST_REQUIRE(config.initFromFile("testfiles/cat_job_config.json"));
    CTestChainedProcessor testChainedProcessor;

    std::ostringstream outputStrm;
    core::CJsonOutputStreamWrapper wrappedOutputStream{outputStrm};

    CTestFieldDataCategorizer categorizer{"job", config.analysisConfig(), limits,
                                          &testChainedProcessor, wrappedOutputStream};

    // A collection of corrupt/malformed state documents. Restoring from any of
    // them must NOT fail hard: restoreState() is expected to reset the
    // categorizer and report success so the job can re-categorize from scratch.
    std::vector<std::string> corruptStateDocs = {
        // "Empty" base64 - []
        R"({"compressed": ["H4sIAAAAAAAA","/4uOBQApu0wNAgAAAA=="],"eos":true})",
        // Not compressed base64 - "junk"
        R"({"compressed": ["anVuawo="],"eos":true})",
        // Empty compressed array
        R"({"compressed": [],"eos":true})",
        // Not a JSON array
        R"({"compressed": Junk,"eos":true})",
        // Decompresses to "junk"
        R"({"compressed": ["H4sIADlIcGkAA8sqzcvmAgAHddRtBQAAAA=="],"eos":true})",
        // Invalid JSON
        R"({ "foo: "bar" )",
        // Missing 'compressed' field
        R"({"eos":true})",
        // 'compressed' is not an array
        R"({"compressed": "a string","eos":true})",
        // 'compressed' array contains non-string
        R"({"compressed": [123],"eos":true})",
        // Invalid base64 content
        R"({"compressed": ["not-base64"],"eos":true})",
        // Literal backslash-zero where a value should be - invalid JSON
        R"({"compressed": \0,"eos":true})",
        // Bare index header with no state document following it
        R"({"index":{"_id":"logs_services_count_logs_categories_categorizer_state#1"}})"};

    for (const auto& stateDoc : corruptStateDocs) {
        LOG_DEBUG(<< "Restoring from \"" << stateDoc << "\"");
        CTestDataSearcher restorer{stateDoc};
        core_t::TTime completeToTime{0};
        // Every corrupt document must be recovered from, never propagated as failure.
        BOOST_REQUIRE_EQUAL(true, categorizer.restoreState(restorer, completeToTime));
    }
}

BOOST_AUTO_TEST_CASE(testWithoutPerPartitionCategorization) {
model::CLimits limits;
CAnomalyJobConfig config;
Expand Down Expand Up @@ -576,7 +624,7 @@ BOOST_AUTO_TEST_CASE(testHandleControlMessages) {
BOOST_REQUIRE_EQUAL(0, output.find("[{\"flush\":{\"id\":\"7\",\"last_finalized_bucket_end\":0,\"refresh_required\":true}}"));
}

BOOST_AUTO_TEST_CASE(testRestoreStateFailsWithEmptyState) {
BOOST_AUTO_TEST_CASE(testRestoreStateRecoversWithEmptyState) {
model::CLimits limits;
CAnomalyJobConfig config;
BOOST_TEST_REQUIRE(config.initFromFile("testfiles/new_persist_categorization.json"));
Expand All @@ -588,7 +636,7 @@ BOOST_AUTO_TEST_CASE(testRestoreStateFailsWithEmptyState) {

core_t::TTime completeToTime{0};
CEmptySearcher restoreSearcher;
BOOST_TEST_REQUIRE(categorizer.restoreState(restoreSearcher, completeToTime) == false);
BOOST_TEST_REQUIRE(categorizer.restoreState(restoreSearcher, completeToTime) == true);
}

BOOST_AUTO_TEST_CASE(testFlushWritesOnlyChangedCategories) {
Expand Down
26 changes: 26 additions & 0 deletions lib/api/unittest/testfiles/cat_job_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"job_id": "logs_services_count_logs_categories",
"analysis_config": {
"categorization_field_name": "message",
"per_partition_categorization": {
"enabled": true,
"stop_on_warn": false
},
"detectors": [
{
"detector_description": "count by mlcategory partitionfield=\"service.name\"",
"function": "count",
"by_field_name": "mlcategory",
"partition_field_name": "service.name"
}
],
"influencers": [
"mlcategory",
"service.name"
]
},
"data_description": {
"time_field": "@timestamp",
"time_format": "epoch_ms"
}
}
7 changes: 4 additions & 3 deletions lib/core/CJsonStateRestoreTraverser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ CJsonStateRestoreTraverser::CJsonStateRestoreTraverser(std::istream& inputStream
}

bool CJsonStateRestoreTraverser::isEof() const {
    // m_ReadStream is now a plain std::istream, so end-of-stream is detected
    // via the stream's own eofbit rather than the '\0' sentinel that the old
    // CBoostJsonUnbufferedIStreamWrapper used to return at EOF.
    // NOTE(review): eofbit is only set after a read past the end has been
    // attempted, so this reports EOF slightly later than the old peek()-based
    // check would have — presumably callers tolerate that; confirm.
    return m_ReadStream.eof();
}

bool CJsonStateRestoreTraverser::next() {
Expand Down Expand Up @@ -402,7 +401,9 @@ bool CJsonStateRestoreTraverser::advance() {
}

void CJsonStateRestoreTraverser::logError() {
LOG_ERROR(<< "Error parsing JSON: " << m_Reader.last_error() << ", stream state - bad: "
LOG_ERROR(<< "Error parsing JSON: "
<< "\"" << m_Buffer << "\""
<< "\"" << m_Reader.last_error() << ", stream state - bad: "
<< m_ReadStream.bad() << ", fail: " << m_ReadStream.fail()
<< ", eof: " << m_ReadStream.eof() << ", bytes remaining: " << m_BytesRemaining
<< ", buffer position: " << (m_BufferPtr ? (m_BufferPtr - m_Buffer) : -1)
Expand Down
18 changes: 16 additions & 2 deletions lib/core/CStateDecompressor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,23 @@ bool CStateDecompressor::CDechunkFilter::parseNext() {
do {
char c = m_InputStreamWrapper->take();
if (c == '\0') {
std::string message;
if (m_ParsingStarted == false) {
message = "Encountered NULL character in stream before parsing has started.";
ret = false;
}
if (m_Reader->handler().s_Type == SBoostJsonHandler::E_TokenObjectEnd) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since both if statements could be true, you would potentially reassign message. I guess it should be either else if here, or the messages should be concatenated.

if (message.size() > 0) {
message += "\n";
}
message += "Encountered NULL character in stream after object end.";
ret = false;
}
if (ret == false && message.empty() == false) {
std::string jsonStr(m_Reader->handler().s_CompressedChunk,
m_Reader->handler().s_CompressedChunkLength);
LOG_WARN(<< "Error parsing JSON: \"" << jsonStr << "\". " << message);
}
break;
}

Expand Down Expand Up @@ -160,7 +174,7 @@ bool CStateDecompressor::CDechunkFilter::readHeader() {
}
// If we are here, we have got an empty document from downstream,
// so the stream is finished
LOG_TRACE(<< "Failed to find 'compressed' data array!");
LOG_WARN(<< "Failed to find 'compressed' data array!");
m_Initialised = false;
m_IStream.reset();
++m_CurrentDocNum;
Expand Down Expand Up @@ -243,7 +257,7 @@ void CStateDecompressor::CDechunkFilter::handleRead(char* s,
std::streamsize CStateDecompressor::CDechunkFilter::endOfStream(char* s,
std::streamsize n,
std::streamsize bytesDone) {
// return [ ] if not m_Initialised
// return [ ] if not m_Initialised - i.e. if no valid json could be found
m_EndOfStream = true;
if (!m_SentData && bytesDone == 0) {
std::streamsize toCopy = std::min(std::streamsize(EMPTY_DATA.size()), n);
Expand Down