-
Notifications
You must be signed in to change notification settings - Fork 106
Expand file tree
/
Copy pathPABotBase2CC_ReliableStreamConnection.h
More file actions
150 lines (109 loc) · 4.02 KB
/
PABotBase2CC_ReliableStreamConnection.h
File metadata and controls
150 lines (109 loc) · 4.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/* Reliable Stream Connection (CC)
*
* From: https://github.com/PokemonAutomation/
*
*/
#ifndef PokemonAutomation_PABotBase2CC_ReliableStreamConnection_H
#define PokemonAutomation_PABotBase2CC_ReliableStreamConnection_H
#include "Common/Cpp/Time.h"
#include "Common/Cpp/CancellableScope.h"
#include "Common/Cpp/Logging/AbstractLogger.h"
#include "Common/Cpp/Concurrency/Mutex.h"
#include "Common/Cpp/Concurrency/ConditionVariable.h"
#include "Common/Cpp/Concurrency/AsyncTask.h"
#include "Common/Cpp/Concurrency/ThreadPool.h"
#include "Common/Cpp/StreamConnections/PushingStreamConnections.h"
#include "PABotBase2_PacketSender.h"
#include "PABotBase2_PacketParser.h"
#include "PABotBase2_StreamCoalescer.h"
namespace PokemonAutomation{
namespace PABotBase2{
class ReliableStreamConnection final
: public CancellableScope
, public ReliableStreamConnectionPushing
, private UnreliableStreamSender
, private PacketRunner
, private StreamListener
{
public:
ReliableStreamConnection(
CancellableScope* parent,
Logger& logger, bool log_everything,
ThreadPool& thread_pool,
UnreliableStreamConnectionPushing& unreliable_connection,
WallDuration retransmit_timeout = Milliseconds(100),
Mutex* print_lock = nullptr
);
~ReliableStreamConnection();
virtual void stop() override{
cancel(nullptr);
}
virtual bool cancel(std::exception_ptr exception) noexcept override;
bool reset(WallDuration timeout = WallDuration::max());
bool remote_protocol_is_compatible() const{
return m_remote_protocol_compatible;
}
uint32_t remote_protocol() const{
return m_remote_protocol;
}
const std::string& error_string() const{
return m_error;
}
size_t pending() const;
bool wait_for_pending(WallDuration timeout = WallDuration::max());
public:
// Send in-band
bool try_send_request(uint8_t opcode);
void send_request(uint8_t opcode);
void send_stream(const void* data, size_t bytes){
reliable_send_blocking(data, bytes, WallDuration::max());
}
public:
// Debugging
void print() const;
Mutex* print_lock() const{
return m_print_lock;
}
private:
// Send
void send_ack(uint8_t seqnum, uint8_t opcode);
void send_ack_u16(uint8_t seqnum, uint8_t opcode, uint16_t data);
void retransmit_thread();
private:
// virtual size_t reliable_send_available() const override;
virtual size_t reliable_send_blocking(const void* data, size_t bytes, WallDuration timeout) override;
virtual void on_recv(const void* data, size_t bytes) override;
virtual size_t unreliable_send(const void* data, size_t bytes) override;
private:
// Virtuals: PacketRunner
virtual void on_packet(const PacketHeader* packet) override;
void process_UNKNOWN_OPCODE(const PacketHeader* packet);
void process_RET_RESET(const PacketHeader* packet);
void process_RET_VERSION(const PacketHeader* packet);
void process_RET_PACKET_SIZE(const PacketHeader* packet);
void process_RET_BUFFER_SLOTS(const PacketHeader* packet);
void process_RET_BUFFER_BYTES(const PacketHeader* packet);
void process_ASK_STREAM_DATA(const PacketHeader* packet);
void process_RET_STREAM_DATA(const PacketHeader* packet);
private:
Logger& m_logger;
UnreliableStreamConnectionPushing& m_unreliable_connection;
const WallDuration m_retransmit_timeout;
Mutex* m_print_lock;
PacketSender m_reliable_sender;
PacketParser m_parser;
StreamCoalescer m_stream_coalescer;
bool m_log_everything;
bool m_remote_protocol_compatible;
uint32_t m_remote_protocol;
// std::atomic<bool> m_version_verified;
uint8_t m_remote_slot_capacity;
uint16_t m_remote_buffer_capacity;
std::string m_error;
mutable Mutex m_lock;
ConditionVariable m_cv;
AsyncTask m_retransmit_thread;
};
}
}
#endif