Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 105 additions & 19 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ void KafkaAdapterManager::setConfProperties( RdKafka::Conf * conf, const Diction
void KafkaAdapterManager::forceShutdown( const std::string & err )
{
m_unrecoverableError = true; // So we can alert the producer to stop trying to flush
forceConsumerReplayComplete();
//Force all adapters replay complete so they dont stay blocked
for( auto &[_,topicData] : m_topics )
topicData.markReplayComplete();
try
{
CSP_THROW( RuntimeException, "Kafka fatal error. " + err );
Expand All @@ -147,12 +149,6 @@ void KafkaAdapterManager::forceShutdown( const std::string & err )
}
}

void KafkaAdapterManager::forceConsumerReplayComplete()
{
for( auto & consumer : m_consumerVector )
consumer -> forceReplayCompleted();
}

void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
{
std::string errstr;
Expand All @@ -166,6 +162,14 @@ void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
}
}

// wildcard subscription has no guarantee of being in order
// we flag replay complete as soon as we identify it.
for( auto &[_,topicData] : m_topics )
{
if( topicData.wildcardSubscriber )
topicData.wildcardSubscriber -> flagReplayComplete();
}

// start all consumers
for( auto & it : m_consumerVector )
it -> start( starttime );
Expand Down Expand Up @@ -244,6 +248,95 @@ void KafkaAdapterManager::pollProducers()
}
}

// Dispatches a single received Kafka message to the subscribers registered for its topic/key.
//
// - A message for a topic that is not present in m_topics is reported through pushStatus as an
//   ERROR / MSG_RECV_ERROR and dropped.
// - A message without a key is reported the same way and dropped.
// - Subscribers registered for the exact key receive the message, flagged "live" only if the
//   topic has already been marked replay-complete.
// - The topic's wildcard subscriber (if any) receives the message and is always told it is live.
void KafkaAdapterManager::onMessage( RdKafka::Message * msg ) const
{
    const auto topicEntry = m_topics.find( msg -> topic_name() );
    if( topicEntry == m_topics.end() )
    {
        std::string errmsg = "KafkaAdapterManager: Message received on unknown topic: " + msg -> topic_name() +
                             " errcode: " + RdKafka::err2str( msg -> err() ) + " error: " + msg -> errstr();
        pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::MSG_RECV_ERROR, errmsg );
        return;
    }

    const std::string * messageKey = msg -> key();
    if( messageKey == nullptr )
    {
        std::string errmsg = "KafkaAdapterManager: Message received with null key on topic " + msg -> topic_name() + ".";
        pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::MSG_RECV_ERROR, errmsg );
        return;
    }

    const auto & topicState = topicEntry -> second;

    // Keyed subscribers: the live flag is sampled once, before fanning the message out.
    const auto keyedEntry = topicState.subscribers.find( *messageKey );
    if( keyedEntry != topicState.subscribers.end() )
    {
        const bool isLive = topicState.flaggedReplayComplete.load();
        for( KafkaSubscriber * keyedSubscriber : keyedEntry -> second )
            keyedSubscriber -> onMessage( msg, isLive );
    }

    // The wildcard subscriber is always ticked as live: it can receive messages from several
    // partitions, some of which may have finished replaying while others have not (and the
    // data can arrive out of order).
    if( topicState.wildcardSubscriber != nullptr )
        topicState.wildcardSubscriber -> onMessage( msg, true );
}

// Called by an individual consumer once every partition it is servicing for the given topic has hit EOF.
// Forwards the notification to that topic's TopicData. An unknown topic trips the assert in debug
// builds and is silently ignored otherwise.
void KafkaAdapterManager::markConsumerReplayDone( KafkaConsumer * consumer, const std::string & topic )
{
    auto topicEntry = m_topics.find( topic );
    const bool topicKnown = ( topicEntry != m_topics.end() );
    assert( topicKnown );

    if( topicKnown )
        topicEntry -> second.markConsumerReplayDone( consumer );
}

/*** TopicData ***/
// Registers a subscriber for this topic and records which consumer services it.
//   consumer   - consumer that will deliver this topic's messages; tracked with a "replay done" flag
//   key        - message key to subscribe to; an empty key registers the wildcard subscriber
//   subscriber - subscriber to notify
void KafkaAdapterManager::TopicData::addSubscriber( KafkaConsumer * consumer, const std::string & key, KafkaSubscriber * subscriber )
{
    // emplace leaves an already-present consumer entry (and its flag) untouched.
    consumers.emplace( consumer, false );

    if( !key.empty() )
    {
        subscribers[key].push_back( subscriber );
        return;
    }

    // Empty key: this is the wildcard subscription; at most one is expected per topic.
    assert( wildcardSubscriber == nullptr );
    wildcardSubscriber = subscriber;
}

// Records that `consumer` has finished replaying this topic and, once every consumer registered
// for the topic has done so, marks the whole topic replay-complete.
//   consumer - a consumer previously registered through addSubscriber; an unregistered consumer
//              trips the assert in debug builds and is ignored in release builds.
// NOTE(review): markReplayComplete() is documented as callable from multiple consumer threads; if
// this method is reached from several threads too, the plain bool flags in `consumers` are read
// and written here without any synchronization visible in this file - confirm.
void KafkaAdapterManager::TopicData::markConsumerReplayDone( KafkaConsumer * consumer )
{
    auto it = consumers.find( consumer );
    assert( it != consumers.end() );
    // assert compiles away under NDEBUG, so guard explicitly rather than dereference end().
    if( it == consumers.end() )
        return;

    it -> second = true;

    // Bail out as soon as any consumer for this topic is still replaying.
    for( const auto & [_,done] : consumers )
    {
        if( !done )
            return;
    }

    //All consumer partitions for the given topic are done replaying
    markReplayComplete();
}

void KafkaAdapterManager::TopicData::markReplayComplete()
{
//this can be called from multiple consumer threads
bool prevVal = flaggedReplayComplete.exchange( true );
if( !prevVal )
{
// Flag all regular subscribers
for( auto& subscriberEntry : subscribers )
{
for( auto* subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}
}
}
/*** end TopicData ***/

PushInputAdapter * KafkaAdapterManager::getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties )
{
std::string topic = properties.get<std::string>( "topic" );
Expand Down Expand Up @@ -274,25 +367,16 @@ OutputAdapter * KafkaAdapterManager::getOutputAdapter( CspTypePtr & type, const
}
}

KafkaConsumer * KafkaAdapterManager::getConsumer( const std::string & topic, const Dictionary & properties )
KafkaConsumer * KafkaAdapterManager::getConsumer( const Dictionary & properties )
{
// If we have seen this topic before, look up the consumer for it in the map
// Otherwise, make a new consumer (and insert it into the map)
// If we have reached m_maxThreads, then round-robin the topic onto a consumer (and insert it into the map)
if( m_consumerMap.find( topic ) != m_consumerMap.end() )
{
return m_consumerMap[ topic ].get();
}
if( m_consumerVector.size() < m_maxThreads )
{
auto consumer = std::make_shared<KafkaConsumer>( this, properties );
m_consumerVector.emplace_back( consumer );
m_consumerMap.emplace( topic, consumer );
return m_consumerMap[ topic ].get();
return consumer.get();
}

auto consumer = m_consumerVector[ m_consumerIdx++ ];
m_consumerMap.emplace( topic, consumer );
if( m_consumerIdx >= m_maxThreads )
m_consumerIdx = 0;
return consumer.get();
Expand All @@ -308,7 +392,9 @@ KafkaSubscriber * KafkaAdapterManager::getSubscriber( const std::string & topic,
std::unique_ptr<KafkaSubscriber> subscriber( new KafkaSubscriber( this, properties ) );
rv.first -> second = std::move( subscriber );

this -> getConsumer( topic, properties ) -> addSubscriber( topic, key, rv.first -> second.get() );
auto * consumer = this -> getConsumer( properties );
consumer -> addTopic( topic );
m_topics[ topic ].addSubscriber( consumer, key, rv.first -> second.get() );
}

return rv.first -> second.get();
Expand Down
29 changes: 24 additions & 5 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <csp/engine/AdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/PushInputAdapter.h>
#include <librdkafka/rdkafkacpp.h>
#include <atomic>
#include <string>
#include <thread>
Expand Down Expand Up @@ -66,8 +67,6 @@ class KafkaAdapterManager final : public csp::AdapterManager
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
OutputAdapter * getOutputAdapter( CspTypePtr & type, const Dictionary & properties );

KafkaConsumer * getConsumer( const std::string & topic, const Dictionary & properties );

RdKafka::Conf * getConsumerConf() { return m_consumerConf.get(); }

const Dictionary::Value & startOffsetProperty() const { return m_startOffsetProperty; }
Expand All @@ -76,22 +75,42 @@ class KafkaAdapterManager final : public csp::AdapterManager

void forceShutdown( const std::string & err );

void markConsumerReplayDone( KafkaConsumer * consumer, const std::string & topic );
void onMessage( RdKafka::Message * msg ) const;

private:

using TopicKeyPair = std::pair<std::string, std::string>;

KafkaConsumer * getConsumer( const Dictionary & properties );
void setConfProperties( RdKafka::Conf * conf, const Dictionary & properties );
void pollProducers();
void forceConsumerReplayComplete();

KafkaSubscriber * getSubscriber( const std::string & topic, const std::string & key, const Dictionary & properties );
KafkaPublisher * getStaticPublisher( const TopicKeyPair & pair, const Dictionary & properties );
KafkaPublisher * getDynamicPublisher( const std::string & topic, const Dictionary & properties );

// Per-topic bookkeeping: which subscribers want which keys, which consumers service the topic,
// and whether the topic has been flagged as replay-complete.
struct TopicData
{
//Key -> Subscriber
// Message key -> all subscribers registered for that key on this topic.
using SubscriberMap = std::unordered_map<std::string, std::vector<KafkaSubscriber*>>;
// Consumer -> "replay done" flag; inserted as false by addSubscriber, set to true by markConsumerReplayDone.
using ConsumerMap = std::unordered_map<KafkaConsumer *, bool>;
ConsumerMap consumers;
SubscriberMap subscribers;
// Subscriber registered with an empty key, or nullptr if there is none.
KafkaSubscriber * wildcardSubscriber = nullptr;
// Set by markReplayComplete; read when dispatching messages to decide whether keyed subscribers see them as live.
std::atomic<bool> flaggedReplayComplete = false;

// Registers `subscriber` under `key` (empty key = wildcard) and tracks `consumer` for replay completion.
void addSubscriber( KafkaConsumer * consumer, const std::string & key, KafkaSubscriber * subscriber );
// Marks `consumer` as done replaying; calls markReplayComplete once all tracked consumers are done.
void markConsumerReplayDone( KafkaConsumer * consumer );
// Sets flaggedReplayComplete and, the first time only, flags replay complete on all keyed subscribers.
void markReplayComplete();
};

using TopicMap = std::unordered_map<std::string,TopicData>;
TopicMap m_topics;

using ConsumerVector = std::vector<std::shared_ptr<KafkaConsumer>>;
ConsumerVector m_consumerVector;
using ConsumerMap = std::unordered_map<std::string, std::shared_ptr<KafkaConsumer>>;
ConsumerMap m_consumerMap;


using StaticPublishers = std::unordered_map<TopicKeyPair, std::unique_ptr<KafkaPublisher>, hash::hash_pair>;
StaticPublishers m_staticPublishers;
Expand Down
Loading
Loading