
Commit cf232a6

Merge remote-tracking branch 'origin/push-v2-integration' into changes_limits
2 parents 350aedc + 0119b7d

6 files changed: +119 -85 lines changed

mergin/cli.py

Lines changed: 31 additions & 12 deletions

@@ -477,17 +477,36 @@ def sync(ctx):
     if mc is None:
         return
     directory = os.getcwd()
-    upload_job = None
+    current_job = None
+    current_bar = None
     try:
-
-        def on_progress(increment, push_job):
-            nonlocal upload_job
-            upload_job = push_job
-
-        # run pull & push cycles until there are no local changes
-        mc.sync_project(directory, progress_callback=on_progress)
-
-        click.secho("Sync complete.", fg="green")
+        # Iterate over the generator to get updates
+        for size_change, job in mc.sync_project(directory, upload_progress=True):
+            # Check if this is a new job (a new push operation)
+            if job and job != current_job:
+                # If a previous bar exists, close it
+                if current_bar:
+                    current_bar.finish()
+
+                # A new push job has started. Initialize a new progress bar.
+                click.echo("Starting upload")
+                current_job = job
+
+                # The length of the progress bar is the total size of the job,
+                # taken from the job object (job.total_size)
+                total_size = job.total_size
+                current_bar = click.progressbar(
+                    length=total_size,
+                    label="Uploading project",
+                )
+
+            # Update the current progress bar with the size increment
+            current_bar.update(size_change)
+
+        # After the loop finishes, make sure to close the final progress bar
+        if current_bar:
+            current_bar.finish()
+        click.secho("\nProject synced successfully", fg="green")

     except InvalidProject as e:
         click.secho("Invalid project directory ({})".format(str(e)), fg="red")
@@ -496,8 +515,8 @@ def on_progress(increment, push_job):
         return
     except KeyboardInterrupt:
         click.secho("Cancelling...")
-        if upload_job:
-            push_project_cancel(upload_job)
+        if current_job:
+            push_project_cancel(current_job)
     except Exception as e:
         _print_unhandled_exception()
mergin/client.py

Lines changed: 7 additions & 7 deletions

@@ -1512,13 +1512,16 @@ def create_invitation(self, workspace_id: int, email: str, workspace_role: Works
         ws_inv = self.post(f"v2/workspaces/{workspace_id}/invitations", params, json_headers)
         return json.load(ws_inv)

-    def sync_project(self, project_directory, progress_callback=None):
+    def sync_project(self, project_directory, upload_progress=False):
         """
         Syncs project by loop with these steps:
         1. Pull server version
         2. Get local changes
         3. Push first change batch
         Repeat if there are more local changes.
+
+        :param project_directory: Project's directory
+        :param upload_progress: If True, the method will be a generator yielding upload progress as (size_change, job) tuples.
         """
         mp = MerginProject(project_directory)
         has_changes = True
@@ -1533,19 +1536,18 @@ def sync_project(self, project_directory, progress_callback=None):
                 job = push_project_async(self, project_directory)
                 if not job:
                     break
-                if not progress_callback:
+                if not upload_progress:
                     push_project_wait(job)
                 else:
                     last_size = 0
                     while push_project_is_running(job):
                         sleep(SYNC_CALLBACK_WAIT)
                         current_size = job.transferred_size
-                        progress_callback(
-                            current_size - last_size, job
-                        )  # call callback with transferred size increment
+                        yield (current_size - last_size, job)  # Yields the size change and the job object
                         last_size = current_size
                 push_project_finalize(job)
                 _, has_changes = get_push_changes_batch(self, mp)
+                server_conflict_attempts = 0
             except ClientError as e:
                 if e.is_retryable_sync() and server_conflict_attempts < PUSH_ATTEMPTS - 1:
                     # retry on conflict, e.g. when server has changes that we do not have yet
@@ -1556,5 +1558,3 @@ def sync_project(self, project_directory, progress_callback=None):
                     sleep(PUSH_ATTEMPT_WAIT)
                     continue
                 raise e
-            else:
-                server_conflict_attempts = 0
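
As a quick illustration of the new calling convention (not part of the commit), a minimal sketch of consuming the generator form could look like this, assuming `mc` is an already configured MerginClient and the current directory is a Mergin project:

    import os

    directory = os.getcwd()
    total_uploaded = 0

    # with upload_progress=True the method yields (size_change, job) tuples
    # while a push job is running
    for size_change, job in mc.sync_project(directory, upload_progress=True):
        total_uploaded += size_change
        print(f"uploaded {total_uploaded} of {job.total_size} bytes")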

mergin/client_push.py

Lines changed: 22 additions & 16 deletions

@@ -22,7 +22,7 @@
 import time
 from typing import List, Tuple, Optional, ByteString

-from .local_changes import ChangesValidationError, LocalChange, LocalChanges
+from .local_changes import ChangesValidationError, FileChange, LocalPojectChanges

 from .common import (
     MAX_UPLOAD_VERSIONED_SIZE,
@@ -73,7 +73,7 @@ def __init__(

         self._request_headers = {"Content-Type": "application/octet-stream"}

-    def upload_chunk(self, data: ByteString, checksum: str):
+    def upload_chunk_v1_api(self, data: ByteString, checksum: str):
         """
         Uploads the chunk to the server.
         """
@@ -125,7 +125,7 @@ def upload_blocking(self):
                     self.upload_chunk_v2_api(data, checksum_str)
                 else:
                     # use v1 API for uploading chunks
-                    self.upload_chunk(data, checksum_str)
+                    self.upload_chunk_v1_api(data, checksum_str)
                 break  # exit loop if upload was successful
             except ClientError as e:
                 if attempt < UPLOAD_CHUNK_ATTEMPTS - 1:
@@ -140,10 +140,10 @@ class UploadJob:
     """Keeps all the important data about a pending upload job"""

     def __init__(
-        self, version: str, changes: LocalChanges, transaction_id: Optional[str], mp: MerginProject, mc, tmp_dir
+        self, version: str, changes: LocalPojectChanges, transaction_id: Optional[str], mp: MerginProject, mc, tmp_dir
     ):
         self.version = version
-        self.changes: LocalChanges = changes  # dictionary of local changes to the project
+        self.changes: LocalPojectChanges = changes  # dictionary of local changes to the project
         self.transaction_id = transaction_id  # ID of the transaction assigned by the server
         self.total_size = 0  # size of data to upload (in bytes)
         self.transferred_size = 0  # size of data already uploaded (in bytes)
@@ -167,8 +167,8 @@ def _submit_item_to_thread(self, item: UploadQueueItem):
         future = self.executor.submit(_do_upload, item, self)
         self.futures.append(future)

-    def add_items(self, items: List[UploadQueueItem]):
-        """Add multiple chunks to the upload queue"""
+    def start(self, items: List[UploadQueueItem]):
+        """Start the background upload for the given items (UploadQueueItem)"""
         self.total_size = sum(item.size for item in items)
         self.upload_queue_items = items

@@ -182,7 +182,10 @@ def add_items(self, items: List[UploadQueueItem]):
             self._submit_item_to_thread(item)

     def update_chunks_from_items(self):
-        """Update chunks in LocalChanges from the upload queue items."""
+        """
+        Update chunks in LocalProjectChanges from the upload queue items.
+        Used just before finalizing the transaction to set the server_chunk_id in the v2 API.
+        """
         self.changes.update_chunks([(item.chunk_id, item.server_chunk_id) for item in self.upload_queue_items])


@@ -195,7 +198,7 @@ def _do_upload(item: UploadQueueItem, job: UploadJob):
     job.transferred_size += item.size


-def create_upload_chunks(mc, mp: MerginProject, local_changes: List[LocalChange]) -> List[UploadQueueItem]:
+def create_upload_chunks(mc, mp: MerginProject, local_changes: List[FileChange]) -> List[UploadQueueItem]:
     """
     Create a list of UploadQueueItem objects from the changes dictionary and calculate total size of files.
     This is used to prepare the upload queue for the upload job.
@@ -228,7 +231,7 @@ def create_upload_chunks(mc, mp: MerginProject, local_changes: List[LocalChange]


 def create_upload_job(
-    mc, mp: MerginProject, changes: LocalChanges, tmp_dir: tempfile.TemporaryDirectory
+    mc, mp: MerginProject, changes: LocalPojectChanges, tmp_dir: tempfile.TemporaryDirectory
 ) -> Optional[UploadJob]:
     """
     Prepare transaction and create an upload job for the project using the v1 API.
@@ -268,18 +271,20 @@ def create_upload_job(
     if not upload_changes:
         mp.log.info("not uploading any files")
         if push_start_resp:
+            # this applies only to the v1 API
             job.server_resp = push_start_resp
         push_project_finalize(job)
         return  # all done - no pending job

     if transaction_id:
+        # this applies only to the v1 API
         mp.log.info(f"got transaction ID {transaction_id}")

     # prepare file chunks for upload
     upload_queue_items = create_upload_chunks(mc, mp, upload_changes)

     mp.log.info(f"Starting upload chunks for project {project_id}")
-    job.add_items(upload_queue_items)
+    job.start(upload_queue_items)
     return job


@@ -451,6 +456,7 @@ def push_project_cancel(job: UploadJob):

     job.executor.shutdown(wait=True)
     if not job.transaction_id:
+        # without a v1 API transaction there is nothing to cancel on the server
         job.mp.log.info("--- push cancelled")
         return
     try:
@@ -473,7 +479,7 @@ def remove_diff_files(job: UploadJob) -> None:
         os.remove(diff_file)


-def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalChanges, int]:
+def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalPojectChanges, int]:
     """
     Get changes that need to be pushed to the server.
     """
@@ -482,10 +488,10 @@ def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalChanges, int]:
     changes = filter_changes(mc, project_role, changes)

     try:
-        local_changes = LocalChanges(
-            added=[LocalChange(**change) for change in changes["added"]],
-            updated=[LocalChange(**change) for change in changes["updated"]],
-            removed=[LocalChange(**change) for change in changes["removed"]],
+        local_changes = LocalPojectChanges(
+            added=[FileChange(**change) for change in changes["added"]],
+            updated=[FileChange(**change) for change in changes["updated"]],
+            removed=[FileChange(**change) for change in changes["removed"]],
         )
     except ChangesValidationError as e:
         raise ClientError(
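
For readers unfamiliar with the v2 flow, a rough standalone sketch (not from the repository) of the pairs that update_chunks_from_items hands to LocalPojectChanges.update_chunks; the Item dataclass below is a hypothetical stand-in for UploadQueueItem:

    from dataclasses import dataclass
    from typing import List, Tuple

    @dataclass
    class Item:
        chunk_id: str         # local chunk id computed by the client
        server_chunk_id: str  # id returned by the server for the uploaded chunk (v2 API)

    def chunk_pairs(items: List[Item]) -> List[Tuple[str, str]]:
        # same shape as the list comprehension inside update_chunks_from_items
        return [(item.chunk_id, item.server_chunk_id) for item in items]

    print(chunk_pairs([Item("c1", "s1"), Item("c2", "s2")]))  # [('c1', 's1'), ('c2', 's2')]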

mergin/editor.py

Lines changed: 1 addition & 1 deletion

@@ -64,7 +64,7 @@ def prevent_conflicted_copy(path: str, mc, project_role: str) -> bool:
     Args:
         path (str): The file path to check.
         mc: The Mergin client object.
-        project_info (dict): Information about the Mergin project from server.
+        project_role: Current project role.

     Returns:
         bool: True if the file path should be prevented from creating a conflicted copy, False otherwise.

mergin/local_changes.py

Lines changed: 21 additions & 12 deletions

@@ -18,17 +18,28 @@ def __init__(self, message, invalid_changes=[], max_media_upload_size=None, max_


 @dataclass
-class BaseLocalChange:
+class FileDiffChange:
     path: str
     checksum: str
     size: int
     mtime: datetime


 @dataclass
-class LocalChange(BaseLocalChange):
+class FileChange:
+    # path to the file relative to the project root
+    path: str
+    # file checksum
+    checksum: str
+    # file size in bytes
+    size: int
+    # file modification time
+    mtime: datetime
+    # original file checksum used for comparison
     origin_checksum: Optional[str] = None
+    # list of chunk ids that make up this file
     chunks: List[str] = field(default_factory=list)
+    # optional diff information for geopackage files with geodiff metadata
     diff: Optional[dict] = None
     upload_file: Optional[str] = None
     # some functions (MerginProject.compare_file_sets) are adding version to the change from project info
@@ -38,9 +49,9 @@ class LocalChange(BaseLocalChange):
     # some functions (MerginProject.compare_file_sets) are adding location dict to the change from project info
     location: Optional[str] = None

-    def get_diff(self) -> Optional[BaseLocalChange]:
+    def get_diff(self) -> Optional[FileDiffChange]:
         if self.diff:
-            return BaseLocalChange(
+            return FileDiffChange(
                 path=self.diff.get("path", ""),
                 checksum=self.diff.get("checksum", ""),
                 size=self.diff.get("size", 0),
@@ -64,10 +75,10 @@ def to_server_data(self) -> dict:


 @dataclass
-class LocalChanges:
-    added: List[LocalChange] = field(default_factory=list)
-    updated: List[LocalChange] = field(default_factory=list)
-    removed: List[LocalChange] = field(default_factory=list)
+class LocalPojectChanges:
+    added: List[FileChange] = field(default_factory=list)
+    updated: List[FileChange] = field(default_factory=list)
+    removed: List[FileChange] = field(default_factory=list)

     def __post_init__(self):
         """
@@ -101,7 +112,7 @@ def to_server_payload(self) -> dict:
             "removed": [change.to_server_data() for change in self.removed],
         }

-    def get_upload_changes(self) -> List[LocalChange]:
+    def get_upload_changes(self) -> List[FileChange]:
         """
         Get all changes that need to be uploaded.
         This includes added and updated files.
@@ -115,9 +126,7 @@ def _map_unique_chunks(self, change_chunks: List[str], server_chunks: List[Tuple
         mapped = []
         seen = set()
         for chunk in change_chunks:
-            for server_chunk in server_chunks:
-                chunk_id = server_chunk[0]
-                server_chunk_id = server_chunk[1]
+            for chunk_id, server_chunk_id in server_chunks:
                 if chunk_id == chunk and server_chunk_id not in seen:
                     mapped.append(server_chunk_id)
                     seen.add(server_chunk_id)
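
To see what the tightened _map_unique_chunks loop computes, here is a small standalone sketch (a plain function copying the logic shown above, not the class method itself) together with a sample call:

    from typing import List, Tuple

    def map_unique_chunks(change_chunks: List[str], server_chunks: List[Tuple[str, str]]) -> List[str]:
        """Map local chunk ids to server-assigned chunk ids, keeping each server id only once."""
        mapped = []
        seen = set()
        for chunk in change_chunks:
            for chunk_id, server_chunk_id in server_chunks:
                if chunk_id == chunk and server_chunk_id not in seen:
                    mapped.append(server_chunk_id)
                    seen.add(server_chunk_id)
        return mapped

    # a file made of two local chunks, uploaded as server chunks "s1" and "s2"
    print(map_unique_chunks(["c1", "c2"], [("c1", "s1"), ("c2", "s2")]))  # ['s1', 's2']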
