1212import copy
1313import math
1414import os
15+ import pprint
1516import shutil
1617
1718import concurrent .futures
18- import threading
1919
2020from .common import CHUNK_SIZE , ClientError
2121from .merginproject import MerginProject
@@ -76,14 +76,14 @@ def _download_items(file, directory, diff_only=False):
7676 return items
7777
7878
79- def _do_download (item , mc , project_path , job ):
79+ def _do_download (item , mc , mp , project_path , job ):
8080 """ runs in worker thread """
8181 if job .is_cancelled :
8282 return
8383
8484 # TODO: make download_blocking / save_to_file cancellable so that we can cancel as soon as possible
8585
86- item .download_blocking (mc , project_path )
86+ item .download_blocking (mc , mp , project_path )
8787 job .transferred_size += item .size
8888
8989
@@ -99,9 +99,13 @@ def download_project_async(mc, project_path, directory):
9999 os .makedirs (directory )
100100 mp = MerginProject (directory )
101101
102+ mp .log .info (f"--- start download { project_path } " )
103+
102104 project_info = mc .project_info (project_path )
103105 version = project_info ['version' ] if project_info ['version' ] else 'v0'
104106
107+ mp .log .info (f"got project info. version { version } " )
108+
105109 # prepare download
106110 update_tasks = [] # stuff to do at the end of download
107111 for file in project_info ['files' ]:
@@ -116,14 +120,16 @@ def download_project_async(mc, project_path, directory):
116120 download_list .extend (task .download_queue_items )
117121 for item in task .download_queue_items :
118122 total_size += item .size
123+
124+ mp .log .info (f"will download { len (update_tasks )} files in { len (download_list )} chunks, total size { total_size } " )
119125
120126 job = DownloadJob (project_path , total_size , version , update_tasks , download_list , directory , mp , project_info )
121127
122128 # start download
123129 job .executor = concurrent .futures .ThreadPoolExecutor (max_workers = 4 )
124130 job .futures = []
125131 for item in download_list :
126- future = job .executor .submit (_do_download , item , mc , project_path , job )
132+ future = job .executor .submit (_do_download , item , mc , mp , project_path , job )
127133 job .futures .append (future )
128134
129135 return job
@@ -167,6 +173,8 @@ def download_project_finalize(job):
167173 if future .exception () is not None :
168174 raise future .exception ()
169175
176+ job .mp .log .info ("--- download finished" )
177+
170178 for task in job .update_tasks :
171179
172180 # right now only copy tasks...
@@ -235,9 +243,10 @@ def __repr__(self):
235243 return "<DownloadQueueItem path={} version={} diff_only={} part_index={} size={} dest={}>" .format (
236244 self .file_path , self .version , self .diff_only , self .part_index , self .size , self .download_file_path )
237245
238- def download_blocking (self , mc , project_path ):
246+ def download_blocking (self , mc , mp , project_path ):
239247 """ Starts download and only returns once the file has been fully downloaded and saved """
240248
249+ mp .log .debug (f"Downloading { self .file_path } version={ self .version } diff={ self .diff_only } part={ self .part_index } " )
241250 start = self .part_index * (1 + CHUNK_SIZE )
242251 resp = mc .get ("/v1/project/raw/{}" .format (project_path ), data = {
243252 "file" : self .file_path ,
@@ -248,9 +257,11 @@ def download_blocking(self, mc, project_path):
248257 }
249258 )
250259 if resp .status in [200 , 206 ]:
260+ mp .log .debug (f"Download finished: { self .file_path } " )
251261 save_to_file (resp , self .download_file_path )
252262 else :
253- raise ClientError ('Failed to download part {} of file {}' .format (part , basename ))
263+ mp .log .error (f"Download failed: { self .file_path } " )
264+ raise ClientError ('Failed to download part {} of file {}' .format (self .part_index , self .file_path ))
254265
255266
256267class PullJob :
@@ -290,19 +301,27 @@ def pull_project_async(mc, directory):
290301 mp = MerginProject (directory )
291302 project_path = mp .metadata ["name" ]
292303 local_version = mp .metadata ["version" ]
304+
305+ mp .log .info (f"--- start pull { project_path } " )
306+
293307 server_info = mc .project_info (project_path , since = local_version )
294- if local_version == server_info ["version" ]:
308+ server_version = server_info ["version" ]
309+
310+ mp .log .info (f"got project info. version { server_version } " )
311+
312+ if local_version == server_version :
313+ mp .log .info ("--- pull - nothing to do (already at server version)" )
295314 return # Project is up to date
296315
297316 # we either download a versioned file using diffs (strongly preferred),
298317 # but if we don't have history with diffs (e.g. uploaded without diffs)
299318 # then we just download the whole file
300319 _pulling_file_with_diffs = lambda f : 'diffs' in f and len (f ['diffs' ]) != 0
301320
302- server_version = server_info ["version" ]
303321 temp_dir = mp .fpath_meta (f'fetch_{ local_version } -{ server_version } ' )
304322 os .makedirs (temp_dir , exist_ok = True )
305323 pull_changes = mp .get_pull_changes (server_info ["files" ])
324+ mp .log .debug ("pull changes:\n " + pprint .pformat (pull_changes ))
306325 fetch_files = []
307326 for f in pull_changes ["added" ]:
308327 f ['version' ] = server_version
@@ -364,13 +383,15 @@ def pull_project_async(mc, directory):
364383 for item in file_to_merge .downloaded_items :
365384 total_size += item .size
366385
386+ mp .log .info (f"will download { len (download_list )} chunks, total size { total_size } " )
387+
367388 job = PullJob (project_path , pull_changes , total_size , server_version , files_to_merge , download_list , temp_dir , mp , server_info , basefiles_to_patch )
368389
369390 # start download
370391 job .executor = concurrent .futures .ThreadPoolExecutor (max_workers = 4 )
371392 job .futures = []
372393 for item in download_list :
373- future = job .executor .submit (_do_download , item , mc , project_path , job )
394+ future = job .executor .submit (_do_download , item , mc , mp , project_path , job )
374395 job .futures .append (future )
375396
376397 return job
@@ -450,6 +471,8 @@ def pull_project_finalize(job):
450471 if future .exception () is not None :
451472 raise future .exception ()
452473
474+ job .mp .log .info ("finalizing pull" )
475+
453476 # merge downloaded chunks
454477 for file_to_merge in job .files_to_merge :
455478 file_to_merge .merge ()
@@ -478,6 +501,8 @@ def pull_project_finalize(job):
478501 'version' : job .version if job .version else "v0" , # for new projects server version is ""
479502 'files' : job .project_info ['files' ]
480503 }
481-
504+
505+ job .mp .log .info ("--- pull finished" )
506+
482507 shutil .rmtree (job .temp_dir )
483508 return conflicts
0 commit comments