Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/rmq/rmqio/rmqio_asiotimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,14 @@
// limitations under the License.

#include <rmqio_asiotimer.h>

namespace {
void instantiateTemplates()
{
// Instantiate the templates to ensure that they compile.
BloombergLP::rmqio::AsioEventLoop eventLoop;
BloombergLP::rmqio::AsioTimerFactory timerFactory(eventLoop);
BloombergLP::rmqio::AsioTimer timer(eventLoop.context(),
BloombergLP::bsls::TimeInterval(1));
}
} // namespace
89 changes: 45 additions & 44 deletions src/rmq/rmqio/rmqio_asiotimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ namespace rmqio {
//
// rmqio::AsioTimer: Provides a class for scheduling a cancellable
// callback to be executed after a given timeout
typedef boost::asio::chrono::system_clock DefaultClockType;

template <typename TIME = boost::asio::deadline_timer::time_type,
typename TIME_TRAITS = boost::asio::time_traits<TIME> >
template <typename ClockType = DefaultClockType>
class basic_AsioTimer
: public Timer,
public bsl::enable_shared_from_this<basic_AsioTimer<TIME, TIME_TRAITS> > {
public bsl::enable_shared_from_this<basic_AsioTimer<ClockType> > {
public:
basic_AsioTimer(boost::asio::io_context& context,
const bsls::TimeInterval& timeout);
Expand All @@ -60,23 +60,22 @@ class basic_AsioTimer
basic_AsioTimer& operator=(const basic_AsioTimer&) BSLS_KEYWORD_DELETED;

static void
handler_internal(bsl::weak_ptr<basic_AsioTimer<TIME, TIME_TRAITS> > timer,
handler_internal(bsl::weak_ptr<basic_AsioTimer<ClockType> > timer,
const Timer::Callback callback,
const boost::system::error_code& error);
void handler(const Timer::Callback& callback,
const boost::system::error_code& error);
void startTimer();

boost::asio::basic_deadline_timer<TIME, TIME_TRAITS> d_timer;
boost::asio::basic_waitable_timer<ClockType> d_timer;
Timer::Callback d_callback;
bsls::TimeInterval d_timeout;
BALL_LOG_SET_CLASS_CATEGORY("RMQIO.ASIOTIMER");
};

typedef basic_AsioTimer<> AsioTimer;

template <typename TIME = boost::asio::deadline_timer::time_type,
typename TIME_TRAITS = boost::asio::time_traits<TIME> >
template <typename ClockType = DefaultClockType>
class basic_AsioTimerFactory : public TimerFactory {
public:
basic_AsioTimerFactory(rmqio::AsioEventLoop& eventLoop);
Expand All @@ -95,33 +94,33 @@ class basic_AsioTimerFactory : public TimerFactory {

typedef basic_AsioTimerFactory<> AsioTimerFactory;

template <typename T, typename TT>
basic_AsioTimer<T, TT>::basic_AsioTimer(boost::asio::io_context& io_context,
const bsls::TimeInterval& timeout)
template <typename ClockType>
basic_AsioTimer<ClockType>::basic_AsioTimer(boost::asio::io_context& io_context,
const bsls::TimeInterval& timeout)
: Timer()
, d_timer(io_context)
, d_callback()
, d_timeout(timeout)
{
}

template <typename T, typename TT>
basic_AsioTimer<T, TT>::basic_AsioTimer(boost::asio::io_context& io_context,
const Timer::Callback& callback)
template <typename ClockType>
basic_AsioTimer<ClockType>::basic_AsioTimer(boost::asio::io_context& io_context,
const Timer::Callback& callback)
: Timer()
, d_timer(io_context)
, d_callback(callback)
, d_timeout()
{
}

template <typename T, typename TT>
basic_AsioTimer<T, TT>::~basic_AsioTimer()
template <typename ClockType>
basic_AsioTimer<ClockType>::~basic_AsioTimer()
{
}

template <typename T, typename TT>
void basic_AsioTimer<T, TT>::reset(const bsls::TimeInterval& timeout)
template <typename ClockType>
void basic_AsioTimer<ClockType>::reset(const bsls::TimeInterval& timeout)
{
if (!d_callback) {
BALL_LOG_ERROR << "reset() called before start()";
Expand All @@ -131,42 +130,42 @@ void basic_AsioTimer<T, TT>::reset(const bsls::TimeInterval& timeout)
startTimer();
}

template <typename T, typename TT>
void basic_AsioTimer<T, TT>::cancel()
template <typename ClockType>
void basic_AsioTimer<ClockType>::cancel()
{
d_timer.cancel();
}

template <typename T, typename TT>
void basic_AsioTimer<T, TT>::resetCallback(const Callback& callback)
template <typename ClockType>
void basic_AsioTimer<ClockType>::resetCallback(const Callback& callback)
{
d_callback = callback;
}

template <typename T, typename TT>
void basic_AsioTimer<T, TT>::start(const Timer::Callback& callback)
template <typename ClockType>
void basic_AsioTimer<ClockType>::start(const Timer::Callback& callback)
{
d_callback = callback;
startTimer();
}

template <typename T, typename TT>
void basic_AsioTimer<T, TT>::handler_internal(
bsl::weak_ptr<basic_AsioTimer<T, TT> > timer,
template <typename ClockType>
void basic_AsioTimer<ClockType>::handler_internal(
bsl::weak_ptr<basic_AsioTimer<ClockType> > timer,
const Timer::Callback callback,
const boost::system::error_code& error)
{
bsl::shared_ptr<basic_AsioTimer<T, TT> > t = timer.lock();
bsl::shared_ptr<basic_AsioTimer<ClockType> > t = timer.lock();
if (t) {
t->handler(callback, error);
}
}

template <typename T, typename TT>
void basic_AsioTimer<T, TT>::handler(const Timer::Callback& callback,
const boost::system::error_code& error)
template <typename ClockType>
void basic_AsioTimer<ClockType>::handler(const Timer::Callback& callback,
const boost::system::error_code& error)
{
if (!error && d_timer.expires_at() <= TT::now()) {
if (!error) {
callback(Timer::EXPIRE);
}
else {
Expand All @@ -178,38 +177,40 @@ void basic_AsioTimer<T, TT>::handler(const Timer::Callback& callback,
}
}

template <typename T, typename TT>
void basic_AsioTimer<T, TT>::startTimer()
template <typename ClockType>
void basic_AsioTimer<ClockType>::startTimer()
{
d_timer.expires_from_now(
boost::posix_time::milliseconds(d_timeout.totalMilliseconds()));
d_timer.expires_after(
boost::asio::chrono::milliseconds(d_timeout.totalMilliseconds()));
d_timer.async_wait(
bdlf::BindUtil::bind(&basic_AsioTimer<T, TT>::handler_internal,
bdlf::BindUtil::bind(&basic_AsioTimer<ClockType>::handler_internal,
this->weak_from_this(),
d_callback,
bdlf::PlaceHolders::_1));
}

template <typename T, typename TT>
basic_AsioTimerFactory<T, TT>::basic_AsioTimerFactory(
template <typename ClockType>
basic_AsioTimerFactory<ClockType>::basic_AsioTimerFactory(
rmqio::AsioEventLoop& eventLoop)
: d_eventLoop(eventLoop)
{
}

template <typename T, typename TT>
bsl::shared_ptr<rmqio::Timer> basic_AsioTimerFactory<T, TT>::createWithTimeout(
template <typename ClockType>
bsl::shared_ptr<rmqio::Timer>
basic_AsioTimerFactory<ClockType>::createWithTimeout(
const bsls::TimeInterval& timeout)
{
return bsl::make_shared<rmqio::basic_AsioTimer<T, TT> >(
return bsl::make_shared<rmqio::basic_AsioTimer<ClockType> >(
bsl::ref(d_eventLoop.context()), timeout);
}

template <typename T, typename TT>
bsl::shared_ptr<rmqio::Timer> basic_AsioTimerFactory<T, TT>::createWithCallback(
template <typename ClockType>
bsl::shared_ptr<rmqio::Timer>
basic_AsioTimerFactory<ClockType>::createWithCallback(
const Timer::Callback& callback)
{
return bsl::make_shared<rmqio::basic_AsioTimer<T, TT> >(
return bsl::make_shared<rmqio::basic_AsioTimer<ClockType> >(
bsl::ref(d_eventLoop.context()), callback);
}

Expand Down
1 change: 1 addition & 0 deletions src/tests/integration/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
requests
pytest
telnetlib3
2 changes: 1 addition & 1 deletion src/tests/integration/rmqapitests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import requests
from requests.auth import HTTPBasicAuth
from typing import Any
from telnetlib import Telnet
from telnetlib3 import Telnet
from contextlib import contextmanager
import logging
import re
Expand Down
10 changes: 4 additions & 6 deletions src/tests/rmqio/rmqio_asiotimer.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include <rmqio_asioeventloop.h>

#include <rmqtestutil_timeoverride.h>
#include <rmqtestutil_clockoverride.h>

#include <bdlf_bind.h>
#include <boost/asio.hpp>
Expand All @@ -32,9 +32,7 @@ using namespace rmqio;
using namespace ::testing;

namespace {
typedef rmqio::basic_AsioTimer<boost::asio::deadline_timer::time_type,
rmqtestutil::TimeOverride>
FakeAsioTimer;
typedef rmqio::basic_AsioTimer<rmqtestutil::ClockOverride> FakeAsioTimer;
} // namespace

class MockCallback {
Expand Down Expand Up @@ -70,7 +68,7 @@ TEST_F(AsioTimerTests, CallbackWhenExpires)
{
EXPECT_CALL(d_mockCallback, callback(Timer::EXPIRE)).Times(1);
d_timer->start(d_callback);
rmqtestutil::TimeOverride::step_time(boost::posix_time::seconds(10));
rmqtestutil::ClockOverride::step_time(boost::asio::chrono::seconds(10));
EXPECT_THAT(d_io.context().run_one(), Eq(1));
}

Expand All @@ -91,7 +89,7 @@ TEST_F(AsioTimerTests, Reset)
}
d_timer->start(d_callback);
d_timer->reset(bsls::TimeInterval(10));
rmqtestutil::TimeOverride::step_time(boost::posix_time::seconds(10));
rmqtestutil::ClockOverride::step_time(boost::asio::chrono::seconds(10));
EXPECT_THAT(d_io.context().run_one(), Eq(1));
EXPECT_THAT(d_io.context().run_one(), Eq(1));
}
Expand Down
2 changes: 1 addition & 1 deletion src/tests/rmqio/rmqio_backofflevelretrystrategy.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <rmqio_backofflevelretrystrategy.h>

#include <rmqtestutil_callcount.h>
#include <rmqtestutil_clockoverride.h>
#include <rmqtestutil_mocktimerfactory.h>
#include <rmqtestutil_timeoverride.h>

#include <bdlf_bind.h>

Expand Down
2 changes: 1 addition & 1 deletion src/tests/rmqtestutil/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_library(rmqtestutil
rmqtestutil.m.cpp
rmqtestutil_callcount.cpp
rmqtestutil_clockoverride.cpp
rmqtestutil_mockchannel.t.cpp
rmqtestutil_mockeventloop.t.cpp
rmqtestutil_mockmetricpublisher.cpp
Expand All @@ -10,7 +11,6 @@ add_library(rmqtestutil
rmqtestutil_replayframe.cpp
rmqtestutil_savethreadid.cpp
rmqtestutil_timedmetric.cpp
rmqtestutil_timeoverride.cpp
)

target_link_libraries(rmqtestutil PUBLIC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rmqtestutil_timeoverride.h>
#include <rmqtestutil_clockoverride.h>

namespace BloombergLP {
namespace rmqtestutil {

TimeOverride::duration_type TimeOverride::d_timeOffset =
boost::posix_time::seconds(0);
ClockOverride::duration ClockOverride::d_timeOffset =
boost::asio::chrono::seconds(0);

}
} // namespace BloombergLP
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef INCLUDED_RMQTESTUTIL_TIMEOVERRIDE
#define INCLUDED_RMQTESTUTIL_TIMEOVERRIDE
#ifndef INCLUDED_RMQTESTUTIL_CLOCKOVERRIDE
#define INCLUDED_RMQTESTUTIL_CLOCKOVERRIDE

#include <boost/asio.hpp>
#include <rmqio_asiotimer.h>

//@PURPOSE: Provides TimeOverride class extended from
// boost::asio::deadline_timer::traits_type
//@PURPOSE: Provides ClockOverride class extended from
// rmqio::DefaultClockType
//
//@CLASSES:
// rmqtestutil::TimeOverride: Steps time in testing to trigger handler waiting
// on deadline_timer
// rmqtestutil::ClockOverride: Steps time in testing to trigger handler waiting
// on timer

namespace BloombergLP {
namespace rmqtestutil {

class TimeOverride : public boost::asio::deadline_timer::traits_type {
class ClockOverride : public rmqio::DefaultClockType {
public:
static time_type now()
static void step_time(duration t) { d_timeOffset += t; }

static time_point now()
{
return add(boost::asio::deadline_timer::traits_type::now(),
d_timeOffset);
return rmqio::DefaultClockType::now() + d_timeOffset;
}
static void step_time(duration_type t) { d_timeOffset += t; }
static boost::posix_time::time_duration to_posix_duration(duration_type d)
static duration to_wait_duration(duration d)
{
// This is the secret sauce to ensure that boost::asio keeps calling
// `now()` and responds to our adjustments via `step_time`
return d < boost::posix_time::milliseconds(1)
return d < boost::asio::chrono::milliseconds(1)
? d
: boost::posix_time::milliseconds(1);
: boost::asio::chrono::milliseconds(1);
}

static duration_type d_timeOffset;
static duration d_timeOffset;
};

} // namespace rmqtestutil
} // namespace BloombergLP

Expand Down