Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions mpt_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from rich.console import Console

from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.errors import NewMigrationError, RunMigrationError
from mpt_tool.renders import MigrationRender
from mpt_tool.use_cases import RunMigrationsUseCase
from mpt_tool.use_cases.apply_migration import ApplyMigrationUseCase
from mpt_tool.use_cases.errors import ApplyMigrationError, NewMigrationError, RunMigrationError
from mpt_tool.use_cases.list_migrations import ListMigrationsUseCase
from mpt_tool.use_cases.new_migration import NewMigrationUseCase

Expand All @@ -20,9 +21,17 @@ def callback() -> None:


@app.command("migrate")
def migrate( # noqa: C901, WPS210, WPS213, WPS238, WPS231
def migrate( # noqa: C901, WPS210, WPS211, WPS213, WPS238, WPS231
data: Annotated[bool, typer.Option("--data", help="Run data migrations.")] = False, # noqa: FBT002
schema: Annotated[bool, typer.Option("--schema", help="Run schema migrations.")] = False, # noqa: FBT002
fake: Annotated[
str | None,
typer.Option(
"--fake",
help="Mark the migration provided as applied without running it",
metavar="MIGRATION_ID",
),
] = None,
new_data: Annotated[
str | None,
typer.Option(
Expand All @@ -42,7 +51,7 @@ def migrate( # noqa: C901, WPS210, WPS213, WPS238, WPS231
list: Annotated[bool, typer.Option("--list", help="List all migrations.")] = False, # noqa: A002, FBT002
) -> None:
"""Migrate command."""
options = sum([bool(data), bool(schema), bool(new_data), bool(new_schema), bool(list)]) # noqa: WPS221
options = sum([data, schema, bool(fake), bool(new_data), bool(new_schema), list]) # noqa: WPS221
if options > 1:
raise typer.BadParameter("Only one option can be used.", param_hint="migrate")
if not options:
Expand All @@ -52,16 +61,26 @@ def migrate( # noqa: C901, WPS210, WPS213, WPS238, WPS231
migration_type = MigrationTypeEnum.DATA if data else MigrationTypeEnum.SCHEMA
typer.echo(f"Running {migration_type} migrations...")

run_migration = RunMigrationsUseCase()
try:
run_migration.execute(migration_type)
RunMigrationsUseCase().execute(migration_type)
except RunMigrationError as error:
typer.secho(f"Error running migrations: {error!s}", fg=typer.colors.RED)
raise typer.Abort

typer.secho("Migrations completed successfully.", fg=typer.colors.GREEN)
return

if fake:
typer.echo(f"Running migration {fake} in fake mode.")
try:
ApplyMigrationUseCase().execute(migration_id=fake)
except ApplyMigrationError as error:
typer.secho(f"Error running migration: {error!s}", fg=typer.colors.RED)
raise typer.Abort

typer.secho(f"Migration {fake} applied successfully.", fg=typer.colors.GREEN)
return

if new_schema or new_data:
filename_suffix = new_data or new_schema
migration_type = MigrationTypeEnum.DATA if new_data else MigrationTypeEnum.SCHEMA
Expand Down
24 changes: 0 additions & 24 deletions mpt_tool/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,3 @@ class BaseError(Exception):
@override
def __init__(self, message: str):
super().__init__(message)


class CreateMigrationError(BaseError):
"""Error creating the migration file."""


class LoadMigrationError(BaseError):
"""Error loading migrations."""


class NewMigrationError(BaseError):
"""Error creating new migration."""


class MigrationFolderError(BaseError):
"""Error accessing migrations folder."""


class RunMigrationError(BaseError):
"""Error running migrations."""


class StateNotFoundError(BaseError):
"""Error getting state from state file."""
4 changes: 4 additions & 0 deletions mpt_tool/managers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from mpt_tool.managers.file_migration import FileMigrationManager
from mpt_tool.managers.file_state import FileStateManager

__all__ = ["FileMigrationManager", "FileStateManager"]
15 changes: 15 additions & 0 deletions mpt_tool/managers/encoders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json
from typing import Any, override

from mpt_tool.models import Migration


class StateJSONEncoder(json.JSONEncoder):
"""JSON encoder for migration states."""

@override
def default(self, obj: object) -> Any: # noqa: WPS110
if isinstance(obj, Migration):
return obj.to_dict()

return super().default(obj)
25 changes: 25 additions & 0 deletions mpt_tool/managers/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from mpt_tool.errors import BaseError


class ManagerError(BaseError):
"""Base error for all manager errors."""


class CreateMigrationError(ManagerError):
"""Error creating the migration file."""


class InvalidStateError(ManagerError):
"""Error loading invalid state."""


class LoadMigrationError(ManagerError):
"""Error loading migrations."""


class MigrationFolderError(ManagerError):
"""Error accessing migrations folder."""


class StateNotFoundError(ManagerError):
"""Error getting state from state file."""
124 changes: 42 additions & 82 deletions mpt_tool/managers.py → mpt_tool/managers/file_migration.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import json
import re
from collections import Counter
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from types import ModuleType
from typing import Any, override
from typing import cast

from mpt_tool.constants import MIGRATION_FOLDER, MIGRATION_STATE_FILE
from mpt_tool.commands.base import BaseCommand
from mpt_tool.constants import MIGRATION_FOLDER
from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.errors import (
CreateMigrationError,
LoadMigrationError,
MigrationFolderError,
StateNotFoundError,
)
from mpt_tool.models import Migration, MigrationFile
from mpt_tool.managers.errors import CreateMigrationError, LoadMigrationError, MigrationFolderError
from mpt_tool.models import MigrationFile
from mpt_tool.templates import MIGRATION_SCAFFOLDING_TEMPLATE


Expand All @@ -24,26 +18,49 @@ class FileMigrationManager:
_migration_folder: Path = Path(MIGRATION_FOLDER)

@classmethod
def load_migration(cls, migration_file: MigrationFile) -> ModuleType:
"""Loads a migration module from a migration file.
def load_migration(cls, migration_file: MigrationFile) -> BaseCommand:
"""Loads a migration instance from a migration file.

Args:
migration_file: The migration file to load.

Returns:
The loaded migration module.
The migration instance.

Raise:
LoadMigrationError: If an error occurs during migration loading.
"""
spec = spec_from_file_location(migration_file.name, migration_file.full_path)
if spec is None or spec.loader is None:
raise LoadMigrationError(f"Failed to load migration file: {migration_file.full_path}")

migration_module = module_from_spec(spec)
spec.loader.exec_module(migration_module)
return migration_module
try:
spec.loader.exec_module(migration_module)
except ImportError as error:
raise LoadMigrationError(f"Failed to import migration module: {error!s}") from error

try:
migration_instance = cast(BaseCommand, migration_module.Command())
except (TypeError, AttributeError) as error:
raise LoadMigrationError(f"Invalid migration Command: {error!s}") from error

return migration_instance

@classmethod
def new_migration(cls, file_suffix: str, migration_type: MigrationTypeEnum) -> MigrationFile:
"""Creates a new migration file."""
"""Creates a new migration file.

Args:
file_suffix: The suffix to use for the migration file name.
migration_type: The type of migration to create.

Return:
The newly created migration file.

Raises:
CreateMigrationError: If an error occurs during migration creation.
"""
cls._migration_folder.mkdir(parents=True, exist_ok=True)
try:
migration_file = MigrationFile.new(migration_id=file_suffix, path=cls._migration_folder)
Expand Down Expand Up @@ -74,7 +91,14 @@ def retrieve_migration_files(cls) -> tuple[MigrationFile, ...]:

@classmethod
def validate(cls) -> tuple[MigrationFile, ...]:
"""Validates the migration folder and returns a tuple of migration files."""
"""Validates the migration folder and returns a tuple of migration files.

Returns:
The validated migration files.

Raises:
MigrationFolderError: If an error occurs during migration validation.
"""
if not cls._migration_folder.exists():
raise MigrationFolderError(f"Migration folder not found: {cls._migration_folder}")

Expand Down Expand Up @@ -106,67 +130,3 @@ def _get_migration_files(cls) -> tuple[MigrationFile, ...]:
raise MigrationFolderError(str(error)) from None

return tuple(migrations)


class StateJSONEncoder(json.JSONEncoder):
"""JSON encoder for migration states."""

@override
def default(self, obj: object) -> Any: # noqa: WPS110
if isinstance(obj, Migration):
return obj.to_dict()

return super().default(obj)


class FileStateManager:
"""Manages migration states."""

_state_path: Path = Path(MIGRATION_STATE_FILE)

@classmethod
def load(cls) -> dict[str, Migration]:
"""Load migration states from the state file."""
if not cls._state_path.exists():
return {}

state_data = json.loads(cls._state_path.read_text(encoding="utf-8"))
return {key: Migration.from_dict(mig_data) for key, mig_data in state_data.items()}

@classmethod
def get_by_id(cls, migration_id: str) -> Migration:
"""Get a migration state by its ID."""
state_data = cls.load()
try:
state = state_data[migration_id]
except KeyError:
raise StateNotFoundError("State not found") from None

return state

@classmethod
def new(cls, migration_id: str, migration_type: MigrationTypeEnum, order_id: int) -> Migration:
"""Create a new migration state."""
state_data = cls.load()
new_state = Migration(
migration_id=migration_id,
order_id=order_id,
type=migration_type,
)
state_data[migration_id] = new_state
cls.save(state_data)
return new_state

@classmethod
def save(cls, state_data: dict[str, Migration]) -> None:
"""Save migration states to the state file."""
cls._state_path.write_text(
json.dumps(state_data, indent=2, cls=StateJSONEncoder), encoding="utf-8"
)

@classmethod
def save_state(cls, state: Migration) -> None:
"""Save a migration state to the state file."""
state_data = cls.load()
state_data[state.migration_id] = state
cls.save(state_data)
65 changes: 65 additions & 0 deletions mpt_tool/managers/file_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import json
from pathlib import Path

from mpt_tool.constants import MIGRATION_STATE_FILE
from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.managers.encoders import StateJSONEncoder
from mpt_tool.managers.errors import InvalidStateError, StateNotFoundError
from mpt_tool.models import Migration


class FileStateManager:
"""Manages migration states."""

_state_path: Path = Path(MIGRATION_STATE_FILE)

@classmethod
def load(cls) -> dict[str, Migration]:
"""Load migration states from the state file."""
if not cls._state_path.exists():
return {}

try:
state_data = json.loads(cls._state_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as error:
raise InvalidStateError(f"Invalid state file: {error!s}") from error

return {key: Migration.from_dict(mig_data) for key, mig_data in state_data.items()}

@classmethod
def get_by_id(cls, migration_id: str) -> Migration:
"""Get a migration state by its ID."""
state_data = cls.load()
try:
state = state_data[migration_id]
except KeyError:
raise StateNotFoundError("State not found") from None

return state

@classmethod
def new(cls, migration_id: str, migration_type: MigrationTypeEnum, order_id: int) -> Migration:
"""Create a new migration state."""
state_data = cls.load()
new_state = Migration(
migration_id=migration_id,
order_id=order_id,
type=migration_type,
)
state_data[migration_id] = new_state
cls.save(state_data)
return new_state

@classmethod
def save(cls, state_data: dict[str, Migration]) -> None:
"""Save migration states to the state file."""
cls._state_path.write_text(
json.dumps(state_data, indent=2, cls=StateJSONEncoder), encoding="utf-8"
)

@classmethod
def save_state(cls, state: Migration) -> None:
"""Save a migration state to the state file."""
state_data = cls.load()
state_data[state.migration_id] = state
cls.save(state_data)
5 changes: 5 additions & 0 deletions mpt_tool/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def failed(self) -> None:
self.started_at = None
self.applied_at = None

def fake(self) -> None:
"""Mark the migration as fake."""
self.started_at = None
self.applied_at = dt.datetime.now(tz=dt.UTC)

def start(self) -> None:
"""Mark the migration as started."""
self.started_at = dt.datetime.now(tz=dt.UTC)
Expand Down
Loading