Processamento Incremental: Atualizações Rápidas do Pipeline

Este artigo explica como o pipeline de SEO usa processamento incremental para ser executado em segundos em vez de horas.

O Problema: Reprocessamento Completo é Lento

Executar todo o pipeline do zero leva horas:

  • Etapa 0 (Incorporação da Fonte): 15 minutos (65.000 produtos)

  • Etapa 1 (Busca de Consultas): 10 minutos (chamadas de API)

  • Etapa 2 (Agrupamento de Consultas): 30 minutos (similaridade 65K×65K)

  • Etapa 3 (Mapeamentos de Frases): 20 minutos (incorporação + correspondência)

  • Etapa 4 (Correspondência de Produtos): 45 minutos (consultas × produtos)

  • Etapa 5 (Pesquisas Relacionadas): 25 minutos (similaridade consulta × consulta)

Total: ~2,5 horas para o pipeline completo

Problema: Atualizações diárias desperdiçariam 2,5 horas recalculando dados inalterados.

A Solução: Estratégia Incremental de Três Camadas

Usamos três técnicas para pular trabalho desnecessário:

1. Pular Etapas (Granulação Grossa)

Pule etapas inteiras se a saída estiver atualizada e o script inalterado.

2. Incorporação Incremental (Granulação Média)

Incorpore apenas itens novos/alterados, reutilize incorporações em cache.

3. Checkpointing (Granulação Fina)

Salve o progresso durante operações longas, retome do ponto de verificação em caso de falha.

Estratégia de Pular Etapas

Como Funciona

Antes de cada etapa, verifique:

A saída existe? Se não, execute a etapa.

Idade da saída: Se mais antiga que 7 dias, execute a etapa.

O script mudou? Se o script foi modificado desde que a saída foi gerada, execute a etapa.

Todas as verificações passaram? Pule a etapa.

Implementação

def should_skip_step(output_path, script_path, days=7):
    # Verifica se a saída existe
    if not os.path.exists(output_path):
# ... (detalhes da implementação omitidos)

Uso

Cada script verifica na inicialização:

from seo_common import should_skip_step

if should_skip_step(SEO_SOURCE_EMBEDDINGS_PATH, __file__):
    print("✓ Pulando: A saída está atualizada e o script inalterado")
    return

Benefícios

Execuções diárias rápidas: A maioria das etapas é pulada se os dados estiverem inalterados

Invalidação automática: Mudanças no script disparam nova execução

Atualização configurável: Ajuste o parâmetro days por etapa

Estratégia de Incorporação Incremental

Como Funciona

Ao incorporar itens (produtos, consultas, frases):

Carregue o cache: Leia itens incorporados anteriormente e suas chaves

Compare as chaves: Identifique itens novos, alterados e excluídos

Incorpore apenas novos: Incorpore apenas itens não presentes no cache

Mescle: Combine incorporações em cache com novas incorporações na ordem correta

Salve: Escreva o cache atualizado

Implementação

A função incremental_embed_with_keys lida com isso:

def incremental_embed_with_keys(
    items,           # Itens atuais para incorporar
    keys,            # Chaves únicas para os itens
# ... (detalhes da implementação omitidos)

Taxas de Acerto no Cache

Taxas típicas de acerto no cache após a primeira execução:

Dados de origem (produtos, peças, artigos):

  • Primeira execução: 0% (incorporar todos os 65.000 itens)

  • Execução diária: 99,5% (apenas ~300 itens novos/alterados)

Consultas (do GSC, Ads, ao vivo):

  • Primeira execução: 0% (incorporar todas as 65.000 consultas)

  • Execução diária: 99,2% (apenas ~500 novas consultas)

Mapeamentos de frases:

  • Primeira execução: 0% (incorporar todas as 5.000 frases)

  • Execução diária: 99,8% (apenas ~10 novas frases)

Impacto no Desempenho

Primeira execução (cache frio):

  • Incorporação da fonte: 15 minutos (65.000 itens)

  • Incorporação de consultas: 10 minutos (65.000 consultas)

  • Incorporação de frases: 2 minutos (5.000 frases)

Execução diária (cache aquecido):

  • Incorporação da fonte: 10 segundos (300 itens, taxa de acerto de 99,5%)

  • Incorporação de consultas: 5 segundos (500 consultas, taxa de acerto de 99,2%)

  • Incorporação de frases: 1 segundo (10 frases, taxa de acerto de 99,8%)

Aceleração: 90-180× mais rápido

Estratégia de Checkpointing

Como Funciona

Para operações de longa duração (incorporar 65.000 itens):

Processamento em lotes: Processe itens em lotes (ex.: 1.000 itens)

Salve o ponto de verificação: Após cada lote, salve os resultados acumulados

Retome em caso de falha: Se o processo travar, retome do último ponto de verificação

Salvamento final: Após todos os lotes, salve os resultados completos

Implementação

O checkpointing é integrado ao incremental_embed_with_keys:

checkpoint_every = 1000  # Salvar a cada 1.000 itens

embeddings_list = []
# ... (detalhes da implementação omitidos)

Benefícios

Recuperação de falhas: Retome do último ponto de verificação em vez de recomeçar

Visibilidade do progresso: Veja o progresso a cada 1.000 itens

Eficiência de memória: Processe em lotes, não carregue tudo de uma vez

Integração em Todo o Pipeline

O processamento incremental é usado em várias etapas:

Etapa 0: Incorporação de Dados de Origem

Incremental: Incorpore apenas produtos, peças e artigos novos/alterados

Checkpointing: Salve a cada 1.000 itens

Lógica de pulo: Pule se a saída tiver < 7 dias e o script estiver inalterado

Veja: Incorporação de Dados de Origem

Etapa 1: Busca de Consultas

Incremental: Chamadas de API buscam apenas dados novos (desde a última execução)

Lógica de pulo: Pule se a saída tiver < 1 dia

Veja: Busca de Consultas

Etapa 3b: Incorporação de Consultas

Incremental: Incorpore apenas consultas novas

Checkpointing: Salve a cada 1.000 consultas

Lógica de pulo: Pule se a saída tiver < 7 dias e o script estiver inalterado

Veja: Incorporação de Consultas

Etapa 4: Expansão de Mapeamento de Frases

Incremental: Incorpore apenas frases novas

Checkpointing: Salve a cada 1.000 frases

Lógica de pulo: Pule se a saída tiver < 7 dias e o script estiver inalterado

Veja: Mapeamentos de Frase para Filtro

Etapa 6: Correspondência de Produtos

Incremental: Correlacione apenas consultas novas

Lógica de pulo: Pule se a saída tiver < 7 dias e o script estiver inalterado

Veja: Correspondência de Produtos

Configuração

O processamento incremental é configurado por etapa:

Limiar de Atualização

# Pule se a saída tiver < 7 dias (padrão)
should_skip_step(output_path, script_path, days=7)

# Pule se a saída tiver < 1 dia (para dados que mudam com frequência)
should_skip_step(output_path, script_path, days=1)

Frequência de Checkpoint

# Salve a cada 1.000 itens (padrão)
incremental_embed_with_keys(..., checkpoint_every=1000)

# Salve a cada 5.000 itens (para processamento mais rápido, menos segurança)
incremental_embed_with_keys(..., checkpoint_every=5000)

Tamanho do Lote

# Incorpore 32 itens por lote (padrão, equilibrado)
incremental_embed_with_keys(..., batch_size=32)

# Incorpore 64 itens por lote (mais rápido em GPU, mais memória)
incremental_embed_with_keys(..., batch_size=64)

Monitoramento e Depuração

Estatísticas do Cache

Cada etapa imprime estatísticas do cache:

✓ Cache existente encontrado, verificando alterações...
  Existente: 65.000 itens
  Atual:     65.300 itens
  Reutilizando: 64.800 incorporações
  Novos:     500 itens para incorporar

Mensagens de Pulo

Quando etapas são puladas:

✓ Pulando 0_embed_source_data.py: A saída está atualizada e o script inalterado.

Mensagens de Checkpoint

Durante operações longas:

Incorporando 65.000 itens (checkpoint a cada 1.000)...
  Lote 0-1000...
    ✓ Checkpoint salvo (1.000 no total)
  Lote 1000-2000...
    ✓ Checkpoint salvo (2.000 no total)
  ...

Referências

Conceitos Técnicos

Artigos Relacionados

Resumo

O processamento incremental torna o pipeline 90-180× mais rápido:

Estratégia de três camadas:

  • ✅ Pular etapas (pule etapas inteiras se a saída estiver atualizada)

  • ✅ Incorporação incremental (incorpore apenas itens novos/alterados)

  • ✅ Checkpointing (salve o progresso, retome em caso de falha)

Desempenho:

  • ✅ Primeira execução: ~2,5 horas (pipeline completo)

  • ✅ Execução diária: ~5 minutos (atualizações incrementais)

  • ✅ Taxas de acerto no cache: 99%+ após a primeira execução

Benefícios:

  • ✅ Atualizações diárias rápidas (minutos em vez de horas)

  • ✅ Invalidação automática (mudanças no script disparam nova execução)

  • ✅ Recuperação de falhas (retome do ponto de verificação)

  • ✅ Eficiência de memória (processamento em lotes)

Esta estratégia permite execuções diárias do pipeline sem desperdiçar computação em dados inalterados.


← Voltar ao Índice de Documentação