Skip to content

Commit bcfcf75

Browse files
committed
Improve data handling with buffer/offset tracking and use timeout
1 parent c8baca3 commit bcfcf75

2 files changed

Lines changed: 123 additions & 56 deletions

File tree

src/NimBLEStream.cpp

Lines changed: 114 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -127,19 +127,29 @@ bool NimBLEStream::begin() {
127127
}
128128

129129
bool NimBLEStream::end() {
130+
// Release any buffered RX item
131+
if (m_rxState.item && m_rxBuf) {
132+
vRingbufferReturnItem(m_rxBuf, m_rxState.item);
133+
m_rxState.item = nullptr;
134+
}
135+
m_rxState.itemSize = 0;
136+
m_rxState.offset = 0;
137+
130138
if (m_txTask) {
131139
vTaskDelete(m_txTask);
132140
m_txTask = nullptr;
133141
}
142+
134143
if (m_txBuf) {
135144
vRingbufferDelete(m_txBuf);
136145
m_txBuf = nullptr;
137146
}
147+
138148
if (m_rxBuf) {
139149
vRingbufferDelete(m_rxBuf);
140150
m_rxBuf = nullptr;
141151
}
142-
m_hasPeek = false;
152+
143153
return true;
144154
}
145155

@@ -148,22 +158,48 @@ size_t NimBLEStream::write(const uint8_t* data, size_t len) {
148158
return 0;
149159
}
150160

151-
ble_npl_time_t timeout = 0;
152-
ble_npl_time_ms_to_ticks(getTimeout(), &timeout);
153-
size_t chunk = std::min(len, xRingbufferGetCurFreeSize(m_txBuf));
154-
if (xRingbufferSend(m_txBuf, data, chunk, static_cast<TickType_t>(timeout)) != pdTRUE) {
155-
return 0;
161+
ble_npl_time_t timeout_ticks = 0;
162+
ble_npl_time_ms_to_ticks(getTimeout(), &timeout_ticks);
163+
ble_npl_time_t deadline = ble_npl_time_get() + timeout_ticks;
164+
size_t sent = 0;
165+
166+
while (sent < len) {
167+
size_t available = xRingbufferGetCurFreeSize(m_txBuf);
168+
if (available == 0) {
169+
// Check timeout
170+
if (timeout_ticks > 0 && ble_npl_time_get() >= deadline) {
171+
break; // Timeout reached
172+
}
173+
ble_npl_time_delay(ble_npl_time_ms_to_ticks32(1));
174+
continue;
175+
}
176+
177+
size_t chunk = std::min(len - sent, available);
178+
if (xRingbufferSend(m_txBuf, data + sent, chunk, 0) != pdTRUE) {
179+
break;
180+
}
181+
sent += chunk;
156182
}
157-
return chunk;
183+
184+
return sent;
158185
}
159186

160187
size_t NimBLEStream::availableForWrite() const {
161188
return m_txBuf ? xRingbufferGetCurFreeSize(m_txBuf) : 0;
162189
}
163190

164-
void NimBLEStream::flush() {
191+
void NimBLEStream::flush(uint32_t timeout_ms) {
192+
if (!m_txBuf) {
193+
return;
194+
}
195+
196+
ble_npl_time_t deadline = timeout_ms > 0 ? ble_npl_time_get() + ble_npl_time_ms_to_ticks32(timeout_ms) : 0;
197+
165198
// Wait until TX ring is drained
166199
while (m_txBuf && xRingbufferGetCurFreeSize(m_txBuf) < m_txBufSize) {
200+
if (deadline > 0 && ble_npl_time_get() >= deadline) {
201+
break;
202+
}
167203
ble_npl_time_delay(ble_npl_time_ms_to_ticks32(1));
168204
}
169205
}
@@ -174,65 +210,76 @@ int NimBLEStream::available() {
174210
return 0;
175211
}
176212

177-
if (m_hasPeek) {
178-
return 1; // at least the peeked byte
179-
}
213+
// Count buffered RX item remainder
214+
size_t buffered = m_rxState.itemSize > m_rxState.offset ? m_rxState.itemSize - m_rxState.offset : 0;
180215

181216
// Query items in RX ring
182217
UBaseType_t waiting = 0;
183218
vRingbufferGetInfo(m_rxBuf, nullptr, nullptr, nullptr, nullptr, &waiting);
184-
return static_cast<int>(waiting);
219+
220+
return static_cast<int>(buffered + waiting);
185221
}
186222

187223
int NimBLEStream::read() {
188224
if (!m_rxBuf) {
189225
return -1;
190226
}
191227

192-
// Return peeked byte if available
193-
if (m_hasPeek) {
194-
m_hasPeek = false;
195-
return static_cast<int>(m_peekByte);
228+
// Return from buffered item if available
229+
if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) {
230+
uint8_t byte = m_rxState.item[m_rxState.offset++];
231+
232+
// Release item if we've consumed it all
233+
if (m_rxState.offset >= m_rxState.itemSize) {
234+
vRingbufferReturnItem(m_rxBuf, m_rxState.item);
235+
m_rxState.item = nullptr;
236+
m_rxState.itemSize = 0;
237+
m_rxState.offset = 0;
238+
}
239+
240+
return static_cast<int>(byte);
196241
}
197242

243+
// Fetch next item from ringbuffer
198244
size_t itemSize = 0;
199245
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
200-
if (!item || itemSize == 0) return -1;
201-
202-
uint8_t byte = item[0];
203-
204-
// If item has more bytes, put the rest back
205-
if (itemSize > 1) {
206-
xRingbufferSend(m_rxBuf, item + 1, itemSize - 1, 0);
246+
if (!item || itemSize == 0) {
247+
return -1;
207248
}
208249

209-
vRingbufferReturnItem(m_rxBuf, item);
210-
return static_cast<int>(byte);
250+
// Store in buffer state and return first byte
251+
m_rxState.item = item;
252+
m_rxState.itemSize = itemSize;
253+
m_rxState.offset = 1; // Already consumed first byte
254+
255+
return static_cast<int>(item[0]);
211256
}
212257

213258
int NimBLEStream::peek() {
214259
if (!m_rxBuf) {
215260
return -1;
216261
}
217262

218-
if (m_hasPeek) {
219-
return static_cast<int>(m_peekByte);
220-
}
221-
222-
size_t itemSize = 0;
223-
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
224-
if (!item || itemSize == 0) {
225-
return -1;
263+
// Return from buffered item if available
264+
if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) {
265+
return static_cast<int>(m_rxState.item[m_rxState.offset]);
226266
}
227267

228-
m_peekByte = item[0];
229-
m_hasPeek = true;
268+
// Fetch next item from ringbuffer if not already buffered
269+
if (!m_rxState.item) {
270+
size_t itemSize = 0;
271+
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
272+
if (!item || itemSize == 0) {
273+
return -1;
274+
}
230275

231-
// Put the entire item back
232-
xRingbufferSend(m_rxBuf, item, itemSize, 0);
233-
vRingbufferReturnItem(m_rxBuf, item);
276+
// Store in buffer state
277+
m_rxState.item = item;
278+
m_rxState.itemSize = itemSize;
279+
m_rxState.offset = 0;
280+
}
234281

235-
return static_cast<int>(m_peekByte);
282+
return static_cast<int>(m_rxState.item[m_rxState.offset]);
236283
}
237284

238285
size_t NimBLEStream::read(uint8_t* buffer, size_t len) {
@@ -242,13 +289,28 @@ size_t NimBLEStream::read(uint8_t* buffer, size_t len) {
242289

243290
size_t total = 0;
244291

245-
// Consume peeked byte first if present
246-
if (m_hasPeek && total < len) {
247-
buffer[total++] = m_peekByte;
248-
m_hasPeek = false;
292+
// First, consume any buffered RX item remainder
293+
if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) {
294+
size_t available = m_rxState.itemSize - m_rxState.offset;
295+
size_t copyLen = std::min(len, available);
296+
memcpy(buffer, m_rxState.item + m_rxState.offset, copyLen);
297+
m_rxState.offset += copyLen;
298+
total += copyLen;
299+
300+
// Release item if fully consumed
301+
if (m_rxState.offset >= m_rxState.itemSize) {
302+
vRingbufferReturnItem(m_rxBuf, m_rxState.item);
303+
m_rxState.item = nullptr;
304+
m_rxState.itemSize = 0;
305+
m_rxState.offset = 0;
306+
}
307+
308+
if (total >= len) {
309+
return total;
310+
}
249311
}
250312

251-
// Drain RX ringbuffer items up to requested length (non-blocking)
313+
// Drain additional RX ringbuffer items
252314
while (total < len) {
253315
size_t itemSize = 0;
254316
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
@@ -260,12 +322,15 @@ size_t NimBLEStream::read(uint8_t* buffer, size_t len) {
260322
memcpy(buffer + total, item, copyLen);
261323
total += copyLen;
262324

263-
// If there are leftover bytes from this item, push them back to RX
264-
if (itemSize > copyLen) {
265-
xRingbufferSend(m_rxBuf, item + copyLen, itemSize - copyLen, 0);
325+
// If we didn't consume the entire item, buffer it
326+
if (copyLen < itemSize) {
327+
m_rxState.item = item;
328+
m_rxState.itemSize = itemSize;
329+
m_rxState.offset = copyLen;
330+
} else {
331+
// Item fully consumed
332+
vRingbufferReturnItem(m_rxBuf, item);
266333
}
267-
268-
vRingbufferReturnItem(m_rxBuf, item);
269334
}
270335

271336
return total;
@@ -277,9 +342,6 @@ size_t NimBLEStream::pushRx(const uint8_t* data, size_t len) {
277342
return 0;
278343
}
279344

280-
// Clear peek state when new data arrives
281-
m_hasPeek = false;
282-
283345
if (xRingbufferSend(m_rxBuf, data, len, 0) != pdTRUE) {
284346
NIMBLE_UART_LOGE(LOG_TAG, "RX buffer full, dropping %u bytes", len);
285347
return 0;

src/NimBLEStream.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class NimBLEStream : public Stream {
8484
}
8585

8686
size_t availableForWrite() const;
87-
void flush() override;
87+
void flush(uint32_t timeout_ms = 0);
8888

8989
// Stream RX methods
9090
virtual int available() override;
@@ -108,6 +108,13 @@ class NimBLEStream : public Stream {
108108
// Push received data into RX ring (called by subclass callbacks)
109109
size_t pushRx(const uint8_t* data, size_t len);
110110

111+
// RX buffering state: avoids requeueing/fragmentation
112+
struct RxState {
113+
uint8_t* item{nullptr};
114+
size_t itemSize{0};
115+
size_t offset{0};
116+
};
117+
111118
RingbufHandle_t m_txBuf{nullptr};
112119
RingbufHandle_t m_rxBuf{nullptr};
113120
TaskHandle_t m_txTask{nullptr};
@@ -116,9 +123,7 @@ class NimBLEStream : public Stream {
116123
uint32_t m_txBufSize{1024};
117124
uint32_t m_rxBufSize{1024};
118125

119-
// RX peek state
120-
mutable uint8_t m_peekByte{0};
121-
mutable bool m_hasPeek{false};
126+
mutable RxState m_rxState{}; // Track current RX item to avoid requeueing
122127
};
123128

124129
# if MYNEWT_VAL(BLE_ROLE_PERIPHERAL)

0 commit comments

Comments
 (0)