Skip to content

Commit e814fda

Browse files
committed
First frame to implement the actual data aggregation, but ... it's probably going to change quite a lot again
1 parent cecb40c commit e814fda

File tree

2 files changed

+32
-16
lines changed

2 files changed

+32
-16
lines changed

fun.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def _move_delta_lbound(d, bytes):
8383
return d
8484

8585
def delta_duplicate(src):
86-
return DeltaChunk(src.to, src.ts, src.so, src.data)
86+
return DeltaChunk(src.to, src.ts, src.so, src.data, src.flags)
8787

8888
def delta_chunk_apply(dc, bbuf, write):
8989
"""Apply own data to the target buffer
@@ -114,16 +114,18 @@ class DeltaChunk(object):
114114
'so', # start offset in the source buffer in bytes or None
115115
'data', # chunk of bytes to be added to the target buffer,
116116
# DeltaChunkList to use as base, or None
117+
'flags' # currently only True or False
117118
)
118119

119-
def __init__(self, to, ts, so, data):
120+
def __init__(self, to, ts, so, data, flags):
120121
self.to = to
121122
self.ts = ts
122123
self.so = so
123124
self.data = data
125+
self.flags = flags
124126

125127
def __repr__(self):
126-
return "DeltaChunk(%i, %i, %s, %s)" % (self.to, self.ts, self.so, self.data or "")
128+
return "DeltaChunk(%i, %i, %s, %s, %i)" % (self.to, self.ts, self.so, self.data or "", self.flags)
127129

128130
#{ Interface
129131

@@ -253,18 +255,24 @@ def size(self):
253255
""":return: size of bytes as measured by our delta chunks"""
254256
return self.rbound() - self.lbound()
255257

256-
def connect_with(self, bdcl):
258+
def connect_with(self, bdcl, tdcl):
257259
"""Connect this instance's delta chunks virtually with the given base.
258260
This means that all copy deltas will simply apply to the given region
259261
of the given base. Afterwards, the base is optimized so that add-deltas
260262
will be truncated to the region actually used, or removed completely where
261263
adequate. This way, memory usage is reduced.
262-
:param bdcl: DeltaChunkList to serve as base"""
263-
for dc in self:
264-
if not dc.has_data():
265-
dc.set_copy_chunklist(bdcl[dc.so:dc.ts])
266-
# END handle overlap
267-
# END for each dc
264+
:param bdcl: DeltaChunkList to serve as base
265+
:param tdcl: topmost delta chunk list. If set, reverse order is assumed
266+
and the list is connected more efficiently"""
267+
if tdcl is None:
268+
for dc in self:
269+
if not dc.has_data():
270+
dc.set_copy_chunklist(bdcl[dc.so:dc.ts])
271+
# END handle overlap
272+
# END for each dc
273+
else:
274+
raise NotImplementedError("todo")
275+
# END handle order
268276

269277
def apply(self, bbuf, write, lbound_offset=0, size=0):
270278
"""Only used by public clients, internally we only use the global routines
@@ -297,7 +305,7 @@ def compress(self):
297305

298306
del(self[first_data_index:i-1])
299307
buf = nd.getvalue()
300-
self.insert(first_data_index, DeltaChunk(so, len(buf), 0, buf))
308+
self.insert(first_data_index, DeltaChunk(so, len(buf), 0, buf, False))
301309

302310
slen = len(self)
303311
i = first_data_index + 1
@@ -522,14 +530,18 @@ def reverse_connect_deltas(dcl, dstreams):
522530
:return: None"""
523531
raise NotImplementedError("This is left out up until we actually iterate the dstreams - they are prefetched right now")
524532

525-
def connect_deltas(dstreams):
533+
def connect_deltas(dstreams, reverse):
526534
"""Read the condensed delta chunk information from dstream and merge its information
527535
into a list of existing delta chunks
528536
:param dstreams: iterable of delta stream objects. They must be ordered latest last,
529537
hence the delta to be applied last comes last, its oldest ancestor first
538+
:param reverse: If False, the given iterable of delta-streams returns
539+
items from the latest ancestor to the last delta.
540+
If True, deltas are ordered so that the one to be applied last comes first.
530541
:return: DeltaChunkList, containing all operations to apply"""
531542
bdcl = None # data chunk list for initial base
532543
dcl = DeltaChunkList()
544+
tdcl = None # topmost dcl, only effective if reverse is True
533545
for dsi, ds in enumerate(dstreams):
534546
# print "Stream", dsi
535547
db = ds.read()
@@ -576,12 +588,12 @@ def connect_deltas(dstreams):
576588
rbound > base_size):
577589
break
578590

579-
dcl.append(DeltaChunk(tbw, cp_size, cp_off, None))
591+
dcl.append(DeltaChunk(tbw, cp_size, cp_off, None, False))
580592
tbw += cp_size
581593
elif c:
582594
# NOTE: in C, the data chunks should probably be concatenated here.
583595
# In python, we do it as a post-process
584-
dcl.append(DeltaChunk(tbw, c, 0, db[i:i+c]))
596+
dcl.append(DeltaChunk(tbw, c, 0, db[i:i+c], False))
585597
i += c
586598
tbw += c
587599
else:
@@ -591,9 +603,13 @@ def connect_deltas(dstreams):
591603

592604
dcl.compress()
593605

606+
if reverse and tdcl is None:
607+
tdcl = dcl
608+
# END handle reverse
609+
594610
# merge the lists !
595611
if bdcl is not None:
596-
dcl.connect_with(bdcl)
612+
dcl.connect_with(bdcl, tdcl)
597613
# END handle merge
598614

599615
# dcl.check_integrity()

stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ def _set_cache_(self, attr):
338338
# Aggregate all deltas into one delta in reverse order. Hence we take
339339
# the last delta, and reverse-merge its ancestor delta, until we receive
340340
# the final delta data stream.
341-
dcl = connect_deltas(reversed(self._dstreams))
341+
dcl = connect_deltas(self._dstreams, reverse=True)
342342

343343
if len(dcl) == 0:
344344
self._size = 0

0 commit comments

Comments
 (0)