Commit ff3afac

Introduce UploadChangesHandler

1 parent dd03a38 commit ff3afac

File tree: 3 files changed, +142 -131 lines

mergin/cli.py

Lines changed: 4 additions & 3 deletions
@@ -412,8 +412,8 @@ def push(ctx):
         return
     directory = os.getcwd()
     try:
-        blocking_job, non_blocking_job = push_project_async(mc, directory)
-        for job in [blocking_job, non_blocking_job]:
+        jobs = push_project_async(mc, directory)
+        for job in jobs:
             if job is not None:  # if job is none, we don't upload any files, and the transaction is finished already
                 with click.progressbar(length=job.total_size) as bar:
                     last_transferred_size = 0
@@ -431,7 +431,8 @@ def push(ctx):
             return
     except KeyboardInterrupt:
         click.secho("Cancelling...")
-        push_project_cancel(job)
+        for job in jobs:
+            push_project_cancel(job)
     except Exception as e:
         _print_unhandled_exception()
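
For downstream callers, a minimal usage sketch of the new return type: push_project_async() now yields a list of UploadJob objects (one per change group) rather than a fixed (blocking, non_blocking) pair. The server URL, credentials, and project directory below are placeholders; push_project_wait() and push_project_finalize() are the existing helpers in client_push.py.

# Sketch only: drive a multi-job push from library code.
from mergin import MerginClient
from mergin.client_push import push_project_async, push_project_wait, push_project_finalize

mc = MerginClient("https://app.merginmaps.com", login="user", password="secret")  # placeholder credentials
jobs = push_project_async(mc, "/path/to/project") or []  # guard against a None return when there is nothing to push
for job in jobs:
    if job is None:
        continue  # no files to upload in this group; the transaction is already finished
    push_project_wait(job)      # block until the background chunk uploads complete
    push_project_finalize(job)  # confirm the transaction with the server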

mergin/client_push.py

Lines changed: 136 additions & 126 deletions
@@ -15,6 +15,7 @@
 import tempfile
 import concurrent.futures
 import os
+from typing import Dict, List
 
 from .common import UPLOAD_CHUNK_SIZE, ClientError
 from .merginproject import MerginProject
@@ -83,40 +84,63 @@ def upload_blocking(self, mc, mp):
         raise ClientError("Mismatch between uploaded file chunk {} and local one".format(self.chunk_id))
 
 
-class UploadChanges:
-    def __init__(self, added=None, updated=None, removed=None):
-        self.added = added or []
-        self.updated = updated or []
-        self.removed = removed or []
-        self.renamed = []
-
-    def is_empty(self):
-        return not (self.added or self.updated or self.removed or self.renamed)
-
-    def split(self):
-        blocking = UploadChanges()
-        non_blocking = UploadChanges()
-
-        for file in self.added:
-            target = blocking if is_qgis_file(file["path"]) or is_versioned_file(file["path"]) else non_blocking
-            target.added.append(file)
-
-        for file in self.updated:
-            blocking.updated.append(file)
-
-        for file in self.removed:
-            blocking.removed.append(file)
+class UploadChangesHandler:
+    """
+    Handles preparation of file changes to be uploaded to the server.
+
+    This class is responsible for:
+    - Filtering project file changes.
+    - Splitting changes into blocking and non-blocking groups.
+    - TODO: Applying limits such as max file count or size to break large uploads into smaller batches.
+    - Generating upload-ready change groups for asynchronous job creation.
+    """
+
+    def __init__(self, mp, client, project_info):
+        self.mp = mp
+        self.client = client
+        self.project_info = project_info
+        self._raw_changes = mp.get_push_changes()
+        self._filtered_changes = filter_changes(client, project_info, self._raw_changes)
+
+    @staticmethod
+    def is_blocking_file(file):
+        return is_qgis_file(file["path"]) or is_versioned_file(file["path"])
+
+    def split_by_type(self) -> List[Dict[str, List[dict]]]:
+        """
+        Split raw filtered changes into two batches:
+        1. Blocking: updated/removed and added files that are blocking
+        2. Non-blocking: added files that are not blocking
+
+        Returns a list of dicts each with keys:
+        - added, updated, removed, blocking
+        """
+        blocking_group = {"added": [], "updated": [], "removed": [], "blocking": True}
+        non_blocking_group = {"added": [], "updated": [], "removed": [], "blocking": False}
+
+        for f in self._filtered_changes.get("added", []):
+            if self.is_blocking_file(f):
+                blocking_group["added"].append(f)
+            else:
+                non_blocking_group["added"].append(f)
+
+        for f in self._filtered_changes.get("updated", []):
+            blocking_group["updated"].append(f)
+
+        for f in self._filtered_changes.get("removed", []):
+            blocking_group["removed"].append(f)
+
+        result = []
+        if any(blocking_group[k] for k in ("added", "updated", "removed")):
+            result.append(blocking_group)
+        if any(non_blocking_group["added"]):
+            result.append(non_blocking_group)
 
-        result = {}
-        if not blocking.is_empty():
-            result["blocking"] = blocking
-        if not non_blocking.is_empty():
-            result["non_blocking"] = non_blocking
         return result
 
 
-def push_project_async(mc, directory):
-    """Starts push of a project and returns pending upload job"""
+def push_project_async(mc, directory) -> List[UploadJob]:
+    """Starts push of a project and returns pending upload jobs"""
 
     mp = MerginProject(directory)
     if mp.has_unfinished_pull():
@@ -153,111 +177,97 @@ def push_project_async(mc, directory):
             + f"\n\nLocal version: {local_version}\nServer version: {server_version}"
         )
 
-    changes = mp.get_push_changes()
-    changes = filter_changes(mc, project_info, changes)
+    changes_handler = UploadChangesHandler(mp, mc, project_info)
+    changes_groups = changes_handler.split_by_type()
 
-    blocking_changes, non_blocking_changes = changes.split()
+    jobs = []
+    for changes in changes_groups:
+        mp.log.debug("push changes:\n" + pprint.pformat(changes))
 
-    blocking_job = (
-        _prepare_upload_job(mp, mc, project_path, local_version, blocking_changes)
-        if any(len(v) for v in blocking_changes.values())
-        else None
-    )
-    non_blocking_job = (
-        _prepare_upload_job(mp, mc, project_path, local_version, non_blocking_changes)
-        if any(len(v) for v in non_blocking_changes.values())
-        else None
-    )
+        tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")
 
-    return blocking_job, non_blocking_job
+        # If there are any versioned files (aka .gpkg) that are not updated through a diff,
+        # we need to make a temporary copy somewhere to be sure that we are uploading full content.
+        # That's because if there are pending transactions, checkpointing or switching from WAL mode
+        # won't work, and we would end up with some changes left in -wal file which do not get
+        # uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
+        for f in changes["updated"]:
+            if mp.is_versioned_file(f["path"]) and "diff" not in f:
+                mp.copy_versioned_file_for_upload(f, tmp_dir.name)
 
+        for f in changes["added"]:
+            if mp.is_versioned_file(f["path"]):
+                mp.copy_versioned_file_for_upload(f, tmp_dir.name)
 
-def _prepare_upload_job(mp, mc, project_path, local_version, changes):
-    mp.log.debug("push changes:\n" + pprint.pformat(changes))
-
-    tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")
-
-    # If there are any versioned files (aka .gpkg) that are not updated through a diff,
-    # we need to make a temporary copy somewhere to be sure that we are uploading full content.
-    # That's because if there are pending transactions, checkpointing or switching from WAL mode
-    # won't work, and we would end up with some changes left in -wal file which do not get
-    # uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
-    for f in changes["updated"]:
-        if mp.is_versioned_file(f["path"]) and "diff" not in f:
-            mp.copy_versioned_file_for_upload(f, tmp_dir.name)
-
-    for f in changes["added"]:
-        if mp.is_versioned_file(f["path"]):
-            mp.copy_versioned_file_for_upload(f, tmp_dir.name)
-
-    if not sum(len(v) for v in changes.values()):
-        mp.log.info(f"--- push {project_path} - nothing to do")
-        return
+        if not sum(len(v) for v in changes.values()):
+            mp.log.info(f"--- push {project_path} - nothing to do")
+            return
 
-    # drop internal info from being sent to server
-    for item in changes["updated"]:
-        item.pop("origin_checksum", None)
-    data = {"version": local_version, "changes": changes}
+        # drop internal info from being sent to server
+        for item in changes["updated"]:
+            item.pop("origin_checksum", None)
+        data = {"version": local_version, "changes": changes}
 
-    try:
-        resp = mc.post(
-            f"/v1/project/push/{project_path}",
-            data,
-            {"Content-Type": "application/json"},
-        )
-    except ClientError as err:
-        mp.log.error("Error starting transaction: " + str(err))
-        mp.log.info("--- push aborted")
-        raise
-    server_resp = json.load(resp)
-
-    upload_files = data["changes"]["added"] + data["changes"]["updated"]
-
-    transaction_id = server_resp["transaction"] if upload_files else None
-    job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir)
-
-    if not upload_files:
-        mp.log.info("not uploading any files")
-        job.server_resp = server_resp
-        push_project_finalize(job)
-        return None  # all done - no pending job
-
-    mp.log.info(f"got transaction ID {transaction_id}")
-
-    upload_queue_items = []
-    total_size = 0
-    # prepare file chunks for upload
-    for file in upload_files:
-        if "diff" in file:
-            # versioned file - uploading diff
-            file_location = mp.fpath_meta(file["diff"]["path"])
-            file_size = file["diff"]["size"]
-        elif "upload_file" in file:
-            # versioned file - uploading full (a temporary copy)
-            file_location = file["upload_file"]
-            file_size = file["size"]
-        else:
-            # non-versioned file
-            file_location = mp.fpath(file["path"])
-            file_size = file["size"]
-
-        for chunk_index, chunk_id in enumerate(file["chunks"]):
-            size = min(UPLOAD_CHUNK_SIZE, file_size - chunk_index * UPLOAD_CHUNK_SIZE)
-            upload_queue_items.append(UploadQueueItem(file_location, size, transaction_id, chunk_id, chunk_index))
-
-        total_size += file_size
-
-    job.total_size = total_size
-    job.upload_queue_items = upload_queue_items
-
-    mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")
-
-    # start uploads in background
-    job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
-    for item in upload_queue_items:
-        future = job.executor.submit(_do_upload, item, job)
-        job.futures.append(future)
-    return job
+        try:
+            resp = mc.post(
+                f"/v1/project/push/{project_path}",
+                data,
+                {"Content-Type": "application/json"},
+            )
+        except ClientError as err:
+            mp.log.error("Error starting transaction: " + str(err))
+            mp.log.info("--- push aborted")
+            raise
+        server_resp = json.load(resp)
+
+        upload_files = data["changes"]["added"] + data["changes"]["updated"]
+
+        transaction_id = server_resp["transaction"] if upload_files else None
+        job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir)
+
+        if not upload_files:
+            mp.log.info("not uploading any files")
+            job.server_resp = server_resp
+            push_project_finalize(job)
+            return None  # all done - no pending job
+
+        mp.log.info(f"got transaction ID {transaction_id}")
+
+        upload_queue_items = []
+        total_size = 0
+        # prepare file chunks for upload
+        for file in upload_files:
+            if "diff" in file:
+                # versioned file - uploading diff
+                file_location = mp.fpath_meta(file["diff"]["path"])
+                file_size = file["diff"]["size"]
+            elif "upload_file" in file:
+                # versioned file - uploading full (a temporary copy)
+                file_location = file["upload_file"]
+                file_size = file["size"]
+            else:
+                # non-versioned file
+                file_location = mp.fpath(file["path"])
+                file_size = file["size"]
+
+            for chunk_index, chunk_id in enumerate(file["chunks"]):
+                size = min(UPLOAD_CHUNK_SIZE, file_size - chunk_index * UPLOAD_CHUNK_SIZE)
+                upload_queue_items.append(UploadQueueItem(file_location, size, transaction_id, chunk_id, chunk_index))
+
+            total_size += file_size
+
+        job.total_size = total_size
+        job.upload_queue_items = upload_queue_items
+
+        mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")
+
+        # start uploads in background
+        job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
+        for item in upload_queue_items:
+            future = job.executor.submit(_do_upload, item, job)
+            job.futures.append(future)
+        jobs.append(job)
+    return jobs
 
 
 def push_project_wait(job):
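
To illustrate the grouping, a small sketch of split_by_type() with stubbed dependencies. The MagicMock objects and sample paths are assumptions for illustration only; is_qgis_file() and is_versioned_file() classify by extension, so the .qgz and .gpkg entries land in the blocking group.

# Sketch only: exercise UploadChangesHandler.split_by_type() in isolation.
from unittest.mock import MagicMock
from mergin.client_push import UploadChangesHandler

mp = MagicMock()
mp.get_push_changes.return_value = {
    "added": [{"path": "survey.gpkg"}, {"path": "photo.jpg"}],
    "updated": [{"path": "project.qgz"}],
    "removed": [],
}
# With an empty project_info the editor role check fails, so filter_changes()
# is assumed to pass the changes through unmodified.
handler = UploadChangesHandler(mp, client=MagicMock(), project_info={})
for group in handler.split_by_type():
    print(group["blocking"], [f["path"] for f in group["added"] + group["updated"]])
# Expected output:
#   True  ['survey.gpkg', 'project.qgz']   (QGIS/versioned files block the push)
#   False ['photo.jpg']                    (everything else uploads non-blocking)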

mergin/editor.py

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ def is_editor_enabled(mc, project_info: dict) -> bool:
     return server_support and project_role == EDITOR_ROLE_NAME
 
 
-def _apply_editor_filters(changes: UploadChanges) -> UploadChanges:
+def _apply_editor_filters(changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
     """
     Applies editor-specific filters to the changes dictionary, removing any changes to files that are not in the editor's list of allowed files.
@@ -41,7 +41,7 @@ def _apply_editor_filters(changes: UploadChanges) -> UploadChanges:
     return changes
 
 
-def filter_changes(mc, project_info: dict, changes: UploadChanges) -> UploadChanges:
+def filter_changes(mc, project_info: dict, changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
     """
     Filters the given changes dictionary based on the editor's enabled state.
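
A small sketch of the relaxed signature: filter_changes() now accepts the plain dict of lists produced by MerginProject.get_push_changes() instead of the removed UploadChanges class. The MagicMock client and sample entries are assumptions; with a non-editor role the changes are expected to pass through unfiltered.

# Sketch only: the new plain-dict interface of filter_changes().
from unittest.mock import MagicMock
from mergin.editor import filter_changes

changes = {
    "added": [{"path": "photo.jpg"}],
    "updated": [{"path": "survey.gpkg", "origin_checksum": "abc123"}],  # hypothetical entries
    "removed": [],
}
# "owner" is not the editor role, so the editor filters are not applied.
assert filter_changes(MagicMock(), {"role": "owner"}, changes) == changes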
