Skip to content

Otimiza task_migrate_and_publish_articles e tarefas relacionadas#948

Open
Copilot wants to merge 5 commits into
mainfrom
copilot/optimize-migrate-and-publish-articles
Open

Otimiza task_migrate_and_publish_articles e tarefas relacionadas#948
Copilot wants to merge 5 commits into
mainfrom
copilot/optimize-migrate-and-publish-articles

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented Apr 29, 2026

O que esse PR faz?

Aplica otimizações de memória, CPU e I/O na cadeia de tarefas task_migrate_and_publish_articles → _by_journal → _by_issue → task_sync_issue → task_publish_article, focando nos itens de maior impacto previamente identificados em uma análise por criticidade.

Mudanças principais:

  • 🔴 Cache em-processo para get_api_data em publication/api/publication.py. Cada chamada antes instanciava PublicationAPI(...) cujo __init__ dispara um HTTP login (get_token); o cache reduz dezenas de logins por job a um por (collection, content_type, website_kind) por worker. Estratégia alinhada ao objetivo de pedir token apenas 1× e revalidar somente quando expirar:
    • TTL=3600s (1h), alinhado ao lifetime típico de JWT, apenas como teto de segurança para não reter tokens indefinidamente em workers de longa duração;
    • invalidate_api_data_cache(collection, content_type, website_kind) para invalidação explícita por chave — consumidores que detectarem falha de autenticação no resultado de PublicationAPI.post_data podem purgar a entrada e forçar novo login na próxima chamada;
    • PublicationAPI.post_data já refaz get_token() automaticamente na própria instância em caso de falha, então um token expirado entre a leitura do cache e o uso resulta no máximo em 1 retry interno;
    • retorna copy.deepcopy para impedir envenenamento por mutação dos chamadores (ex.: api_data["verify"] = verify em task_publish_articles); respostas de erro não são cacheadas; helper clear_api_data_cache() exposto para testes/admin.
  • 🔴 Remoção de qa_api_data/public_api_data propagados de _by_journal para _by_issue — eram dead-code (parâmetros aceitos mas nunca lidos no corpo) e infláveis no payload Celery (token + credenciais). Compatibilidade com mensagens já enfileiradas via **legacy_kwargs, que descarta silenciosamente as chaves legacy conhecidas e loga warning para chaves desconhecidas.
  • 🟠 .iterator(chunk_size=100) no loop de migração de artigos em _by_issue + select_related("issue_proc","issue_proc__journal_proc","collection","sps_pkg") para reduzir RAM e queries N+1 (a @property journal_proc em ArticleProc resolve via issue_proc.journal_proc, e collection é FK direto em BaseProc).
  • 🟠 selected_article_proc_items em _by_journal agora usa .iterator(chunk_size=1000).
  • 🟠 Remoção de count() redundantes em task_sync_issue (havia count() extra além da iteração).
  • Remove select_related que era ignorado por values_list em task_sync_issue (silently ineficaz).
  • Adiciona testes unitários em publication/api/test_publication.py cobrindo: hit por chave, isolamento entre chaves, imunidade a mutação aninhada (deepcopy), não-cache de erros, helper clear_api_data_cache e invalidação por chave via invalidate_api_data_cache.

Redução de uso de memória (auditoria adicional, commit c0fb7a2):

  • task_sync_issue — substituído list(qs.values_list("id", flat=True)) (que materializava todos os IDs de artigos do issue em RAM — milhares em issues grandes) por .count() para total_to_process + .iterator(chunk_size=500) no loop de publicação. Footprint constante independente do tamanho do issue.
  • task_migrate_and_publish_articles_by_journal — o dict issue_proc_and_related_article_proc_id_list (mapa {issue_id: [article_ids...]}) era mantido inteiro em RAM até o fim do for ... .items(). Agora while ... popitem() libera cada lista de IDs assim que a tarefa Celery é despachada — pico de memória cai conforme o loop avança.
  • task_migrate_and_publish_articles (top-level) — mesmo padrão popitem() para items_to_process (mapa {journal_id: [issue_ids...]} para coleções inteiras), evitando manter o dict completo em RAM durante o despacho.
  • Top-level: .iterator(chunk_size=1000) adicionado nos values_list que alimentam items_to_process (issues e journals da coleção), evitando carregar todas as linhas do queryset de uma vez.

task_sync_issue continua sendo despachada incondicionalmente para QA e PUBLIC após a migração — por feedback da revisão, ela precisa rodar sempre para garantir que não exista duplicidade no site, condição que não pode ser determinada antecipadamente. A query/contagem total_articles_to_publish que existia em _by_issue apenas para condicionar o despacho foi removida.

Itens deixados para PR separado por requererem mudanças maiores de fluxo: 🔴 #3 (skip de task_sync_issue quando vazio — descartado: precisa rodar para deduplicação no site), 🔴 #4 (loop síncrono em task_sync_issue — requer redesign com rate-limit/group), 🟠 #7 (dedup síncrono executado sempre), 🟠 #8 (dupla seleção entre _by_journal e _by_issue), e itens 🟢/🟡 de limpeza.

Onde a revisão poderia começar?

  • publication/api/publication.py — adição do cache (_API_DATA_CACHE, get_api_data, clear_api_data_cache, invalidate_api_data_cache).
  • proc/tasks.py, task_migrate_and_publish_articles (top-level, ~linhas 830–880) para popitem() + .iterator() nos querysets que alimentam items_to_process.
  • proc/tasks.py, task_migrate_and_publish_articles_by_journal (~linhas 900–1010) e task_migrate_and_publish_articles_by_issue (~linhas 1015–1135), onde concentram-se a remoção dos kwargs de api_data, o iterator()/select_related e o drain via popitem() do dict de despacho.
  • proc/tasks.py, task_sync_issue (~linhas 1160–1220) para o uso de .count() + .iterator(chunk_size=500) em vez da lista materializada de IDs.
  • publication/api/test_publication.py — novos testes do cache.

Como este poderia ser testado manualmente?

  1. Subir um worker Celery em ambiente de QA e disparar task_migrate_and_publish_articles para uma coleção com vários issues a processar.
  2. Validar 🔴 Adiciona templates para preencher issues e PR #2: instrumentar (ou observar logs) PublicationAPI.get_token — a contagem por worker deve cair drasticamente (ordem de magnitude) em relação ao baseline; em até 1h de execução de um worker, deve haver no máximo 1 login por (collection, content_type, website_kind) (ou novo login após invalidate_api_data_cache).
  3. Validar invalidação: chamar invalidate_api_data_cache(collection, "issue", "QA") e confirmar que a próxima chamada de get_api_data(collection, "issue", "QA") aciona um novo get_token.
  4. Validar 🔴 Adiciona templates para preencher issues e PR #2 (compat): reenviar uma mensagem antiga (com qa_api_data/public_api_data no kwargs) — _by_issue deve processar normalmente, sem warning. Enviar uma com kwarg fictício (ex.: foo=1) — deve aparecer warning task_migrate_and_publish_articles_by_issue: ignoring unknown kwargs ['foo'].
  5. Validar 🟠 Criar um modelo OfficialJournal #5/Crear un modelo en la carpeta journals para ingresar títulos no oficiales de las revistas #6: monitorar o uso de RAM do worker em issues com muitos artigos — deve manter-se estável (chunks de 100); número de queries por issue deve diminuir (sem N+1 em journal_proc/collection/sps_pkg, sem count() extras).
  6. Validar redução de memória (auditoria adicional): para uma coleção/journal com muitos issues e muitos artigos por issue, monitorar RSS do worker durante o despacho — pico em task_migrate_and_publish_articles e _by_journal deve decrescer ao longo do loop (drain via popitem()); em task_sync_issue, RSS deve manter-se constante mesmo para issues com milhares de artigos (sem materialização da lista de IDs).
  7. Validar comportamento de deduplicação: para issues processadas com 0 artigos novos a publicar, task_sync_issue ainda deve ser enfileirada 2× (QA + PUBLIC) — verificar nos logs/eventos do TaskExecution.
  8. Rodar os testes unitários: pytest publication/api/test_publication.py proc/tests.py (5 casos do cache + demais casos passando).

Algum cenário de contexto que queira dar?

A análise prévia apontava 4 itens 🔴 críticos e 4 🟠 altos. Este PR endereça os de maior ROI e menor risco de mudança comportamental:

# Crit. Tema Status
1 🔴 Listas de IDs gigantes propagadas via Celery ✅ Endereçado (drain via popitem() + .iterator() nos querysets que alimentam o dict top-level)
2 🔴 Login HTTP repetido + tokens em mensagens ✅ Endereçado (cache TTL 1h + invalidação por chave + remoção dos kwargs mortos)
3 🔴 task_sync_issue despachada com 0 artigos Descartado — sync precisa rodar sempre (deduplicação no site)
4 🔴 Loop síncrono em task_sync_issue Adiado (requer redesign)
5 🟠 Falta iterator/select_related na migração ✅ Endereçado
6 🟠 count() redundantes ✅ Endereçado (no task_sync_issue)
7 🟠 Dedup síncrono executado sempre Adiado
8 🟠 select_items duplicado entre níveis Adiado

Sobre a estratégia do cache: o objetivo original de qa_api_data/public_api_data era pedir o token 1× e renovar somente quando expirasse. A combinação adotada preserva esse objetivo sem trafegar token por mensagens Celery — TTL longo (1h) como teto + invalidate_api_data_cache para invalidação dirigida quando o consumidor detectar falha de autenticação + retry interno automático em PublicationAPI.post_data. Resultado: 1 login por chave por worker, com renovação reativa (não periódica) quando o token efetivamente falha.

Sobre a redução adicional de memória (auditoria solicitada na revisão): o pico de RAM dos workers crescia proporcionalmente a (a) número total de IDs de artigos a publicar por issue em task_sync_issue (materializados em lista) e (b) tamanho dos dicts de despacho em _by_journal e no top-level (mantidos inteiros até o fim do loop). Substituindo list(...) por .count() + .iterator() e o for ... .items() por while ... popitem(), o footprint passa a ser constante (cache de página do queryset + 1 chave do dict por iteração), eliminando o crescimento linear no tamanho dos dados de entrada.

A combinação de cache + remoção dos kwargs qa_api_data/public_api_data resolve dois problemas em um: corta o número de logins HTTP por job e ainda reduz substancialmente o tamanho das mensagens Celery enviadas ao broker (token + credenciais não trafegam mais por mensagem).

Antes (problemático):

# proc/tasks.py — _by_journal
qa_api_data = get_api_data(journal_proc.collection, "issue", "QA")        # HTTP login
public_api_data = get_api_data(journal_proc.collection, "issue", "PUBLIC") # HTTP login
...
task_migrate_and_publish_articles_by_issue.delay(
    ...,
    qa_api_data=qa_api_data,    # token + credenciais no broker
    public_api_data=public_api_data,  # idem, e nunca consumido em _by_issue
)

Depois: as duas chamadas e os dois kwargs foram removidos; task_sync_issue (que efetivamente precisa de api_data) obtém via get_api_data cacheado.

Screenshots

N/A.

Quais são tickets relevantes?

Issue de otimização da tarefa migrate_and_publish_articles referenciada na descrição do problema.

Referências

  • proc/tasks.py — definição das 4 tarefas da cadeia.
  • publication/api/publication.pyPublicationAPI.get_token (custo HTTP de get_api_data), PublicationAPI.post_data (retry interno em falha de auth) e novo cache _API_DATA_CACHE + invalidate_api_data_cache.
  • proc/models.pyArticleProc.journal_proc (property que resolve via issue_proc) e BaseProc.collection (FK direto), guiando o select_related aplicado.
  • bigbang/tasks_scheduler.py — periodicidade do disparo (ARTICLE_DB_MIGRATION_MINUTES).

Copilot AI changed the title [WIP] Optimize migrate_and_publish_articles task for resource efficiency Análise de criticidade dos gargalos em migrate_and_publish_articles Apr 29, 2026
Copilot AI requested a review from robertatakenaka April 29, 2026 11:46
@robertatakenaka robertatakenaka marked this pull request as ready for review April 29, 2026 17:12
Copilot AI review requested due to automatic review settings April 29, 2026 17:12
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review any files in this pull request.

@robertatakenaka
Copy link
Copy Markdown
Member

@copilot o PR está incompleto, não há arquivos modificados, por favor execute novamente

Copy link
Copy Markdown
Contributor Author

Copilot AI commented Apr 29, 2026

@copilot o PR está incompleto, não há arquivos modificados, por favor execute novamente

Implementadas as otimizações de maior impacto no commit b163304: cache em-processo (TTL 10 min) para get_api_data evitando login HTTP a cada chamada; remoção de qa_api_data/public_api_data que eram dead-code propagados via Celery (compatibilidade via **legacy_kwargs); skip de task_sync_issue quando não há artigos a publicar; .iterator(chunk_size=…) + select_related extra no loop de migração; e materialização única dos IDs eliminando count() redundantes. Adicionei publication/api/test_publication.py com 4 testes do cache (todos passando junto com proc/tests.py). Itens 🔴 #1 e #4 foram deixados para PR separado por exigirem mudanças maiores de fluxo.

Copilot AI changed the title Análise de criticidade dos gargalos em migrate_and_publish_articles Otimiza task_migrate_and_publish_articles e tarefas relacionadas Apr 29, 2026
Comment thread proc/tasks.py
sps_pkg__pid_v3__isnull=False,
).values_list("id", flat=True)
total_articles_to_publish = article_ids_to_publish.count()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot remove identificação de que há artigos para publicar e considere que o sync_issue tem que ser executado independentemente se há ou não artigos para publicar pois ele precisa ser exeuctado para garantir que não exista duplicidade no site. E isso não é possível identifcar previamente

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removida a verificação de total_articles_to_publish e a query associada no commit e75d39b. task_sync_issue volta a ser despachada incondicionalmente para QA e PUBLIC, preservando a garantia de deduplicação no site.

Comment thread proc/tasks.py
force_update=force_import_acron_id_file,
)

qa_api_data = get_api_data(journal_proc.collection, "issue", "QA")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot o objetivo de criar qa_api_data e public_api_data é para que o token seja solicitado apenas 1 vezes no início, depois somente quando expirar. Qual uma boa alternativa para lidar com este propósito?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bom ponto — o TTL=600s da primeira versão forçava re-login a cada 10 min mesmo com token válido, contrariando o objetivo. Ajustes no commit 1db793c em publication/api/publication.py:

  • TTL elevado para 3600s (1h), alinhado ao lifetime típico de JWT — apenas como teto de segurança para não reter tokens indefinidamente em workers de longa duração.
  • Novo helper invalidate_api_data_cache(collection, content_type, website_kind): o consumidor que detectar falha de autenticação no resultado de PublicationAPI.post_data pode purgar a entrada e forçar novo login na próxima chamada.
  • PublicationAPI.post_data já refaz get_token() automaticamente na própria instância em caso de falha (linha 113 do publication.py original) — então um token expirado entre a leitura do cache e o uso resulta no máximo em 1 retry interno, não em erro propagado.

Resultado: 1 login por (collection, content_type, website_kind) por worker até o token efetivamente expirar (detectado via invalidate_api_data_cache) ou até atingir 1h. Se preferir TTL maior/menor ou parametrizá-lo via Django setting, posso ajustar.

@robertatakenaka
Copy link
Copy Markdown
Member

@copilot verifique e corrija uso excessivo de memória

…ync_issue

Agent-Logs-Url: https://github.com/scieloorg/scms-upload/sessions/a51ed25f-46df-4709-a6af-ad635f11f563

Co-authored-by: robertatakenaka <505143+robertatakenaka@users.noreply.github.com>
Copy link
Copy Markdown
Contributor Author

Copilot AI commented Apr 29, 2026

@copilot verifique e corrija uso excessivo de memória

Auditei a cadeia inteira procurando os hotspots de memória e apliquei 3 correções no commit c0fb7a2 (proc/tasks.py):

1. task_sync_issue — maior ganho. Antes list(qs.values_list("id", flat=True)) materializava todos os IDs de artigos do issue em RAM (issues grandes podem ter milhares). Agora: 1 .count() para total_to_process + .iterator(chunk_size=500) no loop de publicação. Footprint constante independente do tamanho do issue.

2. _by_journal — drena dict ao despachar. O issue_proc_and_related_article_proc_id_list (mapa {issue_id: [article_ids...]}) era mantido inteiro em RAM até o fim do for ... .items(). Agora while ... popitem() libera cada lista de IDs assim que a tarefa Celery é despachada — pico de memória cai conforme o loop avança.

3. task_migrate_and_publish_articles (top-level) — mesmo padrão. O items_to_process (mapa {journal_id: [issue_ids...]} para coleções inteiras) também é drenado via popitem(). Adicional: os values_list que o alimentam agora usam .iterator(chunk_size=1000) em vez de carregar todas as linhas de uma vez.

Os 5 testes de publication/api/test_publication.py continuam passando. Os iterators existentes em _by_issue (.iterator(chunk_size=100) no loop de migração) e em _by_journal (.iterator(chunk_size=1000) em selected_article_proc_items) já estavam aplicados desde o commit b163304. Mantive copy.deepcopy no cache de api_data (custo desprezível para dict plano de ~7 chaves) para preservar a garantia de isolamento testada.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Otimize a tarefa de migrate_and_publish_articles e as tarefas relacionadas

3 participants