|
| 1 | +"""python-korea-payment legacy DB → backend DB 데이터 이관. |
| 2 | +
|
| 3 | +Cutover 시 `LEGACY_DATABASE_NAME` 환경변수가 설정되어 있을 때만 실행됩니다. |
| 4 | +미설정 시 no-op — 개발/테스트 환경 및 cutover 완료 후 재실행 모두 안전. |
| 5 | +같은 Postgres 인스턴스를 전제로 host/port/user/password 는 default DB 재사용 (backend user 에 legacy DB SELECT 권한 GRANT 필요). |
| 6 | +
|
| 7 | +이관 대상: |
| 8 | +- user_userext: legacy-only (shifted) 사용자 INSERT + 매칭된 사용자 unique_id 갱신 (QR 연속성). |
| 9 | +- socialaccount_socialapp: provider 설정 (github/google/kakao/naver client_id 등 — backend 가 빈 상태라 그대로 복사). |
| 10 | +- socialaccount_socialaccount, account_emailaddress: allauth 로그인 연속성 (kakao/google/naver 로 재로그인 시 동일 사용자로 매칭). |
| 11 | +- product_*: CategoryGroup → Category → Tag → Product → OptionGroup → Option → ProductTagRelation |
| 12 | +- order_*: Order → OrderProductRelation → SingleProductCart → OrderProductOptionRelation → CustomerInfo |
| 13 | +- payment_history_paymenthistory |
| 14 | +- *historical*: simple-history 보존 (admin audit trail) |
| 15 | +
|
| 16 | +이관 제외: |
| 17 | +- payment_payment (deprecated, 사용처 0) |
| 18 | +- user_userext_groups, user_userext_user_permissions (auth 정책 변경 — admin 재설정) |
| 19 | +- socialaccount_socialtoken (만료 토큰 — 다음 로그인 시 새 발급; legacy 측 0 rows) |
| 20 | +- account_emailconfirmation (만료 단발 토큰) |
| 21 | +- openid_openidnonce, openid_openidstore (legacy 측 0 rows) |
| 22 | +- auth_*, authtoken_*, django_*, usersessions_*: backend 기준으로 통일 |
| 23 | +
|
| 24 | +User 매핑 우선순위 (총 2,299명): |
| 25 | +- auto_email: email 정규화 (@pycon.kr → @python.or.kr) 매칭 |
| 26 | +- auto_username: 위 미매칭 + username 동일 (같은 사람이 다른 email 로 양쪽 가입한 케이스) |
| 27 | +- manual: hardcoded — darjeeling@gmail.com (legacy id 5, 1135) → backend darjeeling@python.or.kr (id 5) |
| 28 | +- shifted: 모두 미매칭 → legacy.id + USER_ID_OFFSET (backend max id 와 충돌 없는 여유 공간) |
| 29 | +""" |
| 30 | + |
| 31 | +from enum import StrEnum |
| 32 | + |
| 33 | +from django.db import connections, migrations, transaction |
| 34 | + |
# Email domain suffix that is rewritten before matching, and its replacement.
EMAIL_REWRITE_OLD = "@pycon.kr"
EMAIL_REWRITE_NEW = "@python.or.kr"

# Added to a legacy user id when that user has no counterpart in the backend DB.
USER_ID_OFFSET = 175

# Hand-written legacy id -> backend id overrides, consulted only after the
# automatic email / username matches have failed.
MANUAL_USER_MAPPING: dict[int, int] = {
    5: 5,
    1135: 5,
}

# Number of rows pulled from the legacy cursor per fetchmany() call.
BATCH_SIZE = 1000
| 40 | + |
| 41 | + |
| 42 | +class _Source(StrEnum): |
| 43 | + AUTO_EMAIL = "auto_email" |
| 44 | + AUTO_USERNAME = "auto_username" |
| 45 | + MANUAL = "manual" |
| 46 | + SHIFTED = "shifted" |
| 47 | + |
| 48 | + |
| 49 | +_BASE_USER_FK = frozenset({"created_by_id", "updated_by_id", "deleted_by_id"}) |
| 50 | +_HISTORY_USER_FK = _BASE_USER_FK | {"history_user_id"} |
| 51 | + |
| 52 | +# Topological INSERT 순서 (FK 의존성). user_fk_cols 의 컬럼 값은 user_id_map 으로 변환됨. |
| 53 | +TABLES_TO_COPY: list[tuple[str, frozenset[str]]] = [ |
| 54 | + # allauth — 로그인 연속성 + provider 설정 |
| 55 | + ("socialaccount_socialapp", frozenset()), # provider config (github/google/kakao/naver, FK 없음) |
| 56 | + ("socialaccount_socialaccount", frozenset({"user_id"})), |
| 57 | + ("account_emailaddress", frozenset({"user_id"})), |
| 58 | + # shop product |
| 59 | + ("product_categorygroup", _BASE_USER_FK), |
| 60 | + ("product_category", _BASE_USER_FK), |
| 61 | + ("product_tag", _BASE_USER_FK), |
| 62 | + ("product_product", _BASE_USER_FK), |
| 63 | + ("product_optiongroup", _BASE_USER_FK), |
| 64 | + ("product_option", _BASE_USER_FK), |
| 65 | + ("product_producttagrelation", _BASE_USER_FK), |
| 66 | + ("order_order", _BASE_USER_FK | {"user_id"}), |
| 67 | + ("order_orderproductrelation", _BASE_USER_FK), |
| 68 | + ("order_singleproductcart", _BASE_USER_FK | {"user_id"}), |
| 69 | + ("order_orderproductoptionrelation", _BASE_USER_FK), |
| 70 | + ("order_customerinfo", _BASE_USER_FK), |
| 71 | + ("payment_history_paymenthistory", _BASE_USER_FK), |
| 72 | + # historical_* 는 FK 제약 없음 — 순서 임의. 가독성 위해 위와 동일 순서. |
| 73 | + ("product_historicalcategorygroup", _HISTORY_USER_FK), |
| 74 | + ("product_historicalcategory", _HISTORY_USER_FK), |
| 75 | + ("product_historicaltag", _HISTORY_USER_FK), |
| 76 | + ("product_historicalproduct", _HISTORY_USER_FK), |
| 77 | + ("product_historicaloptiongroup", _HISTORY_USER_FK), |
| 78 | + ("product_historicaloption", _HISTORY_USER_FK), |
| 79 | + ("product_historicalproducttagrelation", _HISTORY_USER_FK), |
| 80 | + ("order_historicalorder", _HISTORY_USER_FK | {"user_id"}), |
| 81 | + ("order_historicalorderproductrelation", _HISTORY_USER_FK), |
| 82 | + ("order_historicalsingleproductcart", _HISTORY_USER_FK | {"user_id"}), |
| 83 | + ("order_historicalorderproductoptionrelation", _HISTORY_USER_FK), |
| 84 | + ("order_historicalcustomerinfo", _HISTORY_USER_FK), |
| 85 | +] |
| 86 | + |
| 87 | + |
def _normalize_email(email: str | None) -> str | None:
    """Lower-case *email* and rewrite a trailing EMAIL_REWRITE_OLD to EMAIL_REWRITE_NEW.

    Falsy input (``None`` or ``""``) is returned unchanged.
    """
    if not email:
        return email
    lowered = email.lower()
    if not lowered.endswith(EMAIL_REWRITE_OLD):
        return lowered
    return lowered.removesuffix(EMAIL_REWRITE_OLD) + EMAIL_REWRITE_NEW
| 93 | + |
| 94 | + |
def _build_user_id_map(target_cur, legacy_cur) -> dict[int, tuple[int, _Source]]:
    """Build the ``legacy user id -> (backend user id, source)`` mapping.

    Each legacy user is resolved by the first rule that applies:

    1. normalized email equals a backend user's normalized email (AUTO_EMAIL)
    2. username equals a backend user's username (AUTO_USERNAME)
    3. the legacy id is listed in MANUAL_USER_MAPPING (MANUAL)
    4. otherwise the user gets ``legacy id + USER_ID_OFFSET`` (SHIFTED)

    Blank emails and usernames never participate in matching.

    Args:
        target_cur: cursor on the backend (target) database.
        legacy_cur: cursor on the legacy database.

    Returns:
        Mapping keyed by legacy user id.

    Raises:
        RuntimeError: a shifted id is already used by an existing backend user.
    """
    target_cur.execute("SELECT id, email, username FROM public.user_userext")
    backend_rows = target_cur.fetchall()
    # A blank email/username carries no identity. Indexing it would make every
    # blank-email legacy user "match" one arbitrary blank-email backend user.
    backend_by_email = {_normalize_email(email): pk for pk, email, _ in backend_rows if email}
    backend_by_username = {username: pk for pk, _, username in backend_rows if username}
    backend_ids = {pk for pk, _, _ in backend_rows}

    legacy_cur.execute("SELECT id, email, username FROM public.user_userext")
    mapping: dict[int, tuple[int, _Source]] = {}
    username_matches: list[tuple[int, str, int]] = []
    for legacy_id, email, username in legacy_cur.fetchall():
        by_email = backend_by_email.get(_normalize_email(email)) if email else None
        by_username = backend_by_username.get(username) if username else None
        if by_email is not None:
            mapping[legacy_id] = (by_email, _Source.AUTO_EMAIL)
        elif by_username is not None:
            mapping[legacy_id] = (by_username, _Source.AUTO_USERNAME)
            username_matches.append((legacy_id, username, by_username))
        elif legacy_id in MANUAL_USER_MAPPING:
            mapping[legacy_id] = (MANUAL_USER_MAPPING[legacy_id], _Source.MANUAL)
        else:
            mapping[legacy_id] = (legacy_id + USER_ID_OFFSET, _Source.SHIFTED)

    # Fail before any INSERT if the offset does not clear the existing backend ids;
    # the later INSERT would hit a primary-key violation anyway, with a less useful message.
    collisions = sorted(
        target_id for target_id, src in mapping.values() if src == _Source.SHIFTED and target_id in backend_ids
    )
    if collisions:
        raise RuntimeError(
            f"Shifted user ids collide with existing backend ids (USER_ID_OFFSET={USER_ID_OFFSET}): {collisions}"
        )

    counts: dict[str, int] = {s.value: 0 for s in _Source}
    for _, src in mapping.values():
        counts[src] += 1
    print(f"[migrate_legacy] user_id_map: total={len(mapping)}, {counts}")
    # A username-only match is probably the same person but can be a false
    # positive, so every such match is listed for operator review.
    if username_matches:
        print(f"[migrate_legacy] username-only matches ({len(username_matches)}건, 검토 권장):")
        for lid, username, bid in username_matches:
            print(f" legacy.id={lid} username={username!r} → backend.id={bid}")
    return mapping
| 126 | + |
| 127 | + |
def _copy_shifted_users(target_cur, legacy_cur, user_id_map: dict[int, tuple[int, _Source]]) -> None:
    """Insert every legacy user marked SHIFTED into the backend user table.

    Each row is inserted under the target id already computed in *user_id_map*.
    Returns without touching either database when no user is marked SHIFTED.
    """
    legacy_ids = []
    for legacy_id, (_, source) in user_id_map.items():
        if source == _Source.SHIFTED:
            legacy_ids.append(legacy_id)
    if not legacy_ids:
        return

    legacy_cur.execute(
        """
        SELECT id, password, last_login, is_superuser, username,
        first_name, last_name, email, is_staff, is_active, date_joined, unique_id
        FROM public.user_userext WHERE id = ANY(%s) ORDER BY id
        """,
        [legacy_ids],
    )

    # The SELECT above reads no nickname or image column from legacy, so the row is
    # completed here: nickname and image_id are None, nickname_en / nickname_ko get the username.
    new_rows = []
    for record in legacy_cur.fetchall():
        (
            legacy_id,
            password,
            last_login,
            is_superuser,
            username,
            first_name,
            last_name,
            email,
            is_staff,
            is_active,
            date_joined,
            unique_id,
        ) = record
        new_rows.append(
            (
                user_id_map[legacy_id][0],
                password,
                last_login,
                is_superuser,
                username,
                first_name,
                last_name,
                email,
                is_staff,
                is_active,
                date_joined,
                None,
                username,
                username,
                None,
                unique_id,
            )
        )

    target_cur.executemany(
        """
        INSERT INTO public.user_userext (
        id, password, last_login, is_superuser, username,
        first_name, last_name, email, is_staff, is_active, date_joined,
        nickname, nickname_en, nickname_ko, image_id, unique_id
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """,
        new_rows,
    )
| 156 | + |
| 157 | + |
def _update_matched_unique_id(target_cur, legacy_cur, user_id_map: dict[int, tuple[int, _Source]]) -> None:
    """Overwrite the backend ``unique_id`` of matched users with the legacy value.

    This keeps QR codes / tokens issued by the legacy system valid for users
    that were matched (auto or manual) to an existing backend user.

    Several legacy users may map to the same backend user (MANUAL_USER_MAPPING
    does this). Only one value can be stored, so the legacy user with the
    highest id wins and each dropped candidate is logged.
    NOTE(review): "highest legacy id wins" mirrors the previous last-write-wins
    behaviour when rows arrive in id order; confirm it is the intended policy.

    A legacy ``unique_id`` of NULL never overwrites the backend value.
    """
    # Sorting makes the winner deterministic instead of depending on the
    # unordered SELECT / dict iteration order.
    matched = sorted((lid, bid) for lid, (bid, src) in user_id_map.items() if src != _Source.SHIFTED)
    if not matched:
        return
    legacy_cur.execute(
        "SELECT id, unique_id FROM public.user_userext WHERE id = ANY(%s)",
        [[lid for lid, _ in matched]],
    )
    legacy_unique = dict(legacy_cur.fetchall())

    # backend id -> (winning legacy id, unique_id)
    chosen: dict[int, tuple[int, object]] = {}
    for lid, bid in matched:
        unique_id = legacy_unique.get(lid)
        if unique_id is None:
            continue
        if bid in chosen:
            print(
                f"[migrate_legacy] unique_id conflict: backend.id={bid} "
                f"uses legacy.id={lid}, drops legacy.id={chosen[bid][0]}"
            )
        chosen[bid] = (lid, unique_id)
    if not chosen:
        return

    updates = [(unique_id, bid) for bid, (_, unique_id) in chosen.items()]
    target_cur.executemany("UPDATE public.user_userext SET unique_id = %s WHERE id = %s", updates)
| 170 | + |
| 171 | + |
| 172 | +def _get_columns(cur, table: str) -> list[str]: |
| 173 | + cur.execute( |
| 174 | + """ |
| 175 | + SELECT column_name FROM information_schema.columns |
| 176 | + WHERE table_schema = 'public' AND table_name = %s |
| 177 | + ORDER BY ordinal_position |
| 178 | + """, |
| 179 | + [table], |
| 180 | + ) |
| 181 | + return [r[0] for r in cur.fetchall()] |
| 182 | + |
| 183 | + |
| 184 | +def _copy_account_emailaddress(target_cur, legacy_cur, user_id_map: dict[int, tuple[int, _Source]]) -> None: |
| 185 | + """account_emailaddress 전용 — `(user_id, primary=true)` 부분 unique index 충돌 회피. |
| 186 | +
|
| 187 | + 같은 backend user 로 매핑된 여러 legacy email 이 모두 primary 인 경우 (merge 케이스), |
| 188 | + 첫 항목만 primary 유지, 나머지는 primary=false 로 demote. |
| 189 | + """ |
| 190 | + legacy_cur.execute( |
| 191 | + 'SELECT id, email, verified, "primary", user_id FROM public.account_emailaddress ' |
| 192 | + 'ORDER BY user_id, "primary" DESC, id' |
| 193 | + ) |
| 194 | + seen_primary: set[int] = set() |
| 195 | + rows = [] |
| 196 | + for row_id, email, verified, is_primary, legacy_uid in legacy_cur.fetchall(): |
| 197 | + backend_uid = user_id_map[legacy_uid][0] |
| 198 | + if is_primary and backend_uid in seen_primary: |
| 199 | + is_primary = False # 같은 backend user 의 두 번째 primary 는 demote |
| 200 | + elif is_primary: |
| 201 | + seen_primary.add(backend_uid) |
| 202 | + rows.append((row_id, email, verified, is_primary, backend_uid)) |
| 203 | + target_cur.executemany( |
| 204 | + 'INSERT INTO public.account_emailaddress (id, email, verified, "primary", user_id) VALUES (%s, %s, %s, %s, %s)', |
| 205 | + rows, |
| 206 | + ) |
| 207 | + print(f"[migrate_legacy] account_emailaddress: copied {len(rows)} rows") |
| 208 | + |
| 209 | + |
def _copy_table(
    target_cur, legacy_cur, table: str, user_fk_cols: frozenset[str], user_id_map: dict[int, tuple[int, _Source]]
) -> None:
    """Copy every row of ``public.<table>`` from legacy to target, translating user ids.

    Only columns present in both schemas are copied, in legacy column order.
    Values in the *user_fk_cols* columns are replaced by the mapped backend user id.

    A user id without a mapping is copied unchanged, as before. Such ids are
    now collected and reported per table, because a FK violation is not a
    reliable detector: history tables have no FK constraints, and an
    untranslated legacy id may happen to equal an existing backend id.

    Args:
        target_cur: cursor on the backend (target) database.
        legacy_cur: cursor on the legacy database.
        table: unqualified table name, taken from the TABLES_TO_COPY whitelist.
        user_fk_cols: names of the columns holding legacy user ids.
        user_id_map: legacy user id -> (backend user id, source).

    Raises:
        RuntimeError: the table is missing on either side, or the two schemas
            share no column.
    """
    legacy_cols = _get_columns(legacy_cur, table)
    target_cols = set(_get_columns(target_cur, table))
    if not legacy_cols or not target_cols:
        raise RuntimeError(
            f"Table {table} missing in legacy ({len(legacy_cols)} cols) or target ({len(target_cols)} cols)"
        )
    # Copy only legacy ∩ target columns (guards against schema drift); legacy order.
    cols = [c for c in legacy_cols if c in target_cols]
    if not cols:
        # An empty column list would otherwise produce a syntactically invalid INSERT.
        raise RuntimeError(f"Table {table} has no columns in common between legacy and target")
    col_list = ", ".join(f'"{c}"' for c in cols)
    placeholders = ", ".join(["%s"] * len(cols))
    fk_indices = [i for i, c in enumerate(cols) if c in user_fk_cols]

    # Flat legacy id -> backend id lookup, built once instead of indexing tuples per cell.
    backend_id_of = {legacy_id: backend_id for legacy_id, (backend_id, _) in user_id_map.items()}
    unmapped: set = set()

    # nosec: B608 — table comes from the TABLES_TO_COPY whitelist and column names
    # from information_schema; no user input is interpolated.
    select_sql = f"SELECT {col_list} FROM public.{table}"  # nosec: B608
    insert_sql = f"INSERT INTO public.{table} ({col_list}) VALUES ({placeholders})"  # nosec: B608
    legacy_cur.execute(select_sql)
    total = 0
    while batch := legacy_cur.fetchmany(BATCH_SIZE):
        translated = []
        for row in batch:
            values = list(row)
            for idx in fk_indices:
                legacy_uid = values[idx]
                if legacy_uid is None:
                    continue
                backend_uid = backend_id_of.get(legacy_uid)
                if backend_uid is None:
                    # Left untouched on purpose; recorded so the operator can see it.
                    unmapped.add(legacy_uid)
                else:
                    values[idx] = backend_uid
            translated.append(tuple(values))
        target_cur.executemany(insert_sql, translated)
        total += len(translated)
    print(f"[migrate_legacy] {table}: copied {total} rows")
    if unmapped:
        print(
            f"[migrate_legacy] WARNING {table}: {len(unmapped)} user id(s) without mapping "
            f"copied untranslated: {sorted(unmapped)}"
        )
| 242 | + |
| 243 | + |
def _reset_sequences(target_cur) -> None:
    """Resynchronize sequences after rows were inserted with explicit ids.

    For each listed ``(table, column)`` the owning sequence is set so that the
    next generated value is ``MAX(column) + 1``. For an empty table it is set so
    that the next value is 1. The previous form passed ``is_called = true``
    unconditionally, which made an empty table start at 2.

    NOTE(review): product_*, order_* and payment_history tables are not listed
    here — presumably their primary keys are not sequence-backed; confirm.
    """
    targets = [
        ("user_userext", "id"),
        ("socialaccount_socialapp", "id"),
        ("socialaccount_socialaccount", "id"),
        ("account_emailaddress", "id"),
        *((table, "history_id") for table, _ in TABLES_TO_COPY if "historical" in table),
    ]
    for table, pk_col in targets:
        # Table and column names are hard-coded above; no user input is interpolated.
        # The third setval() argument is is_called: false on an empty table so the
        # first nextval() returns 1, true otherwise so it returns MAX + 1.
        reset_sql = (
            f"SELECT setval(pg_get_serial_sequence('public.{table}', '{pk_col}'), "
            f"COALESCE(MAX({pk_col}), 1), MAX({pk_col}) IS NOT NULL) "
            f"FROM public.{table}"
        )  # nosec: B608
        target_cur.execute(reset_sql)
| 258 | + |
| 259 | + |
def _verify(target_cur, legacy_cur) -> None:
    """Check that every copied table has the same row count in legacy and target.

    Raises:
        RuntimeError: at least one table differs; the message lists each one.
    """
    problems: list[str] = []
    for table, _ in TABLES_TO_COPY:
        # nosec: B608 — table names come from the TABLES_TO_COPY whitelist.
        count_sql = f"SELECT COUNT(*) FROM public.{table}"  # nosec: B608
        legacy_cur.execute(count_sql)
        legacy_count = legacy_cur.fetchone()[0]
        target_cur.execute(count_sql)
        target_count = target_cur.fetchone()[0]
        if legacy_count == target_count:
            continue
        problems.append(f"{table}: legacy={legacy_count}, target={target_count}")
    if not problems:
        return
    raise RuntimeError("Row count mismatch:\n " + "\n ".join(problems))
| 273 | + |
| 274 | + |
def migrate_data(apps, schema_editor):
    """Copy legacy payment data into the backend database (RunPython forward step).

    Does nothing when no ``legacy`` database is configured, or when the
    migration is being applied to an alias other than ``default``. The copy
    always writes to ``default``, so running it while another alias is being
    migrated would repeat it against the wrong target.

    Args:
        apps: historical app registry supplied by RunPython (unused).
        schema_editor: schema editor supplied by RunPython; only its
            connection alias is read.
    """
    if "legacy" not in connections.databases:
        return  # Dev/test environment, or cutover already finished: no-op.
    if schema_editor.connection.alias != "default":
        return  # Another database is being migrated; the copy targets "default" only.

    # On failure, roll back every change made to the target DB together.
    # The legacy DB is only read, so it needs no rollback.
    with (
        transaction.atomic(using="default"),
        connections["legacy"].cursor() as legacy_cur,
        connections["default"].cursor() as target_cur,
    ):
        user_id_map = _build_user_id_map(target_cur, legacy_cur)
        _copy_shifted_users(target_cur, legacy_cur, user_id_map)
        _update_matched_unique_id(target_cur, legacy_cur, user_id_map)

        for table, user_fk_cols in TABLES_TO_COPY:
            if table == "account_emailaddress":
                # Needs primary-flag de-duplication, so it has a dedicated copier.
                _copy_account_emailaddress(target_cur, legacy_cur, user_id_map)
            else:
                _copy_table(target_cur, legacy_cur, table, user_fk_cols, user_id_map)

        _reset_sequences(target_cur)
        _verify(target_cur, legacy_cur)
| 297 | + |
| 298 | + |
class Migration(migrations.Migration):
    """Data-only migration: runs migrate_data forward; the reverse step is a no-op."""

    atomic = True

    dependencies = [
        # Tables written by this migration must already exist.
        ("user", "0009_alter_historicaluserext_options_and_more"),
        ("order", "0001_initial"),
        ("product", "0001_initial"),
        ("payment_history", "0001_initial"),
        # allauth: the socialaccount / account tables must be created first.
        ("socialaccount", "0006_alter_socialaccount_extra_data"),
        ("account", "0009_emailaddress_unique_primary_email"),
    ]

    operations = [
        migrations.RunPython(migrate_data, reverse_code=migrations.RunPython.noop),
    ]
0 commit comments