Otimiza task_migrate_and_publish_articles e tarefas relacionadas#948
Otimiza task_migrate_and_publish_articles e tarefas relacionadas#948Copilot wants to merge 5 commits into
Conversation
|
@copilot o PR está incompleto, não há arquivos modificados, por favor execute novamente |
Agent-Logs-Url: https://github.com/scieloorg/scms-upload/sessions/8d968fb4-3dcb-4b3b-b00c-df7816ab481a Co-authored-by: robertatakenaka <505143+robertatakenaka@users.noreply.github.com>
Implementadas as otimizações de maior impacto no commit b163304: cache em-processo (TTL 10 min) para |
| sps_pkg__pid_v3__isnull=False, | ||
| ).values_list("id", flat=True) | ||
| total_articles_to_publish = article_ids_to_publish.count() | ||
|
|
There was a problem hiding this comment.
@copilot remove identificação de que há artigos para publicar e considere que o sync_issue tem que ser executado independentemente se há ou não artigos para publicar pois ele precisa ser exeuctado para garantir que não exista duplicidade no site. E isso não é possível identifcar previamente
There was a problem hiding this comment.
Removida a verificação de total_articles_to_publish e a query associada no commit e75d39b. task_sync_issue volta a ser despachada incondicionalmente para QA e PUBLIC, preservando a garantia de deduplicação no site.
Agent-Logs-Url: https://github.com/scieloorg/scms-upload/sessions/b718a687-b854-4ed5-953c-723005935d77 Co-authored-by: robertatakenaka <505143+robertatakenaka@users.noreply.github.com>
| force_update=force_import_acron_id_file, | ||
| ) | ||
|
|
||
| qa_api_data = get_api_data(journal_proc.collection, "issue", "QA") |
There was a problem hiding this comment.
@copilot o objetivo de criar qa_api_data e public_api_data é para que o token seja solicitado apenas 1 vezes no início, depois somente quando expirar. Qual uma boa alternativa para lidar com este propósito?
There was a problem hiding this comment.
Bom ponto — o TTL=600s da primeira versão forçava re-login a cada 10 min mesmo com token válido, contrariando o objetivo. Ajustes no commit 1db793c em publication/api/publication.py:
- TTL elevado para 3600s (1h), alinhado ao lifetime típico de JWT — apenas como teto de segurança para não reter tokens indefinidamente em workers de longa duração.
- Novo helper
invalidate_api_data_cache(collection, content_type, website_kind): o consumidor que detectar falha de autenticação no resultado dePublicationAPI.post_datapode purgar a entrada e forçar novo login na próxima chamada. PublicationAPI.post_datajá refazget_token()automaticamente na própria instância em caso de falha (linha 113 do publication.py original) — então um token expirado entre a leitura do cache e o uso resulta no máximo em 1 retry interno, não em erro propagado.
Resultado: 1 login por (collection, content_type, website_kind) por worker até o token efetivamente expirar (detectado via invalidate_api_data_cache) ou até atingir 1h. Se preferir TTL maior/menor ou parametrizá-lo via Django setting, posso ajustar.
|
@copilot verifique e corrija uso excessivo de memória |
Agent-Logs-Url: https://github.com/scieloorg/scms-upload/sessions/e97cc5cb-d2eb-4e6d-9c79-892d0e3e7a33 Co-authored-by: robertatakenaka <505143+robertatakenaka@users.noreply.github.com>
…ync_issue Agent-Logs-Url: https://github.com/scieloorg/scms-upload/sessions/a51ed25f-46df-4709-a6af-ad635f11f563 Co-authored-by: robertatakenaka <505143+robertatakenaka@users.noreply.github.com>
Auditei a cadeia inteira procurando os hotspots de memória e apliquei 3 correções no commit c0fb7a2 ( 1. 2. 3. Os 5 testes de |
O que esse PR faz?
Aplica otimizações de memória, CPU e I/O na cadeia de tarefas
task_migrate_and_publish_articles → _by_journal → _by_issue → task_sync_issue → task_publish_article, focando nos itens de maior impacto previamente identificados em uma análise por criticidade.Mudanças principais:
get_api_dataempublication/api/publication.py. Cada chamada antes instanciavaPublicationAPI(...)cujo__init__dispara um HTTP login (get_token); o cache reduz dezenas de logins por job a um por(collection, content_type, website_kind)por worker. Estratégia alinhada ao objetivo de pedir token apenas 1× e revalidar somente quando expirar:invalidate_api_data_cache(collection, content_type, website_kind)para invalidação explícita por chave — consumidores que detectarem falha de autenticação no resultado dePublicationAPI.post_datapodem purgar a entrada e forçar novo login na próxima chamada;PublicationAPI.post_datajá refazget_token()automaticamente na própria instância em caso de falha, então um token expirado entre a leitura do cache e o uso resulta no máximo em 1 retry interno;copy.deepcopypara impedir envenenamento por mutação dos chamadores (ex.:api_data["verify"] = verifyemtask_publish_articles); respostas de erro não são cacheadas; helperclear_api_data_cache()exposto para testes/admin.qa_api_data/public_api_datapropagados de_by_journalpara_by_issue— eram dead-code (parâmetros aceitos mas nunca lidos no corpo) e infláveis no payload Celery (token + credenciais). Compatibilidade com mensagens já enfileiradas via**legacy_kwargs, que descarta silenciosamente as chaves legacy conhecidas e loga warning para chaves desconhecidas..iterator(chunk_size=100)no loop de migração de artigos em_by_issue+select_related("issue_proc","issue_proc__journal_proc","collection","sps_pkg")para reduzir RAM e queries N+1 (a@propertyjournal_procemArticleProcresolve viaissue_proc.journal_proc, ecollectioné FK direto emBaseProc).selected_article_proc_itemsem_by_journalagora usa.iterator(chunk_size=1000).count()redundantes emtask_sync_issue(haviacount()extra além da iteração).select_relatedque era ignorado porvalues_listemtask_sync_issue(silently ineficaz).publication/api/test_publication.pycobrindo: hit por chave, isolamento entre chaves, imunidade a mutação aninhada (deepcopy), não-cache de erros, helperclear_api_data_cachee invalidação por chave viainvalidate_api_data_cache.Redução de uso de memória (auditoria adicional, commit
c0fb7a2):task_sync_issue— substituídolist(qs.values_list("id", flat=True))(que materializava todos os IDs de artigos do issue em RAM — milhares em issues grandes) por.count()paratotal_to_process+.iterator(chunk_size=500)no loop de publicação. Footprint constante independente do tamanho do issue.task_migrate_and_publish_articles_by_journal— o dictissue_proc_and_related_article_proc_id_list(mapa{issue_id: [article_ids...]}) era mantido inteiro em RAM até o fim dofor ... .items(). Agorawhile ... popitem()libera cada lista de IDs assim que a tarefa Celery é despachada — pico de memória cai conforme o loop avança.task_migrate_and_publish_articles(top-level) — mesmo padrãopopitem()paraitems_to_process(mapa{journal_id: [issue_ids...]}para coleções inteiras), evitando manter o dict completo em RAM durante o despacho..iterator(chunk_size=1000)adicionado nosvalues_listque alimentamitems_to_process(issues e journals da coleção), evitando carregar todas as linhas do queryset de uma vez.task_sync_issuecontinua sendo despachada incondicionalmente para QA e PUBLIC após a migração — por feedback da revisão, ela precisa rodar sempre para garantir que não exista duplicidade no site, condição que não pode ser determinada antecipadamente. A query/contagemtotal_articles_to_publishque existia em_by_issueapenas para condicionar o despacho foi removida.Itens deixados para PR separado por requererem mudanças maiores de fluxo: 🔴 #3 (skip de
task_sync_issuequando vazio — descartado: precisa rodar para deduplicação no site), 🔴 #4 (loop síncrono emtask_sync_issue— requer redesign com rate-limit/group), 🟠 #7 (dedup síncrono executado sempre), 🟠 #8 (dupla seleção entre_by_journale_by_issue), e itens 🟢/🟡 de limpeza.Onde a revisão poderia começar?
publication/api/publication.py— adição do cache (_API_DATA_CACHE,get_api_data,clear_api_data_cache,invalidate_api_data_cache).proc/tasks.py,task_migrate_and_publish_articles(top-level, ~linhas 830–880) parapopitem()+.iterator()nos querysets que alimentamitems_to_process.proc/tasks.py,task_migrate_and_publish_articles_by_journal(~linhas 900–1010) etask_migrate_and_publish_articles_by_issue(~linhas 1015–1135), onde concentram-se a remoção dos kwargs de api_data, oiterator()/select_relatede o drain viapopitem()do dict de despacho.proc/tasks.py,task_sync_issue(~linhas 1160–1220) para o uso de.count()+.iterator(chunk_size=500)em vez da lista materializada de IDs.publication/api/test_publication.py— novos testes do cache.Como este poderia ser testado manualmente?
task_migrate_and_publish_articlespara uma coleção com vários issues a processar.PublicationAPI.get_token— a contagem por worker deve cair drasticamente (ordem de magnitude) em relação ao baseline; em até 1h de execução de um worker, deve haver no máximo 1 login por(collection, content_type, website_kind)(ou novo login apósinvalidate_api_data_cache).invalidate_api_data_cache(collection, "issue", "QA")e confirmar que a próxima chamada deget_api_data(collection, "issue", "QA")aciona um novoget_token.qa_api_data/public_api_datano kwargs) —_by_issuedeve processar normalmente, sem warning. Enviar uma com kwarg fictício (ex.:foo=1) — deve aparecer warningtask_migrate_and_publish_articles_by_issue: ignoring unknown kwargs ['foo'].OfficialJournal#5/Crear un modelo en la carpetajournalspara ingresar títulos no oficiales de las revistas #6: monitorar o uso de RAM do worker em issues com muitos artigos — deve manter-se estável (chunks de 100); número de queries por issue deve diminuir (sem N+1 emjournal_proc/collection/sps_pkg, semcount()extras).task_migrate_and_publish_articlese_by_journaldeve decrescer ao longo do loop (drain viapopitem()); emtask_sync_issue, RSS deve manter-se constante mesmo para issues com milhares de artigos (sem materialização da lista de IDs).task_sync_issueainda deve ser enfileirada 2× (QA + PUBLIC) — verificar nos logs/eventos doTaskExecution.pytest publication/api/test_publication.py proc/tests.py(5 casos do cache + demais casos passando).Algum cenário de contexto que queira dar?
A análise prévia apontava 4 itens 🔴 críticos e 4 🟠 altos. Este PR endereça os de maior ROI e menor risco de mudança comportamental:
popitem()+.iterator()nos querysets que alimentam o dict top-level)task_sync_issuedespachada com 0 artigostask_sync_issueiterator/select_relatedna migraçãocount()redundantestask_sync_issue)select_itemsduplicado entre níveisSobre a estratégia do cache: o objetivo original de
qa_api_data/public_api_dataera pedir o token 1× e renovar somente quando expirasse. A combinação adotada preserva esse objetivo sem trafegar token por mensagens Celery — TTL longo (1h) como teto +invalidate_api_data_cachepara invalidação dirigida quando o consumidor detectar falha de autenticação + retry interno automático emPublicationAPI.post_data. Resultado: 1 login por chave por worker, com renovação reativa (não periódica) quando o token efetivamente falha.Sobre a redução adicional de memória (auditoria solicitada na revisão): o pico de RAM dos workers crescia proporcionalmente a (a) número total de IDs de artigos a publicar por issue em
task_sync_issue(materializados em lista) e (b) tamanho dos dicts de despacho em_by_journale no top-level (mantidos inteiros até o fim do loop). Substituindolist(...)por.count()+.iterator()e ofor ... .items()porwhile ... popitem(), o footprint passa a ser constante (cache de página do queryset + 1 chave do dict por iteração), eliminando o crescimento linear no tamanho dos dados de entrada.A combinação de cache + remoção dos kwargs
qa_api_data/public_api_dataresolve dois problemas em um: corta o número de logins HTTP por job e ainda reduz substancialmente o tamanho das mensagens Celery enviadas ao broker (token + credenciais não trafegam mais por mensagem).Antes (problemático):
Depois: as duas chamadas e os dois kwargs foram removidos;
task_sync_issue(que efetivamente precisa deapi_data) obtém viaget_api_datacacheado.Screenshots
N/A.
Quais são tickets relevantes?
Issue de otimização da tarefa
migrate_and_publish_articlesreferenciada na descrição do problema.Referências
proc/tasks.py— definição das 4 tarefas da cadeia.publication/api/publication.py—PublicationAPI.get_token(custo HTTP deget_api_data),PublicationAPI.post_data(retry interno em falha de auth) e novo cache_API_DATA_CACHE+invalidate_api_data_cache.proc/models.py—ArticleProc.journal_proc(property que resolve viaissue_proc) eBaseProc.collection(FK direto), guiando oselect_relatedaplicado.bigbang/tasks_scheduler.py— periodicidade do disparo (ARTICLE_DB_MIGRATION_MINUTES).