Skip to content

Commit 4cf3eac

Browse files
committed
initial frame for design change - now chunks can refer to DeltaLists as well, which is required to get the copying right. This surely makes things more complex, but should still result in a performance improvement
1 parent 48fdcf4 commit 4cf3eac

File tree

2 files changed

+75
-23
lines changed

2 files changed

+75
-23
lines changed

fun.py

Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,14 @@ class DeltaChunk(object):
8686
'to', # start offset in the target buffer in bytes
8787
'ts', # size of this chunk in the target buffer in bytes
8888
'so', # start offset in the source buffer in bytes or None
89-
'data' # chunk of bytes to be added to the target buffer or None
89+
'data' # chunk of bytes to be added to the target buffer,
90+
# DeltaChunkList to use as base, or None
9091
)
9192

9293
def __init__(self, to, ts, so, data):
9394
self.to = to
9495
self.ts = ts
95-
self.so = so
96+
self.so = so
9697
self.data = data
9798

9899
def __repr__(self):
@@ -103,11 +104,15 @@ def __repr__(self):
103104
def rbound(self):
104105
return self.to + self.ts
105106

107+
def has_data(self):
108+
""":return: True if the instance has data to add to the target stream"""
109+
return self.data is None or not isinstance(self.data, DeltaChunkList)
110+
106111
def apply(self, source, write):
107112
"""Apply own data to the target buffer
108113
:param source: buffer providing source bytes for copy operations
109114
:param write: write method to call with data to write"""
110-
if self.data is None:
115+
if self.has_data():
111116
# COPY DATA FROM SOURCE
112117
assert len(source) - self.so - self.ts > 0
113118
write(buffer(source, self.so, self.ts))
@@ -166,7 +171,7 @@ def _split_delta(dcl, d, di, relofs, insert_offset=0):
166171
def _handle_merge(ld, rd):
167172
"""Optimize the layout of the lhs delta and the rhs delta
168173
TODO: Once the default implementation is working"""
169-
if d.data is None:
174+
if d.has_data():
170175
if od.data:
171176
# OVERWRITE DATA
172177
pass
@@ -217,14 +222,15 @@ def _merge_delta(dcl, dc):
217222
_move_delta_lbound(cd, dc.rbound() - cd.to)
218223
break
219224
else:
225+
# xx.|---|
220226
# WE DON'T OVERLAP IT
221227
# this can actually happen, once multiple streams are merged
222228
break
223229
# END rbound overlap handling
224230
# END lbound overlap handling
225231
else:
226232
if dc.to >= cd.rbound():
227-
#|---|...xx
233+
#|---|xx
228234
break
229235
# END
230236

@@ -269,9 +275,30 @@ def _merge_delta(dcl, dc):
269275
class DeltaChunkList(list):
270276
"""List with special functionality to deal with DeltaChunks"""
271277

272-
def terminate_at(self, size):
278+
def init(self, size):
279+
"""Intialize this instance with chunks defining to fill up size from a base
280+
buffer of equal size
281+
:return: self"""
282+
if len(self) != 0:
283+
return
284+
# pretend we have one huge delta chunk, which just copies everything
285+
# from source to destination
286+
maxint32 = 2**32
287+
for x in range(0, size, maxint32):
288+
self.append(DeltaChunk(x, maxint32, x, None))
289+
# END create copy chunks
290+
offset = x*maxint32
291+
remainder = size-offset
292+
if remainder:
293+
self.append(DeltaChunk(offset, remainder, offset, None))
294+
# END handle all done in loop
295+
296+
return self
297+
298+
def set_rbound(self, size):
273299
"""Chops the list at the given size, splitting and removing DeltaNodes
274-
as required"""
300+
as required
301+
:return: self"""
275302
di = _closest_index(self, size)
276303
d = self[di]
277304
rsize = size - d.to
@@ -283,6 +310,26 @@ def terminate_at(self, size):
283310
## DEBUG ##
284311
self.check_integrity(size)
285312

313+
return self
314+
315+
def connect_with(self, bdlc):
316+
"""Connect this instance's delta chunks virtually with the given base.
317+
This means that all copy deltas will simply apply to the given region
318+
of the given base. Afterwards, the base is optimized so that add-deltas
319+
will be truncated to the region actually used, or removed completely where
320+
adequate. This way, memory usage is reduced.
321+
:param bdlc: DeltaChunkList to serve as base"""
322+
raise NotImplementedError("todo")
323+
324+
def apply(self, bbuf, write):
325+
"""Apply the chain's changes and write the final result using the passed
326+
write function.
327+
:param bbuf: base buffer containing the base of all deltas contained in this
328+
list. It will only be used if the chunk in question does not have a base
329+
chain.
330+
:param write: function taking a string of bytes to write to the output"""
331+
raise NotImplementedError("todo")
332+
286333
def check_integrity(self, target_size=-1):
287334
"""Verify the list has non-overlapping chunks only, and the total size matches
288335
target_size
@@ -437,31 +484,31 @@ def stream_copy(read, write, size, chunk_size):
437484
def reverse_merge_deltas(dcl, dstreams):
438485
"""Read the condensed delta chunk information from dstream and merge its information
439486
into a list of existing delta chunks
440-
:param dcl: see merge_deltas
487+
:param dcl: see merge_deltas
441488
:param dstreams: iterable of delta stream objects. They must be ordered latest first,
442489
hence the delta to be applied last comes first, then its ancestors
443490
:return: None"""
444491
raise NotImplementedError("This is left out up until we actually iterate the dstreams - they are prefetched right now")
445492

446-
def merge_deltas(dcl, dstreams):
493+
def merge_deltas(dstreams):
447494
"""Read the condensed delta chunk information from dstream and merge its information
448495
into a list of existing delta chunks
449-
:param dcl: DeltaChunkList, may be empty initially, and will be changed
450-
during the merge process
451496
:param dstreams: iterable of delta stream objects. They must be ordered latest last,
452497
hence the delta to be applied last comes last, its oldest ancestor first
453-
:return: None"""
498+
:return: DeltaChunkList, containing all operations to apply"""
499+
base = None # delta chunk list serving as base for the next stream, None initially
500+
dcl = DeltaChunkList()
454501
for dsi, ds in enumerate(dstreams):
455502
# print "Stream", dsi
456503
db = ds.read()
457504
delta_buf_size = ds.size
458505

459506
# read header
460-
i, src_size = msb_size(db)
507+
i, base_size = msb_size(db)
461508
i, target_size = msb_size(db, i)
462509

463510
# interpret opcodes
464-
tbw = 0 # amount of target bytes written
511+
tbw = 0 # amount of target bytes written
465512
while i < delta_buf_size:
466513
c = ord(db[i])
467514
i += 1
@@ -494,23 +541,31 @@ def merge_deltas(dcl, dstreams):
494541

495542
rbound = cp_off + cp_size
496543
if (rbound < cp_size or
497-
rbound > src_size):
544+
rbound > base_size):
498545
break
499546

500-
_merge_delta(dcl, DeltaChunk(tbw, cp_size, cp_off, None))
547+
# _merge_delta(dcl, DeltaChunk(tbw, cp_size, cp_off, None))
548+
dcl.append(DeltaChunk(tbw, cp_size, cp_off, None))
501549
tbw += cp_size
502550
elif c:
503551
# TODO: Concatenate multiple deltachunks
504-
_merge_delta(dcl, DeltaChunk(tbw, c, 0, db[i:i+c]))
552+
# _merge_delta(dcl, DeltaChunk(tbw, c, 0, db[i:i+c]))
553+
dcl.append(DeltaChunk(tbw, c, 0, db[i:i+c]))
505554
i += c
506555
tbw += c
507556
else:
508557
raise ValueError("unexpected delta opcode 0")
509558
# END handle command byte
510559
# END while processing delta data
511560

512-
dcl.terminate_at(target_size)
561+
# merge the lists !
562+
if base is not None:
563+
dcl.connect_with(base)
564+
# END handle merge
513565

566+
# prepare next base
567+
base = dcl
568+
dcl = DeltaChunkList()
514569
# END for each delta stream
515570

516571
# print dcl

stream.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,8 +325,7 @@ def _set_cache_(self, attr):
325325
# Aggregate all deltas into one delta in reverse order. Hence we take
326326
# the last delta, and reverse-merge its ancestor delta, until we receive
327327
# the final delta data stream.
328-
dcl = DeltaChunkList()
329-
merge_deltas(dcl, reversed(self._dstreams))
328+
dcl = merge_deltas(reversed(self._dstreams))
330329

331330
if len(dcl) == 0:
332331
self._size = 0
@@ -342,9 +341,7 @@ def _set_cache_(self, attr):
342341

343342
# APPLY CHUNKS
344343
write = self._mm_target.write
345-
for dc in dcl:
346-
dc.apply(bbuf, write)
347-
# END for each deltachunk to apply
344+
dcl.apply(bbuf, write)
348345

349346
self._mm_target.seek(0)
350347

0 commit comments

Comments
 (0)