1010decompressobj = zlib .decompressobj
1111
1212import mmap
13+ from itertools import islice , izip
1314
1415# INVARIANTS
1516OFS_DELTA = 6
@@ -93,7 +94,10 @@ def __init__(self, to, ts, so, data):
9394 self .ts = ts
9495 self .so = so
9596 self .data = data
96-
97+
def __repr__(self):
    """:return: string written like the constructor call ``DeltaChunk(to, ts, so, data)``
        using the current attribute values of this instance"""
    # %r quotes and escapes the payload, so binary data cannot garble the output,
    # and a chunk without payload shows up as None instead of an empty field
    return "DeltaChunk(%i, %i, %r, %r)" % (self.to, self.ts, self.so, self.data)
100+
97101 #{ Interface
98102
99103 def rbound (self ):
@@ -105,6 +109,7 @@ def apply(self, source, write):
105109 :param write: write method to call with data to write"""
106110 if self .data is None :
107111 # COPY DATA FROM SOURCE
112+ assert len (source ) - self .so - self .ts > 0
108113 write (buffer (source , self .so , self .ts ))
109114 else :
110115 # APPEND DATA
@@ -121,14 +126,16 @@ def apply(self, source, write):
def _closest_index(dcl, absofs):
    """Find the chunk whose start offset is closest to absofs without exceeding it.

    :param dcl: sequence of items with a ``to`` attribute; the linear scan stops
        at the first item with ``to`` greater than absofs, so the items are assumed
        to be ordered by ascending ``to``
    :param absofs: offset to look up, compared against each item's ``to``
    :return: index of the last item whose ``to`` is smaller than or equal to absofs.
        If absofs is smaller than the ``to`` of the first item, 0 is returned.
        For an empty dcl, -1 is returned.
    :note: global method for performance only, it belongs to DeltaChunkList"""
    # TODO: binary search !!
    for i, d in enumerate(dcl):
        if absofs < d.to:
            # the previous item is the last one starting at or before absofs.
            # Clamp to 0: a plain i - 1 yields -1 for i == 0, which would silently
            # address the last item of the list instead of the first one
            return max(i - 1, 0)
        elif absofs == d.to:
            return i
        # END handle offset comparison
    # END for each delta absofs
    # every item starts before absofs, hence the last one is the closest
    return len(dcl) - 1
132139
133140def _split_delta (dcl , d , di , relofs , insert_offset = 0 ):
134141 """Split the delta at di into two deltas, adjusting their sizes, offsets and data
@@ -150,7 +157,7 @@ def _split_delta(dcl, d, di, relofs, insert_offset=0):
150157
151158 nd = DeltaChunk ( drb ,
152159 osize ,
153- ( d .so and d . so + osize ) or None ,
160+ d .so + osize ,
154161 (d .data and d .data [osize :]) or None )
155162
156163 self .insert (di + 1 + insert_offset , nd )
@@ -178,64 +185,70 @@ def _handle_merge(ld, rd):
178185 # END combine or insert data
179186 # END handle chunk mode
180187
181- def _merge_delta (dcl , d ):
188+ def _merge_delta (dcl , dc ):
182189 """Merge the given DeltaChunk instance into the dcl
183190 :param dc: the DeltaChunk to merge"""
184- cdi = _closest_index (dcl , d .to ) # current delta index
191+ if len (dcl ) == 0 :
192+ dcl .append (dc )
193+ return
194+ # END early return on empty list
195+
196+ cdi = _closest_index (dcl , dc .to ) # current delta index
185197 cd = dcl [cdi ] # current delta
186198
187199 # either we go at his spot, or after
188200 # cdi either moves one up, or stays
189- dcl .insert (di + (d .to > cd .to ), d )
190- cdi += d .to == cd .to
201+ #print "insert at %i" % (cdi + (dc.to > cd.to))
202+ #print cd, dc
203+ dcl .insert (cdi + (dc .to > cd .to ), dc )
204+ cdi += dc .to == cd .to
191205
192206 while True :
193207 # are we larger than the current block
194- if d .to < cd .to :
195- if d .rbound () >= cd .rbound ():
208+ if dc .to < cd .to :
209+ if dc .rbound () >= cd .rbound ():
196210 # xxx|xxx|x
197211 # remove the current item completely
198212 dcl .pop (cdi )
199213 cdi -= 1
200- elif d .rbound () > cd .to :
214+ elif dc .rbound () > cd .to :
201215 # MOVE ITS LBOUND
202216 # xxx|x--|
203- _move_delta_lbound (cd , d .rbound () - cd .to )
217+ _move_delta_lbound (cd , dc .rbound () - cd .to )
204218 break
205219 else :
206220 # WE DON'T OVERLAP IT
207- # this can possibly happen
208- assert False , "Wow, this can really happen"
221+ # this can actually happen, once multiple streams are merged
209222 break
210223 # END rbound overlap handling
211224 # END lbound overlap handling
212225 else :
213- if d .to >= cd .rbound ():
226+ if dc .to >= cd .rbound ():
214227 #|---|...xx
215228 break
216229 # END
217230
218- if d .rbound () >= cd .rbound ():
219- if d .to == cd .to :
231+ if dc .rbound () >= cd .rbound ():
232+ if dc .to == cd .to :
220233 #|xxx|x
221234 # REMOVE CD
222235 dcl .pop (cdi )
223236 cdi -= 1
224237 else :
225238 # TRUNCATE CD
226239 #|-xx|
227- _set_delta_rbound (cd , d .to - cd .to )
240+ _set_delta_rbound (cd , dc .to - cd .to )
228241 # END handle offset special case
229- elif d .to == cd .to :
242+ elif dc .to == cd .to :
230243 #|x--|
231244 # we shift it by our size
232- _move_delta_lbound (cd , d .ts )
245+ _move_delta_lbound (cd , dc .ts )
233246 else :
234247 #|-x-|
235248 # SPLIT CD AND LBOUND MOVE ITS SECOND PART
236249 # insert offset is required to insert it after us
237250 nd = _split_delta (dcl , cd , cdi , 1 )
238- _move_delta_lbound (nd , d .ts )
251+ _move_delta_lbound (nd , dc .ts )
239252 break
240253 # END handle rbound overlap
241254 # END handle overlap
@@ -248,30 +261,14 @@ def _merge_delta(dcl, d):
248261 # END check for end of list
249262 # while our chunk is not completely done
250263
251-
264+ ## DEBUG ##
265+ dcl .check_integrity ()
252266
253267
254268
255269class DeltaChunkList (list ):
256270 """List with special functionality to deal with DeltaChunks"""
257271
258- def init (self , size ):
259- """Intialize this instance with chunks defining to fill up size from a base
260- buffer of equal size"""
261- if len (self ) != 0 :
262- return
263- # pretend we have one huge delta chunk, which just copies everything
264- # from source to destination
265- maxint32 = 2 ** 32
266- for x in range (0 , size , maxint32 ):
267- self .append (DeltaChunk (x , maxint32 , x , None ))
268- # END create copy chunks
269- offset = x * maxint32
270- remainder = size - offset
271- if remainder :
272- self .append (DeltaChunk (offset , remainder , offset , None ))
273- # END handle all done in loop
274-
275272 def terminate_at (self , size ):
276273 """Chops the list at the given size, splitting and removing DeltaNodes
277274 as required"""
@@ -283,6 +280,38 @@ def terminate_at(self, size):
283280 # END truncate last node if possible
284281 del (self [di + (rsize != 0 ):])
285282
283+ ## DEBUG ##
284+ self .check_integrity (size )
285+
def check_integrity(self, target_size=-1):
    """Verify that neighbouring chunks line up exactly, that each data chunk holds
    enough data, and optionally that the chunks add up to target_size.

    :param target_size: if not -1, both the sum of all chunk sizes and the right
        bound of the last chunk must equal target_size. An empty list only
        satisfies a target_size of 0
    :raise AssertionError: if one of the checks fails. It is raised explicitly
        instead of through the assert statement, hence the checks remain active
        if python is started with -O"""
    if target_size > -1:
        total_size = sum(dc.ts for dc in self)
        if total_size != target_size:
            raise AssertionError("chunk sizes add up to %i, expected %i" % (total_size, target_size))
        # END verify sum of sizes

        # an empty list has no last chunk to look at - its size was verified above
        if self and self[-1].rbound() != target_size:
            raise AssertionError("last chunk ends at %i, expected %i" % (self[-1].rbound(), target_size))
        # END verify right bound of last chunk
    # END target size verification

    prev = None
    for dc in self:
        # chunks carrying data need at least ts bytes of it. This is verified
        # for lists with a single chunk as well
        if dc.data and len(dc.data) < dc.ts:
            raise AssertionError("chunk at %i has %i bytes of data, needs %i" % (dc.to, len(dc.data), dc.ts))
        # END check data

        # each chunk has to start exactly where its predecessor ends
        if prev is not None:
            if prev.rbound() != dc.to or prev.to + prev.ts != dc.to:
                raise AssertionError("chunk at %i does not end where chunk at %i starts" % (prev.to, dc.to))
            # END verify boundaries
        # END handle predecessor
        prev = dc
    # END for each dc
314+
286315#} END structures
287316
288317#{ Routines
@@ -422,18 +451,15 @@ def merge_deltas(dcl, dstreams):
422451 :param dstreams: iterable of delta stream objects. They must be ordered latest last,
423452 hence the delta to be applied last comes last, its oldest ancestor first
424453 :return: None"""
425- for ds in dstreams :
454+ for dsi , ds in enumerate (dstreams ):
455+ # print "Stream", dsi
426456 db = ds .read ()
427457 delta_buf_size = ds .size
428458
429459 # read header
430460 i , src_size = msb_size (db )
431461 i , target_size = msb_size (db , i )
432462
433- if len (dcl ) == 0 :
434- dcl .init (target_size )
435- # END handle empty list
436-
437463 # interpret opcodes
438464 tbw = 0 # amount of target bytes written
439465 while i < delta_buf_size :
@@ -475,7 +501,7 @@ def merge_deltas(dcl, dstreams):
475501 tbw += cp_size
476502 elif c :
477503 # TODO: Concatenate multiple deltachunks
478- _merge_delta (dcl , DeltaChunk (tbw , c , None , db [i :i + c ]))
504+ _merge_delta (dcl , DeltaChunk (tbw , c , 0 , db [i :i + c ]))
479505 i += c
480506 tbw += c
481507 else :
@@ -487,6 +513,8 @@ def merge_deltas(dcl, dstreams):
487513
488514 # END for each delta stream
489515
516+ # print dcl
517+
490518
491519def apply_delta_data (src_buf , src_buf_size , delta_buf , delta_buf_size , write ):
492520 """
0 commit comments