Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion meta
209 changes: 146 additions & 63 deletions openslides_backend/migrations/migration_handler.py

Large diffs are not rendered by default.

72 changes: 66 additions & 6 deletions openslides_backend/migrations/migration_helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from enum import StrEnum
from importlib import import_module
from io import StringIO
from os import SEEK_END, listdir
from re import Match, match
from threading import Thread
from typing import Any
Expand All @@ -10,6 +10,7 @@
from psycopg.rows import DictRow
from psycopg.types.json import Jsonb

from meta.dev.src.helper_get_names import HelperGetNames
from openslides_backend.migrations.exceptions import MigrationException
from openslides_backend.models.base import model_registry
from openslides_backend.services.postgresql.db_connection_handling import (
Expand Down Expand Up @@ -86,7 +87,7 @@ def write_line(message: str) -> None:
if MigrationHelper.migrate_thread_stream_just_read:
MigrationHelper.migrate_thread_stream_read_pos = stream.tell()
MigrationHelper.migrate_thread_stream_just_read = False
stream.seek(0, SEEK_END)
stream.seek(0, os.SEEK_END)
MigrationHelper.migrate_thread_stream.write(message + "\n")

@staticmethod
Expand Down Expand Up @@ -117,7 +118,8 @@ def load_migrations() -> None:
migration_number: int
reMatch: Match[str] | None

migrations = listdir(MIGRATIONS_PATH)
MigrationHelper.migrations = {}
migrations = os.listdir(MIGRATIONS_PATH)

for migration in migrations:
reMatch = match(r"(?P<migration>\d{4}_.*)\.py", migration)
Expand Down Expand Up @@ -225,7 +227,7 @@ def assert_migration_index(curs: Cursor[DictRow]) -> None:

if backend_migration_index > database_migration_index:
raise ActionException(
f"Missing {backend_migration_index-database_migration_index} migrations to apply."
f"Missing {backend_migration_index-database_migration_index} migrations to be applied."
)

if backend_migration_index < database_migration_index:
Expand Down Expand Up @@ -347,6 +349,13 @@ def get_public_tables(curs: Cursor[DictRow]) -> list[str]:
not in ("version", "os_notify_log_t", "truncate_tables")
]

@staticmethod
def get_migration_class(module_name: str) -> Any:
"""
Returns the class Migration within the specified module.
"""
return getattr(import_module(f"{MODULE_PATH}{module_name}"), "Migration")

@staticmethod
def get_replace_tables(
migration_number: int,
Expand All @@ -355,7 +364,9 @@ def get_replace_tables(
Returns the replace tables mapping origin table to its migration copy.
"""
module_name = MigrationHelper.migrations[migration_number]
migration_module = import_module(f"{MODULE_PATH}{module_name}")
migration_class = MigrationHelper.get_migration_class(module_name)
# TODO Problem we can't rely on the ORIGIN_TABLES as we also rely on intermediate table information.
# That information would ultimately have to come from the yml files and that get's changed with every migration.
return {
col: {
"table": col + "_m",
Expand All @@ -366,7 +377,7 @@ def get_replace_tables(
if field.write_fields
],
}
for col in migration_module.ORIGIN_COLLECTIONS
for col in migration_class.ORIGIN_COLLECTIONS
}

@staticmethod
Expand Down Expand Up @@ -408,3 +419,52 @@ def get_unified_replace_tables_from_database(
},
relevant_mis,
)

@staticmethod
def get_view_definition(curs: Cursor[DictRow], collection: str) -> str:
curs.execute(
"""
SELECT pg_get_viewdef(%s::regclass, true) AS viewdef
FROM pg_class
WHERE relname = %s AND relkind = 'v';
""",
(collection, collection),
)
row = curs.fetchone()
if not row:
raise ValueError(f"Source view not found: {collection}")
return row["viewdef"]

@staticmethod
def get_all_view_definitions(curs: Cursor[DictRow]) -> list[dict[str, Any]]:
curs.execute("""
SELECT relname, pg_get_viewdef(relname::regclass, true) AS viewdef
FROM pg_class
WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')
AND relkind = 'v';
""")
result = curs.fetchall()
if not result:
raise ValueError("No views found.")
return result

# # # actual migration functions # # #

@staticmethod
def delete_field(curs: Cursor[DictRow], collection: str, field_name: str) -> None:
viewdef = MigrationHelper.get_view_definition(curs, collection)

curs.execute(
sql.SQL("CREATE OR REPLACE VIEW {} AS {};").format(
sql.Identifier(collection + "vm"),
sql.SQL(viewdef),
)
)
curs.execute(
sql.SQL("ALTER TABLE {} DROP COLUMN {};").format(
sql.Identifier(
HelperGetNames.get_table_name(collection, migration=True)
),
sql.Identifier(field_name),
)
)
162 changes: 78 additions & 84 deletions openslides_backend/migrations/migration_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from threading import Thread
from typing import Any, cast

from psycopg import Cursor, sql
from psycopg.rows import DictRow
from psycopg import sql

from openslides_backend.migrations.exceptions import (
MigrationException,
Expand Down Expand Up @@ -90,19 +89,12 @@ def get_stats(self) -> dict[str, Any]:
Doesn't respect initial migration if executed on a higher target migration index.
"""

def count(table: str, curs: Cursor[DictRow]) -> int:
def count(table: str) -> int:
if MIN_NON_REL_MIGRATION <= current_migration_index < 100:
if table.endswith("_m"):
# initial migration uses the original table_t instead of migration table_m
# to count migrated models.
statement_part = sql.SQL("{table}").format(
table=sql.Identifier(table[:-2] + "_t")
)
else:
# initial migration uses the models instead of table_t to count models
statement_part = sql.SQL(
"models WHERE fqid LIKE '{collection}/%' and deleted = false"
).format(collection=sql.SQL(table[:-2]))
# initial migration uses the models instead of table_t to count models
statement_part = sql.SQL(
"models WHERE fqid LIKE '{collection}/%' and deleted = false"
).format(collection=sql.SQL(table[:-2]))
else:
statement_part = sql.SQL("{table}").format(table=sql.Identifier(table))
response = self.cursor.execute(
Expand Down Expand Up @@ -134,26 +126,27 @@ def count(table: str, curs: Cursor[DictRow]) -> int:
).items()
}
stats = {
collection: {
"count": amount,
"migrated": count(migration_table, self.cursor),
}
collection: amount
for collection, migration_table in unmigrated_collections.items()
if (amount := count(collection + "_t", self.cursor))
if (amount := count(collection + "_t"))
}

if exc := MigrationHelper.migrate_thread_exception:
module_name = getattr(exc, "__module__", "")
return {
**self.get_migration_result(),
"current_migration_index": current_migration_index,
"target_migration_index": self.target_migration_index,
**(
{"exception": str(MigrationHelper.migrate_thread_exception)}
if MigrationHelper.migrate_thread_exception
{
"exception": f"{module_name}{'.' if module_name else ''}{type(exc).__qualname__}: {exc}"
}
if exc
else {"migratable_models": stats}
),
}

def assert_valid_migration_index(self, curs: Cursor[DictRow]) -> None:
def assert_valid_migration_index(self) -> None:
"""assert consistent migration index"""
database_m_idx = MigrationHelper.get_database_migration_index(self.cursor)
if database_m_idx > self.target_migration_index:
Expand All @@ -173,80 +166,80 @@ def handle_request(self, payload: dict[str, Any]) -> dict[str, Any]:
if not (command := payload.get("cmd")):
raise View400Exception("No command provided")
self.logger.info(f"Migration command: {command}")
if command not in iter(MigrationCommand):
raise View400Exception("Unknown command: " + command)
with get_new_os_conn() as conn:
conn.transaction()
with conn.cursor() as curs:
self.cursor = curs
state = MigrationHelper.get_migration_state(curs)
if command == MigrationCommand.PROGRESS:
return self.handle_progress_command()
elif command == MigrationCommand.STATS:
return {"stats": self.get_stats()}
self.assert_valid_migration_index(curs)
elif command != MigrationCommand.RESET:
self.assert_valid_migration_index()

match MigrationHelper.get_migration_state(curs):
case (
MigrationState.MIGRATION_RUNNING
| MigrationState.MIGRATION_PREPARING
):
process = "Migration"
case MigrationState.FINALIZATION_RUNNING:
process = "Finalization"
case (
MigrationState.MIGRATION_FAILED
| MigrationState.FINALIZATION_FAILED
):
match state:
case (
MigrationState.MIGRATION_RUNNING
| MigrationState.MIGRATION_PREPARING
):
process = "Migration"
case MigrationState.FINALIZATION_RUNNING:
process = "Finalization"
case (
MigrationState.MIGRATION_FAILED
| MigrationState.FINALIZATION_FAILED
):
raise MigrationException(
f"Migration in a failed state. Reset before trying to {command} again. Failed on: {MigrationHelper.migrate_thread_exception}"
)
case _:
process = ""
if process:
raise MigrationException(
f"Migration in a failed state. Reset before trying to {command} again. Failed on: {MigrationHelper.migrate_thread_exception}"
f"{process} is running, only 'stats' command is allowed."
)
case _:
process = ""
if process:
raise View400Exception(
f"{process} is running, only 'stats' command is allowed."
)

verbose = payload.get("verbose", False)
if command in iter(MigrationCommand):
MigrationHelper.migrate_thread_stream = StringIO()
MigrationHelper.migrate_thread = thread = Thread(
target=self.execute_migrate_command, args=[command, verbose]
)
thread.start()
thread.join(THREAD_WAIT_TIME)
if thread.is_alive():
# Read isolation would prevent seeing the newest status otherwise.
self.cursor.connection.commit()
# Migration still running. Report current progress and return
return {
"status": MigrationHelper.get_migration_state(self.cursor),
"output": MigrationHelper.read_stream(),
}
else:
# Migration already finished/had nothing to do
return self.get_migration_result()
MigrationHelper.migrate_thread_stream = StringIO()
MigrationHelper.migrate_thread = thread = Thread(
target=self.execute_migrate_command, args=[command, verbose]
)
thread.start()
thread.join(THREAD_WAIT_TIME)
if thread.is_alive():
# Read isolation would prevent seeing the newest status otherwise.
self.cursor.connection.commit()
# Migration still running. Report current progress and return
return {
"status": MigrationHelper.get_migration_state(self.cursor),
"output": MigrationHelper.read_stream(),
}
else:
raise View400Exception("Unknown command: " + command)
# Migration already finished/had nothing to do
return self.get_migration_result()

def execute_migrate_command(self, command: str, verbose: bool) -> None:
"""
Should be called as a new Thread.
Should only be called if migration is in a correct state. Error handling in this is minimalistic.
If an exception occurs during execution for the first time,
it is stored in the MigrationHelper.migrate_thread_exception for read with MigrationCommand.STATS.
"""
try:
with get_new_os_conn() as conn:
with conn.cursor() as curs:
if (
MigrationHelper.get_migration_state(curs)
in [
MigrationState.MIGRATION_RUNNING,
MigrationState.MIGRATION_PREPARING,
]
and command != MigrationCommand.RESET
):
raise MigrationException(
f"Cannot {command} when migration is running."
)
state = MigrationHelper.get_migration_state(curs)
if (
state == MigrationState.MIGRATION_REQUIRED
and command == MigrationCommand.FINALIZE
):
self.execute_migrate_command(MigrationCommand.MIGRATE, verbose)

with get_new_os_conn() as conn:
with conn.cursor() as curs:
self.handler = MigrationHandler(
curs, self.env, self.services, self.logging
)
Expand All @@ -255,16 +248,17 @@ def execute_migrate_command(self, command: str, verbose: bool) -> None:
MigrationHelper.migrate_thread_exception = e
self.logger.exception(e)
except Exception as e:
MigrationHelper.migrate_thread_exception = e
self.logger.exception(e)
# TODO catch this on a lower level and set it for specific faulty migration index
with get_new_os_conn() as conn:
with conn.cursor() as curs:
relevant_mis = MigrationHelper.get_unfinalized_indices(curs)
match command:
case MigrationCommand.MIGRATE:
state = MigrationState.MIGRATION_FAILED
case MigrationCommand.FINALIZE:
state = MigrationState.FINALIZATION_FAILED
for mi in relevant_mis:
MigrationHelper.set_database_migration_info(curs, mi, state)
self.logger.exception(e)
if not MigrationHelper.migrate_thread_exception:
MigrationHelper.migrate_thread_exception = e
with get_new_os_conn() as conn:
with conn.cursor() as curs:
relevant_mis = MigrationHelper.get_unfinalized_indices(curs)
match command:
case MigrationCommand.MIGRATE:
state = MigrationState.MIGRATION_FAILED
case MigrationCommand.FINALIZE:
state = MigrationState.FINALIZATION_FAILED
for mi in relevant_mis:
MigrationHelper.set_database_migration_info(curs, mi, state)
Loading