-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvisualize_overlap_buffer.py
More file actions
46 lines (34 loc) · 1.21 KB
/
visualize_overlap_buffer.py
File metadata and controls
46 lines (34 loc) · 1.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from collections import deque
import numpy as np
import time
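'''
Simple demo that visualizes the overlapping buffer: counter values stand in
for data chunks, each window is processed without its edge chunks, and the
tail of every window is carried over into the next one.
'''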
def process():
    '''
    Process the window currently held in the module-level processing_buffer.

    Prints the window, appends its inner chunks (everything except
    overlap_size chunks at each edge) to processed_buffer, then moves the
    last overlap_size * 2 chunks out of processing_buffer into
    overlap_buffer, newest first.

    An empty window is a no-op apart from the print, and a window shorter
    than overlap_size * 2 hands over only the chunks it actually holds.
    '''
    print(f'processing_buffer: {processing_buffer}')

    out = list(processing_buffer)
    available_chunks = len(out)
    if not available_chunks:
        # np.array_split raises ValueError when asked for zero sections,
        # and there is nothing to process or carry over anyway.
        return

    # One section per chunk; drop the overlap sections at both edges.
    inner_sections = np.array_split(out, available_chunks)[
        overlap_size:available_chunks - overlap_size
    ]
    processed_buffer.extend(section[0] for section in inner_sections)

    # Never pop more chunks than the window holds.
    carry_count = min(overlap_size * 2, available_chunks)
    for _ in range(carry_count):
        overlap_buffer.append(processing_buffer.pop())
# Demo configuration: chunks per window and chunks trimmed from each edge.
window_size = 8
overlap_size = 1

# Pipeline stages, each a bounded deque.
pending_buffer = deque(maxlen=window_size)  # freshly produced chunks
processing_buffer = deque(maxlen=window_size + (overlap_size * 2))  # carried tail + new chunks
processed_buffer = deque(maxlen=window_size)  # filled by process()
overlap_buffer = deque(maxlen=overlap_size * 2)  # tail carried into the next window

counter = 0
while True:
    # Produce one new chunk per tick; the counter value stands in for data.
    pending_buffer.append(counter)
    counter += 1

    if len(pending_buffer) >= window_size - 2:
        processing_buffer.clear()
        # process() stores the carried tail newest-first, so popping from
        # the right puts it back in its original order.
        while overlap_buffer:
            processing_buffer.append(overlap_buffer.pop())
        # Top the window up with the oldest pending chunks.
        processing_buffer.extend(
            pending_buffer.popleft() for _ in range(window_size - 2)
        )
        process()

    if processed_buffer:
        print(f'processed_buffer: {processed_buffer}')
        processed_buffer.clear()

    # Slow the loop down so the printed output is readable.
    time.sleep(0.2)