fill transaction meta on chunk store and simplify full outer join#355
fill transaction meta on chunk store and simplify full outer join#355
Conversation
05ed9cc to
32e4854
Compare
| sa.Column("process_ts", sa.Float), # Время последней успешной обработки | ||
| sa.Column("is_success", sa.Boolean), # Успешно ли обработана строка | ||
| sa.Column("priority", sa.Integer), # Приоритет обработки (чем больше, тем выше) | ||
| sa.Column("status", sa.String), # Статус исполнения трансформации |
There was a problem hiding this comment.
Возможно тогда is_success лишняя колонка
|
Пока из того что я вижу, тут получится full table scan + cross joins при записи: O(N × T × M) где T - кол-во трансформаций, M - размер связанных таблиц. Вот в чем суть проблемы: Я бы покрыл новую логику |
|
@halconel суть вот в чем. Если на вход трансформации приходит две или больше таблиц, то по transform_keys этот джойн все равно будет исполняться. Ну просто по логике работы трубы. Насколько я понимаю, этот джойн еще нужно делать вручную в коде трансформации, потому что на вход трансформации приходит несколько таблиц и они там внутри джойнятся. И это не перекладывание проблемы из одного места в другое, это просто то, что задается человеком в пайплайне. От этого никуда не уйти. На примере: если есть таблица картинок и таблица моделей классификации, которые надо прогнать по этим картинкам, то тут будет кросс джойн, просто из-за логики того, что хочет человек. Получается, что раньше в этом месте потенциально было две больших операции: кросс джойн из-за логики и большой full outer join, от которого хотим уйти. Теперь большой full outer join уходит, а кросс джойн переносится на этап записи данных в таблицу |
@swsvc Я все еще очень надеюсь, что это будет покрыто нагрузочными тестами и мы увидим, как изменилась производительность при записи. |
|
| assert len(list(tmp_dir.glob("tbl2/*.png"))) == 10 | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="impossible to trace changes when they happen externally") |
There was a problem hiding this comment.
Этот тест изменяет данные на диске вручную. Так как изменения в мету трансформаций прилетают только когда эти изменения отлавливаются (а здесь они не отлавливаются), то заскипал тест
| out.write('{"id": "2", "text": "text2"}\n') | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="impossible to trace changes when they happen externally") |
There was a problem hiding this comment.
Точто так же как в предыдущем тесте: файл меняется не через трубу а напрямую через file IO. Скипнул
tests/test_table_store_qdrant.py
Outdated
| yield pd.DataFrame({"id": [1], "embedding": [[0.1]], "str_payload": ["foo"], "int_payload": [42]}) | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="qdrant store cannot read all the rows from the index") |
There was a problem hiding this comment.
Для корректной работы нужно чтение всего индекса из квадранта, что он делать не умеет. Тест скипнул
There was a problem hiding this comment.
Вот это теперь снова работает
d7b59d9 to
aaa2756
Compare
datapipe/step/batch_transform.py
Outdated
| if not transform_meta_table_exists: | ||
| meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) | ||
| if not meta_index.empty: | ||
| self.meta_table.insert_rows(meta_index) |
There was a problem hiding this comment.
Я вот здесь не уверен, надо ли это прямо так делать. Может быть лучше это контролируемо через отдельные менеджмент команды?
There was a problem hiding this comment.
я убрал в метод класса и напрямую вызываю в тестах. Потом если будет юзкейс, то станет понятно, как лучше сделать
2153276 to
565deb4
Compare
|
В этом коммите есть реализованный алгоритм записи меты для транзакций полностью на sql. Однако он используется только в одном случае: когда все таблицы, участвующие в трансформации, лежат в одной sql базе данных (postgres, sqlite). Если это не так, то джойн происходит в пандасе (потребляет много памяти, потому что выкачивает таблицы для джойна) |
e9e6b1a to
bd2dbfb
Compare
No description provided.