Skip to content

Commit 9b0773b

Browse files
committed
Removed the previous non-reverse delta-application functionality. Although it was slightly faster, the new version's only remaining bottleneck is slicing, which currently consumes a ridiculous amount of time — so making slicing faster is all that is needed.
1 parent 0381cae commit 9b0773b

File tree

2 files changed

+32
-111
lines changed

2 files changed

+32
-111
lines changed

fun.py

Lines changed: 28 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ def _set_delta_rbound(d, size):
6262

6363
# NOTE: data is truncated automatically when applying the delta
6464
# MUST NOT DO THIS HERE
65-
6665
return d
6766

6867
def _move_delta_lbound(d, bytes):
@@ -76,14 +75,14 @@ def _move_delta_lbound(d, bytes):
7675
d.to += bytes
7776
d.so += bytes
7877
d.ts -= bytes
79-
if d.has_data():
78+
if d.data is not None:
8079
d.data = d.data[bytes:]
8180
# END handle data
8281

8382
return d
8483

8584
def delta_duplicate(src):
86-
return DeltaChunk(src.to, src.ts, src.so, src.data, src.flags)
85+
return DeltaChunk(src.to, src.ts, src.so, src.data)
8786

8887
def delta_chunk_apply(dc, bbuf, write):
8988
"""Apply own data to the target buffer
@@ -92,8 +91,6 @@ def delta_chunk_apply(dc, bbuf, write):
9291
if dc.data is None:
9392
# COPY DATA FROM SOURCE
9493
write(buffer(bbuf, dc.so, dc.ts))
95-
elif isinstance(dc.data, DeltaChunkList):
96-
delta_list_apply(dc.data, bbuf, write, dc.so, dc.ts)
9794
else:
9895
# APPEND DATA
9996
# whats faster: if + 4 function calls or just a write with a slice ?
@@ -105,6 +102,7 @@ def delta_chunk_apply(dc, bbuf, write):
105102
# END handle truncation
106103
# END handle chunk mode
107104

105+
108106
class DeltaChunk(object):
109107
"""Represents a piece of a delta, it can either add new data, or copy existing
110108
one from a source buffer"""
@@ -114,51 +112,25 @@ class DeltaChunk(object):
114112
'so', # start offset in the source buffer in bytes or None
115113
'data', # chunk of bytes to be added to the target buffer,
116114
# DeltaChunkList to use as base, or None
117-
'flags' # currently only True or False
118115
)
119116

120-
def __init__(self, to, ts, so, data, flags):
117+
def __init__(self, to, ts, so, data):
121118
self.to = to
122119
self.ts = ts
123120
self.so = so
124121
self.data = data
125-
self.flags = flags
126122

127123
def __repr__(self):
128-
return "DeltaChunk(%i, %i, %s, %s, %i)" % (self.to, self.ts, self.so, self.data or "", self.flags)
124+
return "DeltaChunk(%i, %i, %s, %s)" % (self.to, self.ts, self.so, self.data or "")
129125

130126
#{ Interface
131127

132-
def copy_offset(self):
133-
""":return: offset to apply when copying from a base buffer, or 0
134-
if this is not a copying delta chunk"""
135-
136-
if self.data is not None:
137-
if isinstance(self.data, DeltaChunkList):
138-
return self.data.lbound() + self.so
139-
else:
140-
return self.so
141-
# END handle data type
142-
return 0
143-
144128
def rbound(self):
145129
return self.to + self.ts
146130

147131
def has_data(self):
148132
""":return: True if the instance has data to add to the target stream"""
149-
return self.data is not None and not isinstance(self.data, DeltaChunkList)
150-
151-
def has_copy_chunklist(self):
152-
""":return: True if we copy our data from a chunklist"""
153-
return self.data is not None and isinstance(self.data, DeltaChunkList)
154-
155-
def set_copy_chunklist(self, dcl):
156-
"""Set the deltachunk list to be used as basis for copying.
157-
:note: only works if this chunk is a copy delta chunk"""
158-
self.data = dcl
159-
self.so = 0 # allows lbound moves to be virtual
160-
161-
133+
return self.data is not None
162134

163135
#} END interface
164136

@@ -239,21 +211,20 @@ def delta_list_slice(dcl, absofs, size):
239211
""":return: Subsection of this list at the given absolute offset, with the given
240212
size in bytes.
241213
:return: DeltaChunkList (copy) which represents the given chunk"""
242-
if len(dcl) == 0:
243-
return DeltaChunkList()
244-
245-
absofs = max(absofs, dcl.lbound())
246-
size = min(dcl.rbound() - dcl.lbound(), size)
214+
dcllbound = dcl.lbound()
215+
absofs = max(absofs, dcllbound)
216+
size = min(dcl.rbound() - dcllbound, size)
247217
cdi = _closest_index(dcl, absofs) # delta start index
248218
cd = dcl[cdi]
249219
slen = len(dcl)
250-
ndcl = dcl.__class__()
220+
ndcl = DeltaChunkList()
221+
lappend = ndcl.append
251222

252223
if cd.to != absofs:
253224
tcd = delta_duplicate(cd)
254225
_move_delta_lbound(tcd, absofs - cd.to)
255226
_set_delta_rbound(tcd, min(tcd.ts, size))
256-
ndcl.append(tcd)
227+
lappend(tcd)
257228
size -= tcd.ts
258229
cdi += 1
259230
# END lbound overlap handling
@@ -262,12 +233,12 @@ def delta_list_slice(dcl, absofs, size):
262233
# are we larger than the current block
263234
cd = dcl[cdi]
264235
if cd.ts <= size:
265-
ndcl.append(delta_duplicate(cd))
236+
lappend(delta_duplicate(cd))
266237
size -= cd.ts
267238
else:
268239
tcd = delta_duplicate(cd)
269240
_set_delta_rbound(tcd, size)
270-
ndcl.append(tcd)
241+
lappend(tcd)
271242
size -= tcd.ts
272243
break
273244
# END hadle size
@@ -301,19 +272,6 @@ def size(self):
301272
""":return: size of bytes as measured by our delta chunks"""
302273
return self.rbound() - self.lbound()
303274

304-
def connect_with(self, bdcl):
305-
"""Connect this instance's delta chunks virtually with the given base.
306-
This means that all copy deltas will simply apply to the given region
307-
of the given base. Afterwards, the base is optimized so that add-deltas
308-
will be truncated to the region actually used, or removed completely where
309-
adequate. This way, memory usage is reduced.
310-
:param bdcl: DeltaChunkList to serve as base"""
311-
for dc in self:
312-
if not dc.has_data():
313-
dc.set_copy_chunklist(delta_list_slice(bdcl, dc.so, dc.ts))
314-
# END handle overlap
315-
# END for each dc
316-
317275
def apply(self, bbuf, write, lbound_offset=0, size=0):
318276
"""Only used by public clients, internally we only use the global routines
319277
for performance"""
@@ -333,7 +291,7 @@ def compress(self):
333291
while i < slen:
334292
dc = self[i]
335293
i += 1
336-
if not dc.has_data():
294+
if dc.data is None:
337295
if first_data_index is not None and i-2-first_data_index > 1:
338296
#if first_data_index is not None:
339297
nd = StringIO() # new data
@@ -345,7 +303,7 @@ def compress(self):
345303

346304
del(self[first_data_index:i-1])
347305
buf = nd.getvalue()
348-
self.insert(first_data_index, DeltaChunk(so, len(buf), 0, buf, False))
306+
self.insert(first_data_index, DeltaChunk(so, len(buf), 0, buf))
349307

350308
slen = len(self)
351309
i = first_data_index + 1
@@ -381,8 +339,6 @@ def check_integrity(self, target_size=-1):
381339
assert dc.ts > 0
382340
if dc.has_data():
383341
assert len(dc.data) >= dc.ts
384-
if dc.has_copy_chunklist():
385-
assert dc.ts <= dc.data.size()
386342
# END for each dc
387343

388344
left = islice(self, 0, len(self)-1)
@@ -417,16 +373,8 @@ def connect_with_next_base(self, bdcl):
417373
dc = self[dci]
418374
dci += 1
419375

420-
if dc.flags:
421-
nfc += 1
422-
continue
423-
# END skip frozen chunks
424-
425-
# all data chunks must be frozen, we are topmost already
426-
# (Also if its a copy operation onto the lowest base, but we cannot
427-
# determine that without the number of deltas to come)
428-
if dc.has_data():
429-
dc.flags = True
376+
# all add-chunks which are already topmost don't need additional processing
377+
if dc.data is not None:
430378
nfc += 1
431379
continue
432380
# END skip add chunks
@@ -448,7 +396,6 @@ def connect_with_next_base(self, bdcl):
448396
if len(ccl) == 1:
449397
self[dci-1] = ccl[0]
450398
else:
451-
452399
# maybe try to compute the expenses here, and pick the right algorithm
453400
# It would normally be faster than copying everything physically though
454401
# TODO: Use a deque here, and decide by the index whether to extend
@@ -592,33 +539,16 @@ def stream_copy(read, write, size, chunk_size):
592539
# END duplicate data
593540
return dbw
594541

595-
def reverse_connect_deltas(dcl, dstreams):
542+
def connect_deltas(dstreams):
596543
"""Read the condensed delta chunk information from dstream and merge its information
597544
into a list of existing delta chunks
598-
:param dcl: see 3
599-
:param dstreams: iterable of delta stream objects. They must be ordered latest first,
600-
hence the delta to be applied last comes first, then its ancestors
601-
:return: None"""
602-
raise NotImplementedError("This is left out up until we actually iterate the dstreams - they are prefetched right now")
603-
604-
def connect_deltas(dstreams, reverse):
605-
"""Read the condensed delta chunk information from dstream and merge its information
606-
into a list of existing delta chunks
607-
:param dstreams: iterable of delta stream objects. They must be ordered latest last,
608-
hence the delta to be applied last comes last, its oldest ancestor first
609-
:param reverse: If False, the given iterable of delta-streams returns
610-
items in from latest ancestor to the last delta.
611-
If True, deltas are ordered so that the one to be applied last comes first.
545+
:param dstreams: iterable of delta stream objects, the delta to be applied last
546+
comes first, then all its ancestors in order
612547
:return: DeltaChunkList, containing all operations to apply"""
613548
bdcl = None # data chunk list for initial base
614-
tdcl = None # topmost dcl, only effective if reverse is True
615-
616-
if reverse:
617-
dcl = tdcl = TopdownDeltaChunkList()
618-
else:
619-
dcl = DeltaChunkList()
620-
# END handle type of first chunk list
549+
tdcl = None # topmost dcl
621550

551+
dcl = tdcl = TopdownDeltaChunkList()
622552
for dsi, ds in enumerate(dstreams):
623553
# print "Stream", dsi
624554
db = ds.read()
@@ -665,12 +595,12 @@ def connect_deltas(dstreams, reverse):
665595
rbound > base_size):
666596
break
667597

668-
dcl.append(DeltaChunk(tbw, cp_size, cp_off, None, False))
598+
dcl.append(DeltaChunk(tbw, cp_size, cp_off, None))
669599
tbw += cp_size
670600
elif c:
671601
# NOTE: in C, the data chunks should probably be concatenated here.
672602
# In python, we do it as a post-process
673-
dcl.append(DeltaChunk(tbw, c, 0, db[i:i+c], False))
603+
dcl.append(DeltaChunk(tbw, c, 0, db[i:i+c]))
674604
i += c
675605
tbw += c
676606
else:
@@ -682,24 +612,16 @@ def connect_deltas(dstreams, reverse):
682612

683613
# merge the lists !
684614
if bdcl is not None:
685-
if tdcl:
686-
if not tdcl.connect_with_next_base(dcl):
687-
break
688-
# END early abort
689-
else:
690-
dcl.connect_with(bdcl)
615+
if not tdcl.connect_with_next_base(dcl):
616+
break
691617
# END handle merge
692618

693619
# prepare next base
694620
bdcl = dcl
695621
dcl = DeltaChunkList()
696622
# END for each delta stream
697623

698-
if tdcl:
699-
return tdcl
700-
else:
701-
return bdcl
702-
624+
return tdcl
703625

704626
def apply_delta_data(src_buf, src_buf_size, delta_buf, delta_buf_size, write):
705627
"""

stream.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -328,17 +328,16 @@ def __init__(self, stream_list):
328328
def _set_cache_(self, attr):
329329
# the direct algorithm is fastest and most direct if there is only one
330330
# delta. Also, the extra overhead might not be worth it for items smaller
331-
# than X - definitely the case in python
332-
# hence we apply a worst-case scenario here
333-
# TODO: read the final size from the deltastream - have to partly unpack
334-
# if len(self._dstreams) * self._size < self.k_max_memory_move:
331+
# than X - definitely the case in python, every function call costs
332+
# huge amounts of time
333+
# if len(self._dstreams) * self._bstream.size < self.k_max_memory_move:
335334
if len(self._dstreams) == 1:
336335
return self._set_cache_brute_(attr)
337336

338337
# Aggregate all deltas into one delta in reverse order. Hence we take
339338
# the last delta, and reverse-merge its ancestor delta, until we receive
340339
# the final delta data stream.
341-
dcl = connect_deltas(self._dstreams, reverse=True)
340+
dcl = connect_deltas(self._dstreams)
342341

343342
if len(dcl) == 0:
344343
self._size = 0

0 commit comments

Comments
 (0)