Skip to content

Commit c0a1dc6

Browse files
committed
Merge branch 'reversedeltaaggregation'
2 parents cecb40c + 3837806 commit c0a1dc6

File tree

3 files changed

+124
-108
lines changed

3 files changed

+124
-108
lines changed

fun.py

Lines changed: 119 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ def _set_delta_rbound(d, size):
6262

6363
# NOTE: data is truncated automatically when applying the delta
6464
# MUST NOT DO THIS HERE
65-
6665
return d
6766

6867
def _move_delta_lbound(d, bytes):
@@ -76,7 +75,7 @@ def _move_delta_lbound(d, bytes):
7675
d.to += bytes
7776
d.so += bytes
7877
d.ts -= bytes
79-
if d.has_data():
78+
if d.data is not None:
8079
d.data = d.data[bytes:]
8180
# END handle data
8281

@@ -92,8 +91,6 @@ def delta_chunk_apply(dc, bbuf, write):
9291
if dc.data is None:
9392
# COPY DATA FROM SOURCE
9493
write(buffer(bbuf, dc.so, dc.ts))
95-
elif isinstance(dc.data, DeltaChunkList):
96-
delta_list_apply(dc.data, bbuf, write, dc.so, dc.ts)
9794
else:
9895
# APPEND DATA
9996
# what's faster: if + 4 function calls or just a write with a slice?
@@ -105,6 +102,7 @@ def delta_chunk_apply(dc, bbuf, write):
105102
# END handle truncation
106103
# END handle chunk mode
107104

105+
108106
class DeltaChunk(object):
109107
"""Represents a piece of a delta, it can either add new data, or copy existing
110108
one from a source buffer"""
@@ -127,36 +125,12 @@ def __repr__(self):
127125

128126
#{ Interface
129127

130-
def copy_offset(self):
131-
""":return: offset to apply when copying from a base buffer, or 0
132-
if this is not a copying delta chunk"""
133-
134-
if self.data is not None:
135-
if isinstance(self.data, DeltaChunkList):
136-
return self.data.lbound() + self.so
137-
else:
138-
return self.so
139-
# END handle data type
140-
return 0
141-
142128
def rbound(self):
143129
return self.to + self.ts
144130

145131
def has_data(self):
146132
""":return: True if the instance has data to add to the target stream"""
147-
return self.data is not None and not isinstance(self.data, DeltaChunkList)
148-
149-
def has_copy_chunklist(self):
150-
""":return: True if we copy our data from a chunklist"""
151-
return self.data is not None and isinstance(self.data, DeltaChunkList)
152-
153-
def set_copy_chunklist(self, dcl):
154-
"""Set the deltachunk list to be used as basis for copying.
155-
:note: only works if this chunk is a copy delta chunk"""
156-
self.data = dcl
157-
self.so = 0 # allows lbound moves to be virtual
158-
159-
133+
return self.data is not None
160134

161135
#} END interface
162136

@@ -233,9 +207,54 @@ def delta_list_apply(dcl, bbuf, write, lbound_offset=0, size=0):
233207
# END for each dc
234208
# END handle application values
235209

210+
def delta_list_slice(dcl, absofs, size):
211+
""":return: Subsection of this list at the given absolute offset, with the given
212+
size in bytes.
213+
:return: list (copy) which represents the given chunk"""
214+
dcllbound = dcl.lbound()
215+
absofs = max(absofs, dcllbound)
216+
size = min(dcl.rbound() - dcllbound, size)
217+
cdi = _closest_index(dcl, absofs) # delta start index
218+
cd = dcl[cdi]
219+
slen = len(dcl)
220+
ndcl = list()
221+
lappend = ndcl.append
222+
223+
if cd.to != absofs:
224+
tcd = delta_duplicate(cd)
225+
_move_delta_lbound(tcd, absofs - cd.to)
226+
_set_delta_rbound(tcd, min(tcd.ts, size))
227+
lappend(tcd)
228+
size -= tcd.ts
229+
cdi += 1
230+
# END lbound overlap handling
231+
232+
while cdi < slen and size:
233+
# are we larger than the current block
234+
cd = dcl[cdi]
235+
if cd.ts <= size:
236+
lappend(delta_duplicate(cd))
237+
size -= cd.ts
238+
else:
239+
tcd = delta_duplicate(cd)
240+
_set_delta_rbound(tcd, size)
241+
lappend(tcd)
242+
size -= tcd.ts
243+
break
244+
# END handle size
245+
cdi += 1
246+
# END for each chunk
247+
248+
return ndcl
236249

237250
class DeltaChunkList(list):
238-
"""List with special functionality to deal with DeltaChunks"""
251+
"""List with special functionality to deal with DeltaChunks.
252+
There are two types of lists we represent. The one was created bottom-up, working
253+
towards the latest delta, the other kind was created top-down, working from the
254+
latest delta down to the earliest ancestor. This attribute is queryable
255+
after all processing with is_reversed."""
256+
257+
__slots__ = tuple()
239258

240259
def rbound(self):
241260
""":return: rightmost extend in bytes, absolute"""
@@ -253,19 +272,6 @@ def size(self):
253272
""":return: size of bytes as measured by our delta chunks"""
254273
return self.rbound() - self.lbound()
255274

256-
def connect_with(self, bdcl):
257-
"""Connect this instance's delta chunks virtually with the given base.
258-
This means that all copy deltas will simply apply to the given region
259-
of the given base. Afterwards, the base is optimized so that add-deltas
260-
will be truncated to the region actually used, or removed completely where
261-
adequate. This way, memory usage is reduced.
262-
:param bdcl: DeltaChunkList to serve as base"""
263-
for dc in self:
264-
if not dc.has_data():
265-
dc.set_copy_chunklist(bdcl[dc.so:dc.ts])
266-
# END handle overlap
267-
# END for each dc
268-
269275
def apply(self, bbuf, write, lbound_offset=0, size=0):
270276
"""Only used by public clients, internally we only use the global routines
271277
for performance"""
@@ -285,7 +291,7 @@ def compress(self):
285291
while i < slen:
286292
dc = self[i]
287293
i += 1
288-
if not dc.has_data():
294+
if dc.data is None:
289295
if first_data_index is not None and i-2-first_data_index > 1:
290296
#if first_data_index is not None:
291297
nd = StringIO() # new data
@@ -333,8 +339,6 @@ def check_integrity(self, target_size=-1):
333339
assert dc.ts > 0
334340
if dc.has_data():
335341
assert len(dc.data) >= dc.ts
336-
if dc.has_copy_chunklist():
337-
assert dc.ts <= dc.data.size()
338342
# END for each dc
339343

340344
left = islice(self, 0, len(self)-1)
@@ -347,49 +351,71 @@ def check_integrity(self, target_size=-1):
347351
assert lft.to + lft.ts == rgt.to
348352
# END for each pair
349353

350-
def __getslice__(self, absofs, size):
351-
""":return: Subsection of this list at the given absolute offset, with the given
352-
size in bytes.
353-
:return: DeltaChunkList (copy) which represents the given chunk"""
354-
if len(self) == 0:
355-
return DeltaChunkList()
354+
355+
class TopdownDeltaChunkList(DeltaChunkList):
356+
"""Represents a list which is generated by feeding its ancestor streams one by
357+
one"""
358+
__slots__ = tuple()
359+
360+
def connect_with_next_base(self, bdcl):
361+
"""Connect this chain with the next level of our base delta chunklist.
362+
The goal in this game is to mark as many of our chunks rigid, hence they
363+
cannot be changed by any of the upcoming bases anymore. Once all our
364+
chunks are marked like that, we can stop all processing
365+
:param bdcl: data chunk list being one of our bases. They must be fed in
366+
consecutively and in order, towards the earliest ancestor delta
367+
:return: True if processing was done. Use it to abort processing of
368+
remaining streams if False is returned"""
369+
nfc = 0 # number of frozen chunks
370+
dci = 0 # delta chunk index
371+
slen = len(self) # len of self
372+
while dci < slen:
373+
dc = self[dci]
374+
dci += 1
356375

357-
absofs = max(absofs, self.lbound())
358-
size = min(self.rbound() - self.lbound(), size)
359-
cdi = _closest_index(self, absofs) # delta start index
360-
cd = self[cdi]
361-
slen = len(self)
362-
ndcl = self.__class__()
363-
364-
if cd.to != absofs:
365-
tcd = delta_duplicate(cd)
366-
_move_delta_lbound(tcd, absofs - cd.to)
367-
_set_delta_rbound(tcd, min(tcd.ts, size))
368-
ndcl.append(tcd)
369-
size -= tcd.ts
370-
cdi += 1
371-
# END lbound overlap handling
372-
373-
while cdi < slen and size:
374-
# are we larger than the current block
375-
cd = self[cdi]
376-
if cd.ts <= size:
377-
ndcl.append(delta_duplicate(cd))
378-
size -= cd.ts
376+
# all add-chunks which are already topmost don't need additional processing
377+
if dc.data is not None:
378+
nfc += 1
379+
continue
380+
# END skip add chunks
381+
382+
# copy chunks
383+
# integrate the portion of the base list into ourselves. Lists
384+
# dont support efficient insertion ( just one at a time ), but for now
385+
# we live with it. Internally, its all just a 32/64bit pointer, and
386+
# the portions of moved memory should be smallish. Maybe we just rebuild
387+
# ourselves in order to reduce the amount of insertions ...
388+
ccl = delta_list_slice(bdcl, dc.so, dc.ts)
389+
390+
# move the target bounds into place to match with our chunk
391+
ofs = dc.to - dc.so
392+
for cdc in ccl:
393+
cdc.to += ofs
394+
# END update target bounds
395+
396+
if len(ccl) == 1:
397+
self[dci-1] = ccl[0]
379398
else:
380-
tcd = delta_duplicate(cd)
381-
_set_delta_rbound(tcd, size)
382-
ndcl.append(tcd)
383-
size -= tcd.ts
384-
break
385-
# END hadle size
386-
cdi += 1
399+
# maybe try to compute the expenses here, and pick the right algorithm
400+
# It would normally be faster than copying everything physically though
401+
# TODO: Use a deque here, and decide by the index whether to extend
402+
# or extend left !
403+
post_dci = self[dci:]
404+
del(self[dci-1:]) # include deletion of dc
405+
self.extend(ccl)
406+
self.extend(post_dci)
407+
408+
slen = len(self)
409+
dci += len(ccl)-1 # deleted dc, added rest
410+
411+
# END handle chunk replacement
387412
# END for each chunk
388413

389-
# ndcl.check_integrity()
390-
return ndcl
391-
392-
414+
if nfc == slen:
415+
return False
416+
# END handle completeness
417+
return True
418+
393419

394420
#} END structures
395421

@@ -513,23 +539,16 @@ def stream_copy(read, write, size, chunk_size):
513539
# END duplicate data
514540
return dbw
515541

516-
def reverse_connect_deltas(dcl, dstreams):
517-
"""Read the condensed delta chunk information from dstream and merge its information
518-
into a list of existing delta chunks
519-
:param dcl: see 3
520-
:param dstreams: iterable of delta stream objects. They must be ordered latest first,
521-
hence the delta to be applied last comes first, then its ancestors
522-
:return: None"""
523-
raise NotImplementedError("This is left out up until we actually iterate the dstreams - they are prefetched right now")
524-
525542
def connect_deltas(dstreams):
526543
"""Read the condensed delta chunk information from dstream and merge its information
527544
into a list of existing delta chunks
528-
:param dstreams: iterable of delta stream objects. They must be ordered latest last,
529-
hence the delta to be applied last comes last, its oldest ancestor first
545+
:param dstreams: iterable of delta stream objects, the delta to be applied last
546+
comes first, then all its ancestors in order
530547
:return: DeltaChunkList, containing all operations to apply"""
531548
bdcl = None # data chunk list for initial base
532-
dcl = DeltaChunkList()
549+
tdcl = None # topmost dcl
550+
551+
dcl = tdcl = TopdownDeltaChunkList()
533552
for dsi, ds in enumerate(dstreams):
534553
# print "Stream", dsi
535554
db = ds.read()
@@ -593,18 +612,16 @@ def connect_deltas(dstreams):
593612

594613
# merge the lists !
595614
if bdcl is not None:
596-
dcl.connect_with(bdcl)
615+
if not tdcl.connect_with_next_base(dcl):
616+
break
597617
# END handle merge
598618

599-
# dcl.check_integrity()
600-
601619
# prepare next base
602620
bdcl = dcl
603621
dcl = DeltaChunkList()
604622
# END for each delta stream
605623

606-
return bdcl
607-
624+
return tdcl
608625

609626
def apply_delta_data(src_buf, src_buf_size, delta_buf, delta_buf_size, write):
610627
"""

stream.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -328,17 +328,16 @@ def __init__(self, stream_list):
328328
def _set_cache_(self, attr):
329329
# the direct algorithm is fastest and most direct if there is only one
330330
# delta. Also, the extra overhead might not be worth it for items smaller
331-
# than X - definitely the case in python
332-
# hence we apply a worst-case scenario here
333-
# TODO: read the final size from the deltastream - have to partly unpack
334-
# if len(self._dstreams) * self._size < self.k_max_memory_move:
331+
# than X - definitely the case in python, every function call costs
332+
# huge amounts of time
333+
# if len(self._dstreams) * self._bstream.size < self.k_max_memory_move:
335334
if len(self._dstreams) == 1:
336335
return self._set_cache_brute_(attr)
337336

338337
# Aggregate all deltas into one delta in reverse order. Hence we take
339338
# the last delta, and reverse-merge its ancestor delta, until we receive
340339
# the final delta data stream.
341-
dcl = connect_deltas(reversed(self._dstreams))
340+
dcl = connect_deltas(self._dstreams)
342341

343342
if len(dcl) == 0:
344343
self._size = 0

test/test_pack.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def test_pack(self):
130130
self._assert_pack_file(pack, version, size)
131131
# END for each pack to test
132132

133-
def _test_pack_entity(self):
133+
def test_pack_entity(self):
134134
for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
135135
(self.packfile_v2_2, self.packindexfile_v2),
136136
(self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):

0 commit comments

Comments
 (0)