Skip to content

Commit 274c5c8

Browse files
committed
Added frame for actual implementation of the aggregation - for now we just implement a forward-merge of the chunks, then the reverse version once there is some more knowledge on how this works
1 parent 425ecf0 commit 274c5c8

File tree

4 files changed

+109
-6
lines changed

4 files changed

+109
-6
lines changed

ext/async

Submodule async updated from 2842d1e to 5992bb6

fun.py

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,48 @@
4141

4242
__all__ = ('is_loose_object', 'loose_object_header_info', 'msb_size', 'pack_object_header_info',
4343
'write_object', 'loose_object_header', 'stream_copy', 'apply_delta_data',
44-
'is_equal_canonical_sha' )
44+
'is_equal_canonical_sha', 'apply_delta_chunks', 'reverse_merge_deltas',
45+
'merge_deltas')
46+
47+
48+
#{ Structures
49+
50+
class DeltaChunk(object):
51+
"""Represents a piece of a delta, it can either add new data, or copy existing
52+
one from a source buffer"""
53+
__slots__ = (
54+
'to', # start offset in the target buffer in bytes
55+
'ts', # size of this chunk in the target buffer in bytes
56+
'so', # start offset in the source buffer in bytes or None
57+
'data' # chunk of bytes to be added to the target buffer or None
58+
)
59+
60+
def __init__(self, to, ts, so, data):
61+
self.to = to
62+
self.ts = ts
63+
self.so = so
64+
self.data = data
65+
66+
#{ Interface
67+
68+
def abssize(self):
69+
return self.to + self.ts
70+
71+
def apply(self, source, target):
72+
"""Apply own data to the target buffer
73+
:param source: buffer providing source bytes for copy operations
74+
:param target: target buffer large enough to contain all the changes to be applied"""
75+
if self.data is not None:
76+
# APPEND DATA
77+
pass
78+
else:
79+
# COPY DATA FROM SOURCE
80+
pass
81+
# END handle chunk mode
82+
83+
#} END interface
84+
85+
#} END structures
4586

4687
#{ Routines
4788

@@ -164,10 +205,46 @@ def stream_copy(read, write, size, chunk_size):
164205
return dbw
165206

166207

208+
def reverse_merge_deltas(dcl, dstreams):
209+
"""Read the condensed delta chunk information from dstream and merge its information
210+
into a list of existing delta chunks
211+
:param dcl: list of DeltaChunk objects, may be empty initially, and will be changed
212+
during the merge process
213+
:param dstreams: iterable of delta stream objects. They must be ordered latest first,
214+
hence the delta to be applied last comes first, then its ancestors
215+
:return: None"""
216+
raise NotImplementedError("This is left out up until we actually iterate the dstreams - they are prefetched right now")
217+
218+
def merge_deltas(dcl, dstreams):
219+
"""Read the condensed delta chunk information from dstream and merge its information
220+
into a list of existing delta chunks
221+
:param dcl: list of DeltaChunk objects, may be empty initially, and will be changed
222+
during the merge process
223+
:param dstreams: iterable of delta stream objects. They must be ordered latest last,
224+
hence the delta to be applied last comes last, its oldest ancestor first
225+
:return: None"""
226+
for ds in dstreams:
227+
buf = ds.read()
228+
i, src_size = msb_size(buf)
229+
i, target_size = msb_size(buf, i)
230+
231+
# parse the commands
232+
233+
# END for each delta stream
234+
235+
def apply_delta_chunks(src_buf, src_buf_size, dcl, target):
236+
"""
237+
Apply data from a delta chunk list and a source buffer to the target stream
238+
239+
:param src_buf: random access data from which the delta was created
240+
:param src_buf_size: size of the source buffer in bytes
241+
:param delta_buf_size: size fo the delta buffer in bytes
242+
:param target: ostream with a write method"""
243+
244+
167245
def apply_delta_data(src_buf, src_buf_size, delta_buf, delta_buf_size, target_file):
168246
"""
169-
Apply data from a delta buffer using a source buffer to the target file,
170-
which will be written to
247+
Apply data from a delta buffer using a source buffer to the target file
171248
172249
:param src_buf: random access data from which the delta was created
173250
:param src_buf_size: size of the source buffer in bytes

stream.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
from fun import (
88
msb_size,
99
stream_copy,
10-
apply_delta_data,
10+
apply_delta_data,
11+
apply_delta_chunks,
12+
merge_deltas,
1113
delta_types
1214
)
1315

@@ -320,9 +322,29 @@ def __init__(self, stream_list):
320322
self._br = 0
321323

322324
def _set_cache_(self, attr):
325+
# Aggregate all deltas into one delta in reverse order. Hence we take
326+
# the last delta, and reverse-merge its ancestor delta, until we receive
327+
# the final delta data stream.
328+
dcl = list()
329+
reverse_merge_deltas(dcl, self._dstreams)
330+
331+
if len(dcl) == 0:
332+
self._size = 0
333+
self._mm_target = allocate_memory(0)
334+
return
335+
# END handle empty list
336+
337+
self._size = dcl[-1].abssize()
338+
self._mm_target = allocate_memory(self._size)
339+
340+
bbuf = allocate_memory(self._bstream.size)
341+
stream_copy(self._bstream.read, bbuf.write, base_size, 256 * mmap.PAGESIZE)
342+
343+
apply_delta_chunks(bbuf, self._bstream.size, dcl, self._mm_target)
344+
345+
def _set_cache_old(self, attr):
323346
"""If we are here, we apply the actual deltas"""
324347

325-
# prefetch information
326348
buffer_info_list = list()
327349
max_target_size = 0
328350
for dstream in self._dstreams:

util.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ def make_sha(source=''):
117117

118118
def allocate_memory(size):
119119
""":return: a file-protocol accessible memory block of the given size"""
120+
if size == 0:
121+
return _RandomAccessStringIO('')
122+
# END handle empty chunks gracefully
123+
120124
try:
121125
return mmap.mmap(-1, size) # read-write by default
122126
except EnvironmentError:

0 commit comments

Comments
 (0)