Skip to content
34 changes: 34 additions & 0 deletions example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "bthread/countdown_event.h"

DEFINE_string(d, "", "POST this data to the http server");
DEFINE_bool(progressive, false, "whether or not progressive read data from server");
DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
Expand All @@ -36,6 +39,25 @@ namespace brpc {
DECLARE_bool(http_verbose);
}

class PartDataReader: public brpc::ProgressiveReader {
public:
explicit PartDataReader(bthread::CountdownEvent* done): _done(done){}

butil::Status OnReadOnePart(const void* data, size_t length) {
memcpy(_buffer, data, length);
LOG(INFO) << "data : " << _buffer << " size : " << length;
return butil::Status::OK();
}

void OnEndOfMessage(const butil::Status& status) {
_done->signal();
LOG(INFO) << "progressive read data final status : " << status;
}
private:
char _buffer[1024];
bthread::CountdownEvent* _done;
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
Expand Down Expand Up @@ -71,13 +93,25 @@ int main(int argc, char* argv[]) {
cntl.request_attachment().append(FLAGS_d);
}

if (FLAGS_progressive) {
cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms);
cntl.response_will_be_read_progressively();
}

// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return -1;
}

if (FLAGS_progressive) {
bthread::CountdownEvent done(1);
cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done));
done.wait();
LOG(INFO) << "wait client progressive read done safely";
}
// If -http_verbose is on, brpc already prints the response to stderr.
if (!brpc::FLAGS_http_verbose) {
std::cout << cntl.response_attachment() << std::endl;
Expand Down
7 changes: 7 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL");
DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL");
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");
DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout");

namespace example {

Expand Down Expand Up @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService {

// sleep a while to send another part.
bthread_usleep(10000);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down Expand Up @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService {

// sleep a while to send another part.
bthread_usleep(10000 * 10);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down
92 changes: 91 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_bool(graceful_quit_on_sighup, false,
"Register SIGHUP handle func to quit graceful");

DEFINE_bool(log_idle_progressive_read_close, false,
"Print log when an idle progressive read is closed");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
const IdlNames idl_multi_req_single_res = { "", "res" };
Expand Down Expand Up @@ -174,6 +175,80 @@ class IgnoreAllRead : public ProgressiveReader {
void OnEndOfMessage(const butil::Status&) {}
};

class ProgressiveTimeoutReader : public ProgressiveReader {
public:
explicit ProgressiveTimeoutReader(SocketId id, int32_t read_timeout_ms, ProgressiveReader* reader):
_socket_id(id),
_read_timeout_ms(read_timeout_ms),
_reader(reader),
_timeout_id(0),
_is_read_timeout(false) {
AddIdleReadTimeoutMonitor();
}

~ProgressiveTimeoutReader() {
if(_timeout_id > 0) {
bthread_timer_del(_timeout_id);
}
}

butil::Status OnReadOnePart(const void* data, size_t length) {
return _reader->OnReadOnePart(data, length);
}

void OnEndOfMessage(const butil::Status& status) {
if (_is_read_timeout) {
_reader->OnEndOfMessage(butil::Status(EPROGREADTIMEOUT, "The progressive read timeout"));
} else {
_reader->OnEndOfMessage(status);
}
if(_timeout_id > 0) {
bthread_timer_del(_timeout_id);
_timeout_id = 0;
}
}

private:
static void HandleIdleProgressiveReader(void* arg) {
if(arg == nullptr){
LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null.";
return;
}
ProgressiveTimeoutReader* reader = static_cast<ProgressiveTimeoutReader*>(arg);
SocketUniquePtr s;
if (Socket::Address(reader->_socket_id, &s) != 0) {
LOG(ERROR) << "not found the socket id : " << reader->_socket_id;
return;
}
auto log_idle = FLAGS_log_idle_progressive_read_close;
reader->_is_read_timeout = true;
LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->_socket_id
<< " progressive read timeout us : " << reader->_read_timeout_ms;
if (s->parsing_context() != NULL) {
s->parsing_context()->Destroy();
}
s->ReleaseReferenceIfIdle(0);
}
void AddIdleReadTimeoutMonitor() {
if (_read_timeout_ms <= 0) {
return;
}
bthread_timer_add(&_timeout_id,
butil::milliseconds_from_now(_read_timeout_ms),
HandleIdleProgressiveReader,
this
);
}

private:
SocketId _socket_id;
int32_t _read_timeout_ms;
ProgressiveReader* _reader;
// Timer registered to trigger progressive timeout event
bthread_timer_t _timeout_id;
butil::atomic<bool> _is_read_timeout;
};

static IgnoreAllRead* s_ignore_all_read = NULL;
static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT;
static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; }
Expand Down Expand Up @@ -260,6 +335,7 @@ void Controller::ResetPods() {
_backup_request_ms = UNSET_MAGIC_NUM;
_backup_request_policy = NULL;
_connect_timeout_ms = UNSET_MAGIC_NUM;
_progressive_read_timeout_ms = UNSET_MAGIC_NUM;
_real_timeout_ms = UNSET_MAGIC_NUM;
_deadline_us = -1;
_timeout_id = 0;
Expand Down Expand Up @@ -331,6 +407,15 @@ void Controller::Call::Reset() {
stream_user_data = NULL;
}

void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){
if(progressive_read_timeout_ms <= 0x7fffffff){
_progressive_read_timeout_ms = progressive_read_timeout_ms;
} else {
_progressive_read_timeout_ms = 0x7fffffff;
LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff";
}
}

void Controller::set_timeout_ms(int64_t timeout_ms) {
if (timeout_ms <= 0x7fffffff) {
_timeout_ms = timeout_ms;
Expand Down Expand Up @@ -1028,6 +1113,7 @@ void Controller::SubmitSpan() {
_span = NULL;
}


void Controller::HandleSendFailed() {
if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
Expand Down Expand Up @@ -1543,6 +1629,10 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) {
__FUNCTION__));
}
add_flag(FLAGS_PROGRESSIVE_READER);
if (progressive_read_timeout_ms() > 0) {
auto reader = new ProgressiveTimeoutReader(_rpa->GetSocketId(), _progressive_read_timeout_ms, r);
return _rpa->ReadProgressiveAttachmentBy(reader);
}
return _rpa->ReadProgressiveAttachmentBy(r);
}

Expand Down
10 changes: 7 additions & 3 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
#include "brpc/rpc_dump.h"

// EAUTH is defined in MAC
#ifndef EAUTH
#define EAUTH ERPCAUTH
Expand Down Expand Up @@ -163,7 +162,6 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
uint64_t log_id;
std::string request_id;
};

public:
Controller();
Controller(const Inheritable& parent_ctx);
Expand All @@ -177,6 +175,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Set/get timeout in milliseconds for the RPC call. Use
// ChannelOptions.timeout_ms on unset.
void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms);
int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; }

void set_timeout_ms(int64_t timeout_ms);
int64_t timeout_ms() const { return _timeout_ms; }

Expand Down Expand Up @@ -323,7 +324,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
void response_will_be_read_progressively() {
add_flag(FLAGS_READ_PROGRESSIVELY);
}
// Make the RPC end when the HTTP request has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
Expand Down Expand Up @@ -837,6 +840,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
int32_t _progressive_read_timeout_ms;
// Priority: `_backup_request_policy' > `_backup_request_ms'.
BackupRequestPolicy* _backup_request_policy;
// If this rpc call has retry/backup request,this var save the real timeout for current call
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/errno.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ enum Errno {
ESSL = 1016; // SSL related error
EH2RUNOUTSTREAMS = 1017; // The H2 socket was run out of streams
EREJECT = 1018; // The Request is rejected

EPROGREADTIMEOUT = 1019; // The Progressive read timeout

// Errno caused by server
EINTERNAL = 2001; // Internal Server Error
ERESPONSE = 2002; // Bad Response
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,7 @@ ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
LOG(FATAL) << "Fail to new HttpContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
http_imsg->SetSocketId(socket->id());
// Parsing http is costly, parsing an incomplete http message from the
// beginning repeatedly should be avoided, otherwise the cost may reach
// O(n^2) in the worst case. Save incomplete http messages in sockets
Expand Down
12 changes: 11 additions & 1 deletion src/brpc/policy/http_rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,20 @@ class HttpContext : public ReadableProgressiveAttachment
, public InputMessageBase
, public HttpMessage {
public:
SocketId GetSocketId() override {
return _socket_id;
}

void SetSocketId(SocketId id) {
_socket_id = id;
}

explicit HttpContext(bool read_body_progressively,
HttpMethod request_method = HTTP_METHOD_GET)
: InputMessageBase()
, HttpMessage(read_body_progressively, request_method)
, _is_stage2(false) {
, _is_stage2(false)
, _socket_id(0) {
// add one ref for Destroy
butil::intrusive_ptr<HttpContext>(this).detach();
}
Expand Down Expand Up @@ -122,6 +131,7 @@ class HttpContext : public ReadableProgressiveAttachment

private:
bool _is_stage2;
SocketId _socket_id;
};

// Implement functions required in protocol.h
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/progressive_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define BRPC_PROGRESSIVE_READER_H

#include "brpc/shared_object.h"
#include "brpc/socket.h"


namespace brpc {
Expand Down Expand Up @@ -84,6 +85,7 @@ class ReadableProgressiveAttachment : public SharedObject {
// Any error occurred should destroy the reader by calling r->Destroy().
// r->Destroy() should be guaranteed to be called once and only once.
virtual void ReadProgressiveAttachmentBy(ProgressiveReader* r) = 0;
virtual SocketId GetSocketId() = 0;
};

} // namespace brpc
Expand Down
Loading