1 change: 1 addition & 0 deletions server/.test.env
@@ -24,3 +24,4 @@ SECURITY_BEARER_SALT='bearer'
SECURITY_EMAIL_SALT='email'
SECURITY_PASSWORD_SALT='password'
DIAGNOSTIC_LOGS_DIR=/tmp/diagnostic_logs
GEVENT_WORKER=0
47 changes: 47 additions & 0 deletions server/mergin/sync/commands.py
@@ -127,3 +127,50 @@ def remove(project_name):
project.removed_by = None
db.session.commit()
click.secho("Project removed", fg="green")

@project.command()
@click.argument("project-name", callback=normalize_input(lowercase=False))
@click.option("--since", type=int, required=False)
@click.option("--to", type=int, required=False)
def create_checkpoint(project_name, since=None, to=None):
"""Create project delta checkpoint, corresponding lower checkpoints and merged diffs for project"""
ws, name = split_project_path(project_name)
workspace = current_app.ws_handler.get_by_name(ws)
if not workspace:
click.secho("ERROR: Workspace does not exist", fg="red", err=True)
sys.exit(1)
project = (
Project.query.filter_by(workspace_id=workspace.id, name=name)
.filter(Project.storage_params.isnot(None))
.first()
)
if not project:
click.secho("ERROR: Project does not exist", fg="red", err=True)
sys.exit(1)

since = since if since is not None else 0
to = to if to is not None else project.latest_version
if since < 0 or to < 1:
click.secho(
"ERROR: Invalid version number, minimum version for 'since' is 0 and minimum version for 'to' is 1",
fg="red",
err=True,
)
sys.exit(1)

if to > project.latest_version:
click.secho(
"ERROR: 'to' version exceeds latest project version", fg="red", err=True
)
sys.exit(1)

if since >= to:
click.secho(
"ERROR: 'since' version must be less than 'to' version",
fg="red",
err=True,
)
sys.exit(1)

project.get_delta_changes(since, to)
click.secho("Project checkpoint(s) created", fg="green")
180 changes: 119 additions & 61 deletions server/mergin/sync/models.py
@@ -399,9 +399,6 @@ def get_delta_changes(
)
existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}

# Cache all individual (rank 0) delta rows in the required range.
individual_deltas: List[ProjectVersionDelta] = []

result: List[DeltaChange] = []
for checkpoint in expected_checkpoints:
existing_delta = existing_delta_map.get((checkpoint.rank, checkpoint.end))
@@ -411,29 +408,10 @@
result.extend(DeltaChangeSchema(many=True).load(existing_delta.changes))
continue

# If higher rank delta checkpoint does not exist, we are using rank=0 deltas to create checkpoint
# If a higher rank delta checkpoint does not exist, we need to create it
if checkpoint.rank > 0:
individual_deltas = (
ProjectVersionDelta.query.filter(
ProjectVersionDelta.project_id == project_id,
ProjectVersionDelta.version >= since,
ProjectVersionDelta.version <= to,
ProjectVersionDelta.rank == 0,
)
.order_by(ProjectVersionDelta.version)
.all()
if not individual_deltas
else individual_deltas
)

if not individual_deltas:
logging.error(
f"No individual deltas found for project {project_id} in range {since} / {to} to create checkpoint."
)
return

new_checkpoint = ProjectVersionDelta.create_checkpoint(
project_id, checkpoint, individual_deltas
project_id, checkpoint
)
if new_checkpoint:
result.extend(
@@ -443,6 +421,7 @@
logging.error(
f"Not possible to create checkpoint for project {project_id} in range {checkpoint.start}-{checkpoint.end}"
)
return

return ProjectVersionDelta.merge_changes(result)
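
For intuition about `Checkpoint.get_checkpoints`, here is an illustrative sketch of a rank-based decomposition of a version range; the growth factor of 4 per rank and the exclusive `since` bound are assumptions for illustration, not necessarily what the server implements:

```python
from dataclasses import dataclass
from typing import List

BASE = 4  # assumed growth factor per rank; the server's real constant may differ


@dataclass
class Checkpoint:
    rank: int   # 0 = single version delta; rank r covers BASE**r consecutive versions
    index: int  # 1-based ordinal of the range at this rank

    @property
    def start(self) -> int:
        return (self.index - 1) * BASE**self.rank + 1

    @property
    def end(self) -> int:
        return self.index * BASE**self.rank


def get_checkpoints(since: int, to: int) -> List[Checkpoint]:
    """Cover versions (since, to] greedily with the largest aligned ranges."""
    result: List[Checkpoint] = []
    version = since
    while version < to:
        rank = 0
        # grow the rank while the next bigger range is aligned and still fits
        while version % BASE ** (rank + 1) == 0 and version + BASE ** (rank + 1) <= to:
            rank += 1
        result.append(Checkpoint(rank=rank, index=version // BASE**rank + 1))
        version += BASE**rank
    return result


# e.g. get_checkpoints(0, 21) yields ranks [2, 1, 0] covering v1-16, v17-20, v21
```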

@@ -520,6 +499,10 @@ def __init__(self, project_id, path):
self.project_id = project_id
self.path = path

def generate_diff_name(self):
"""Generate uniqute diff file name for server generated diff"""
return mergin_secure_filename(f"{self.path}-diff-{uuid.uuid4()}")


class LatestProjectFiles(db.Model):
"""Store project latest version files history ids"""
@@ -775,7 +758,7 @@ def diffs_chain(
),
None,
)
if diff:
if diff and os.path.exists(diff.abs_path):
diffs.append(diff)
elif item.rank > 0:
# fallback if checkpoint does not exist: replace merged diff with individual diffs
@@ -876,16 +859,58 @@ def abs_path(self) -> str:
"""
return os.path.join(self.file.project.storage.project_dir, self.location)

@staticmethod
def can_create_checkpoint(file_path_id: int, checkpoint: Checkpoint) -> bool:
"""Check if it makes sense to create a diff file for a checkpoint, e.g. there were relevant changes within the range without breaking changes"""

basefile = FileHistory.get_basefile(file_path_id, checkpoint.end)
if not basefile:
return False

file_was_deleted = (
FileHistory.query.filter_by(file_path_id=file_path_id)
.filter(
FileHistory.project_version_name
>= max(basefile.project_version_name, checkpoint.start),
FileHistory.project_version_name <= checkpoint.end,
FileHistory.change == PushChangeType.DELETE.value,
)
.count()
> 0
)
if file_was_deleted:
return False

query = FileDiff.query.filter_by(basefile_id=basefile.id).filter(
FileDiff.rank == 0
)

# rank 0 is a special case: we only verify the diff exists
if checkpoint.rank == 0:
query = query.filter(FileDiff.version == checkpoint.end)
# for higher ranks we need to check if there were diff updates in that range
else:
query = query.filter(
FileDiff.version >= checkpoint.start,
FileDiff.version <= checkpoint.end,
)

return query.count() > 0

def construct_checkpoint(self) -> bool:
"""Create a diff file checkpoint (aka. merged diff).
Find all smaller diffs which are needed to create the final diff file and merge them.
In case of missing some lower rank checkpoint, use individual diffs instead.
In case of missing some lower rank checkpoints, create them recursively.

Once checkpoint is created, size and checksum are updated in the database.

Returns:
bool: True if checkpoint was successfully created or already present
"""
logging.debug(
f"Construct checkpoint for file {self.path} v{self.version} of rank {self.rank}"
)

if os.path.exists(self.abs_path):
return True

@@ -914,7 +939,7 @@ def construct_checkpoint(self) -> bool:
return False

diffs_paths = []
# let's confirm we have all intermediate diffs needed; if not, we need to use individual diffs instead
# let's confirm we have all intermediate diffs needed; if not, we need to create them (recursively) first
cached_items = Checkpoint.get_checkpoints(
cache_level.start, cache_level.end - 1
)
@@ -936,6 +961,7 @@ def construct_checkpoint(self) -> bool:
continue

# find diff in table and on disk
# diffs might not exist because they were not created yet or there were no changes (e.g. for zeroth rank diffs)
diff = next(
(
d
Expand All @@ -944,27 +970,34 @@ def construct_checkpoint(self) -> bool:
),
None,
)
if diff and os.path.exists(diff.abs_path):

if not diff:
# lower rank diff not even in DB yet - create it and try to construct merged file
if item.rank > 0 and FileDiff.can_create_checkpoint(
self.file_path_id, item
):
diff = FileDiff(
basefile=basefile,
version=item.end,
rank=item.rank,
path=basefile.file.generate_diff_name(),
size=None,
checksum=None,
)
db.session.add(diff)
db.session.commit()
else:
# such diff is not expected to exist
continue

diff_exists = diff.construct_checkpoint()
if diff_exists:
diffs_paths.append(diff.abs_path)
else:
individual_diffs = (
FileDiff.query.filter_by(
basefile_id=basefile.id,
rank=0,
)
.filter(
FileDiff.version >= item.start, FileDiff.version <= item.end
)
.order_by(FileDiff.version)
.all()
logging.error(
f"Unable to create checkpoint diff for {item} for file {self.file_path_id}"
)
if individual_diffs:
diffs_paths.extend([i.abs_path for i in individual_diffs])
else:
logging.error(
f"Unable to find diffs for {item} for file {self.file_path_id}"
)
return False
return False

# we apply the latest change (if any) on top of the previous version
end_diff = FileDiff.query.filter_by(
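
The ordered `diffs_paths` collected above presumably get concatenated into a single changeset via geodiff; a sketch of that final merge step, assuming pygeodiff's `concat_changes` wrapper (the real call site in this PR may differ):

```python
# A sketch under the assumption that pygeodiff is the merge engine;
# `diffs_paths` is the ordered list of changeset files collected above.
from typing import List

from pygeodiff import GeoDiff


def merge_changesets(diffs_paths: List[str], output_path: str) -> None:
    # concatenate sequential changesets into one merged changeset file
    GeoDiff().concat_changes(diffs_paths, output_path)
```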
@@ -1186,34 +1219,59 @@ def create_checkpoint(
cls,
project_id: str,
checkpoint: Checkpoint,
from_deltas: List[ProjectVersionDelta] = [],
) -> Optional[ProjectVersionDelta]:
"""
Creates and caches a new checkpoint and any required FileDiff checkpoints.
Use from_deltas to create a checkpoint from existing individual deltas.
Returns the created ProjectVersionDelta object with the checkpoint.
Creates and caches a new checkpoint and any required FileDiff checkpoints, recursively if needed.
"""
delta_range = [
change
for change in from_deltas
if checkpoint.start <= change.version <= checkpoint.end
]
delta_range = []
# our new checkpoint will be created by adding the last individual delta to previous checkpoints
expected_checkpoints = Checkpoint.get_checkpoints(
checkpoint.start, checkpoint.end - 1
)
expected_checkpoints.append(Checkpoint(rank=0, index=checkpoint.end))

expected_deltas = (
ProjectVersionDelta.query.filter(
ProjectVersionDelta.project_id == project_id,
tuple_(ProjectVersionDelta.rank, ProjectVersionDelta.version).in_(
[(item.rank, item.end) for item in expected_checkpoints]
),
)
.order_by(ProjectVersionDelta.version)
.all()
)

existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}
# make sure we have all components; if not, create them (recursively)
for item in expected_checkpoints:
existing_delta = existing_delta_map.get((item.rank, item.end))
if not existing_delta:
existing_delta = cls.create_checkpoint(project_id, item)

if existing_delta:
delta_range.append(existing_delta)
else:
logging.error(
f"Missing project delta endpoint for {project_id} v{item.end} rank {item.rank} which could not be recreated"
)
return

if not delta_range:
logging.warning(
f"No individual changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint."
f"No changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint."
)
return None

# dump changes lists from database and flatten list for merging
delta_range = sorted(delta_range, key=lambda x: x.version)
changes = []
for delta in delta_range:
changes.extend(DeltaChangeSchema(many=True).load(delta.changes))
merged_delta_items: List[DeltaChange] = [
d.to_data_delta() for d in cls.merge_changes(changes)
]

# Pre-fetch data for all versioned files to create FileDiff checkpoints
# Pre-fetch data for all versioned files to create FileDiff checkpoints where it makes sense
versioned_delta_items = [
item
for item in merged_delta_items
@@ -1246,17 +1304,17 @@
if not base_file:
continue

diff_path = mergin_secure_filename(
f"{item.path}-diff-{uuid.uuid4()}"
)
if not FileDiff.can_create_checkpoint(file_path_id, checkpoint):
continue

checkpoint_diff = FileDiff(
basefile=base_file,
path=diff_path,
path=base_file.file.generate_diff_name(),
rank=checkpoint.rank,
version=checkpoint.end,
)
# Patch the delta with the path to the new diff checkpoint
item.diff = diff_path
item.diff = checkpoint_diff.path
db.session.add(checkpoint_diff)

checkpoint_delta = ProjectVersionDelta(
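
Finally, `ProjectVersionDelta.merge_changes` collapses the flattened change lists into one net change per file path; a hypothetical illustration of such collapsing (the real implementation and `DeltaChange` semantics may differ):

```python
# A hypothetical illustration only; change types and precedence rules
# are assumptions, not the actual merge_changes implementation.
from dataclasses import dataclass, replace
from typing import Dict, List


@dataclass
class Change:
    path: str
    change: str  # "create" | "update" | "delete" (assumed change types)


def merge_changes(changes: List[Change]) -> List[Change]:
    """Collapse an ordered list of per-file changes into one net change per path."""
    merged: Dict[str, Change] = {}
    for c in changes:  # assumed ordered by version
        prev = merged.get(c.path)
        if prev is None:
            merged[c.path] = c
        elif prev.change == "create" and c.change == "delete":
            del merged[c.path]  # created and deleted within the range: net no-op
        elif prev.change == "create":
            merged[c.path] = replace(c, change="create")  # still a net create
        else:
            merged[c.path] = c  # later update/delete supersedes the previous change
    return list(merged.values())
```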