Incremental Processing: Fast Pipeline Updates

This article explains how the SEO pipeline uses incremental processing to finish daily runs in minutes instead of hours.

The Problem: Full Reprocessing is Slow

Running the entire pipeline from scratch takes hours:

  • Step 0 (Source embedding): 15 minutes (65,000 products)

  • Step 1 (Query fetching): 10 minutes (API calls)

  • Step 2 (Query clustering): 30 minutes (65K×65K similarity)

  • Step 3 (Phrase mappings): 20 minutes (embedding + matching)

  • Step 4 (Product matching): 45 minutes (queries × products)

  • Step 5 (Related searches): 25 minutes (query × query similarity)

Total: ~2.5 hours for full pipeline

Problem: Rerunning the full pipeline every day would spend ~2.5 hours, mostly recomputing data that has not changed.

The Solution: Three-Layer Incremental Strategy

We use three techniques to skip unnecessary work:

1. Step Skipping (Coarse-Grained)

Skip entire steps if output is fresh and script unchanged.

2. Incremental Embedding (Medium-Grained)

Embed only new or changed items; reuse cached embeddings for everything else.

3. Checkpointing (Fine-Grained)

Save progress during long operations and resume from the last checkpoint after a failure.

Step Skipping Strategy

How It Works

Before each step, check:

Output exists? If no, run the step.

Output stale? If it is older than 7 days (the default), run the step.

Script changed? If the script was modified after the output was generated, run the step.

All checks pass? Skip the step.

Implementation

def should_skip_step(output_path, script_path, days=7):
    # Check if output exists
    if not os.path.exists(output_path):
# ... (implementation details omitted)
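The three checks above can be sketched as follows. This is a minimal illustration, not the pipeline's actual `seo_common` code: it assumes file modification times (mtime) are an adequate stand-in for "when the output was generated" and "when the script was modified".

```python
import os
import time


def should_skip_step(output_path: str, script_path: str, days: float = 7) -> bool:
    """Sketch: return True if the step's existing output can be reused.

    Assumption: mtimes approximate generation and edit times.
    """
    # Check 1: no output yet, so the step must run.
    if not os.path.exists(output_path):
        return False

    output_mtime = os.path.getmtime(output_path)

    # Check 2: output older than the freshness window, so rerun.
    max_age_seconds = days * 24 * 60 * 60
    if time.time() - output_mtime > max_age_seconds:
        return False

    # Check 3: script edited after the output was written, so rerun.
    if os.path.getmtime(script_path) > output_mtime:
        return False

    # All checks passed: output is fresh and the script is unchanged.
    return True
```

One limitation of comparing mtimes: a fresh clone or checkout can give a script a new mtime without changing its content, which forces an unnecessary rerun. Hashing the script's contents would avoid that, at the cost of storing the hash alongside the output.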

Usage

Each script checks at startup:

import sys

from seo_common import should_skip_step

if should_skip_step(SEO_SOURCE_EMBEDDINGS_PATH, __file__):
    print("✓ Skipping: Output is fresh and script unchanged")
    sys.exit(0)  # exit early; a bare `return` is only valid inside a function

Benefits

Fast daily runs: Most steps are skipped when their output is fresh and their script is unchanged

Automatic invalidation: Script changes trigger re-run

Configurable freshness: Adjust days parameter per step

Incremental Embedding Strategy

How It Works

When embedding items (products, queries, phrases):

Load cache: Read previously embedded items and their keys

Compare keys: Identify new, changed, and deleted items

Embed only new: Only embed items not in cache

Merge: Combine cached embeddings with new embeddings, ordered to match the current items

Save: Write updated cache

Implementation

The incremental_embed_with_keys function handles this:

def incremental_embed_with_keys(
    items,           # Current items to embed
    keys,            # Unique keys for items
# ... (implementation details omitted)
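A minimal sketch of the load, compare, embed, merge, and save cycle described above. It assumes an `.npz` cache holding parallel `keys` and `embeddings` arrays and a caller-supplied `embed_fn`; both are hypothetical, and the real function's remaining parameters are not shown here. Checkpointing is left out to keep the caching logic visible.

```python
import os
from typing import Callable, Sequence

import numpy as np


def incremental_embed_with_keys(
    items: Sequence[str],
    keys: Sequence[str],
    embed_fn: Callable[[list[str]], np.ndarray],  # hypothetical embedder
    cache_path: str,
    batch_size: int = 32,
) -> np.ndarray:
    """Hypothetical sketch: embed only items whose key is not cached yet."""
    # 1. Load cache: map each previously embedded key to its vector.
    cached: dict[str, np.ndarray] = {}
    if os.path.exists(cache_path):
        with np.load(cache_path) as data:
            cached = dict(zip(data["keys"].tolist(), data["embeddings"]))

    # 2. Compare keys: positions whose key is missing from the cache.
    #    Cached keys absent from `keys` (deleted items) are not carried over.
    missing = [i for i, k in enumerate(keys) if k not in cached]

    # 3. Embed only the missing items, batch_size at a time.
    fresh: dict[str, np.ndarray] = {}
    for start in range(0, len(missing), batch_size):
        idx = missing[start:start + batch_size]
        vectors = embed_fn([items[i] for i in idx])
        for i, vec in zip(idx, vectors):
            fresh[keys[i]] = vec

    # 4. Merge: one row per current item, in the current order.
    lookup = {**cached, **fresh}
    embeddings = np.stack([lookup[k] for k in keys])

    # 5. Save: np.savez appends ".npz" if cache_path lacks it, so pass a .npz path.
    np.savez(cache_path, keys=np.array(list(keys), dtype=str), embeddings=embeddings)
    return embeddings
```

Whether changed items get re-embedded depends on the key: a key derived from the item's text (for example, a content hash) changes when the text changes, while a plain product ID would keep serving the old vector.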

Cache Hit Rates

Typical cache hit rates after first run:

Source data (products, parts, articles):

  • First run: 0% (embed all 65,000 items)

  • Daily run: 99.5% (only ~300 new/changed items)

Queries (from GSC, Ads, live):

  • First run: 0% (embed all 65,000 queries)

  • Daily run: 99.2% (only ~500 new queries)

Phrase mappings:

  • First run: 0% (embed all 5,000 phrases)

  • Daily run: 99.8% (only ~10 new phrases)

Performance Impact

First run (cold cache):

  • Source embedding: 15 minutes (65,000 items)

  • Query embedding: 10 minutes (65,000 queries)

  • Phrase embedding: 2 minutes (5,000 phrases)

Daily run (warm cache):

  • Source embedding: 10 seconds (300 items, 99.5% hit rate)

  • Query embedding: 5 seconds (500 queries, 99.2% hit rate)

  • Phrase embedding: 1 second (10 phrases, 99.8% hit rate)

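Speedup: roughly 90-120× per embedding step (15 minutes to 10 seconds is 90×; 10 minutes to 5 seconds and 2 minutes to 1 second are each 120×)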
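The hit rates and per-step speedups follow directly from the figures listed above. This sketch treats the hit rate as the share of items reused from the cache, (total - new) / total, which is an approximation since the daily item count drifts slightly from the first-run count.

```python
# Figures from the lists above:
# (first-run seconds, daily-run seconds, total items, items embedded daily)
steps = {
    "source": (15 * 60, 10, 65_000, 300),
    "query": (10 * 60, 5, 65_000, 500),
    "phrase": (2 * 60, 1, 5_000, 10),
}

results = {}
for name, (cold_s, warm_s, total, new) in steps.items():
    hit_rate = (total - new) / total  # share of embeddings reused from cache
    speedup = cold_s / warm_s         # cold-cache time / warm-cache time
    results[name] = (round(hit_rate * 100, 1), speedup)
    print(f"{name}: hit rate {hit_rate:.1%}, speedup {speedup:.0f}x")
```

This prints a 90x speedup for source embedding and 120x for the query and phrase steps, with hit rates of 99.5%, 99.2%, and 99.8%.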

Checkpointing Strategy

How It Works

For long-running operations (embedding 65,000 items):

Batch processing: Process items in batches (e.g., 1,000 items)

Save checkpoint: After each batch, save accumulated results

Resume on failure: If process crashes, resume from last checkpoint

Final save: After all batches, save complete results

Implementation

Checkpointing is built into incremental_embed_with_keys:

checkpoint_every = 1000  # Save every 1,000 items

embeddings_list = []
# ... (implementation details omitted)
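A hypothetical sketch of batch embedding with resumable checkpoints, written separately from the real `incremental_embed_with_keys` internals (which are omitted above). It assumes `items` arrive in the same order on every attempt, so the number of rows already in the checkpoint file says how many items are done; `embed_with_checkpoints` and `embed_fn` are illustrative names. Each checkpoint is written to a temporary file and moved into place with `os.replace`, so a crash during the write leaves the previous checkpoint intact.

```python
import os
from typing import Callable, Sequence

import numpy as np


def embed_with_checkpoints(
    items: Sequence[str],
    embed_fn: Callable[[list[str]], np.ndarray],  # hypothetical embedder
    checkpoint_path: str,
    checkpoint_every: int = 1000,
    batch_size: int = 32,
) -> np.ndarray:
    """Hypothetical sketch: batch embedding that resumes from its last checkpoint."""
    embeddings_list: list[np.ndarray] = []
    done = 0
    # Resume: rows already in the checkpoint belong to items[0:done].
    if os.path.exists(checkpoint_path):
        saved = np.load(checkpoint_path)
        embeddings_list.append(saved)
        done = len(saved)
        print(f"  Resuming from checkpoint ({done:,} items already embedded)")

    print(f"Embedding {len(items):,} items (checkpointing every {checkpoint_every:,})...")
    for chunk_start in range(done, len(items), checkpoint_every):
        chunk_end = min(chunk_start + checkpoint_every, len(items))
        print(f"  Batch {chunk_start}-{chunk_end}...")
        for start in range(chunk_start, chunk_end, batch_size):
            batch = list(items[start:min(start + batch_size, chunk_end)])
            embeddings_list.append(np.asarray(embed_fn(batch)))

        # Save checkpoint: write everything so far to a temp file, then swap it in.
        merged = np.concatenate(embeddings_list)
        embeddings_list = [merged]
        tmp_path = checkpoint_path + ".tmp.npy"  # .npy suffix stops np.save renaming it
        np.save(tmp_path, merged)
        os.replace(tmp_path, checkpoint_path)
        print(f"    ✓ Checkpoint saved ({chunk_end:,} total)")

    return np.concatenate(embeddings_list)
```

After the last batch the checkpoint holds the complete result, so the caller can write the final output and then delete the checkpoint file.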

Benefits

Crash recovery: Resume from last checkpoint instead of starting over

Progress visibility: See progress every 1,000 items

Memory efficiency: Process in batches, don't load all at once

Integration Across Pipeline

Incremental processing is used in multiple steps:

Step 0: Source Data Embedding

Incremental: Only embed new/changed products, parts, articles

Checkpointing: Save every 1,000 items

Skip logic: Skip if output < 7 days old and script unchanged

See: Source Data Embedding

Step 1: Query Fetching

Incremental: API calls fetch only new data (since last run)

Skip logic: Skip if output < 1 day old

See: Query Fetching
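The article does not show how Step 1 tracks "since last run". One common approach is a watermark: persist the last date fetched and request only later dates on the next run. The sketch below assumes a JSON state file and a caller-supplied `fetch_fn(start, end)` standing in for the GSC/Ads API calls; `fetch_incrementally`, `fetch_fn`, and the state format are all hypothetical.

```python
import json
import os
from datetime import date, timedelta
from typing import Callable


def fetch_incrementally(
    state_path: str,
    fetch_fn: Callable[[date, date], list[dict]],  # hypothetical API wrapper
    today: date,
    initial_lookback_days: int = 90,
) -> list[dict]:
    """Hypothetical sketch: fetch only rows newer than the last successful run."""
    # Start the day after the stored watermark, or backfill on the first run.
    if os.path.exists(state_path):
        with open(state_path) as f:
            last = date.fromisoformat(json.load(f)["last_fetched"])
        start = last + timedelta(days=1)
    else:
        start = today - timedelta(days=initial_lookback_days)

    if start > today:
        return []  # already up to date, no API call needed

    rows = fetch_fn(start, today)

    # Advance the watermark only after the fetch succeeded.
    with open(state_path, "w") as f:
        json.dump({"last_fetched": today.isoformat()}, f)
    return rows
```

Some sources report data with a delay, so a real implementation may want to re-fetch a few trailing days on each run rather than starting strictly after the watermark.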

Step 3b: Query Embedding

Incremental: Only embed new queries

Checkpointing: Save every 1,000 queries

Skip logic: Skip if output < 7 days old and script unchanged

See: Query Embedding

Step 4: Phrase Mapping Expansion

Incremental: Only embed new phrases

Checkpointing: Save every 1,000 phrases

Skip logic: Skip if output < 7 days old and script unchanged

See: Phrase-to-Filter Mappings

Step 6: Product Matching

Incremental: Only match new queries

Skip logic: Skip if output < 7 days old and script unchanged

See: Product Matching
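A minimal sketch of "only match new queries": previous results are reused for queries that are still present, queries that disappeared are dropped, and the expensive matcher runs only for unseen queries. `update_matches` and `match_fn` are hypothetical names. One consequence of this design is that products added after a query was first matched will not appear in that query's results until the query is recomputed (for example, by an occasional full run).

```python
from typing import Callable


def update_matches(
    previous: dict[str, list[str]],
    current_queries: list[str],
    match_fn: Callable[[str], list[str]],  # hypothetical query-vs-products matcher
) -> tuple[dict[str, list[str]], int]:
    """Hypothetical sketch: reuse prior matches, compute only unseen queries."""
    updated: dict[str, list[str]] = {}
    computed = 0
    for q in current_queries:
        if q in previous:
            updated[q] = previous[q]  # reuse: query was matched on an earlier run
        else:
            updated[q] = match_fn(q)  # new query: run the expensive match
            computed += 1
    # Queries missing from current_queries are dropped by construction.
    return updated, computed
```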

Configuration

Incremental processing is configured per step:

Freshness Threshold

# Skip if output < 7 days old (default)
should_skip_step(output_path, script_path, days=7)

# Skip if output < 1 day old (for frequently changing data)
should_skip_step(output_path, script_path, days=1)

Checkpoint Frequency

# Save every 1,000 items (default)
incremental_embed_with_keys(..., checkpoint_every=1000)

# Save every 5,000 items (fewer checkpoint writes, but more work lost on a crash)
incremental_embed_with_keys(..., checkpoint_every=5000)

Batch Size

# Embed 32 items per batch (default, balanced)
incremental_embed_with_keys(..., batch_size=32)

# Embed 64 items per batch (faster on GPU, more memory)
incremental_embed_with_keys(..., batch_size=64)

Monitoring and Debugging

Cache Statistics

Each step prints cache statistics:

✓ Found existing cache, checking for changes...
  Existing: 65,000 items
  Current:  65,300 items
  Reusing: 64,800 embeddings
  New:     500 items to embed
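The numbers in this output are related by set arithmetic on item keys: Reusing + New = Current (64,800 + 500 = 65,300), and Existing - Reusing = 200 cached items whose keys no longer appear. A hypothetical helper that derives them is sketched below; the `deleted` count is an addition, since the sample output does not print it.

```python
def cache_stats(existing_keys: set[str], current_keys: set[str]) -> dict[str, int]:
    """Hypothetical helper: summarize cache reuse from two sets of item keys."""
    reused = existing_keys & current_keys   # cached embeddings that are kept
    new = current_keys - existing_keys      # items that must be embedded
    deleted = existing_keys - current_keys  # cached rows that are dropped
    return {
        "existing": len(existing_keys),
        "current": len(current_keys),
        "reusing": len(reused),
        "new": len(new),
        "deleted": len(deleted),
    }


# Reproduce the sample output: 200 old items removed, 500 new ones added.
existing = {f"item-{i}" for i in range(65_000)}
current = {f"item-{i}" for i in range(200, 65_000)} | {f"new-{i}" for i in range(500)}
print(cache_stats(existing, current))
# {'existing': 65000, 'current': 65300, 'reusing': 64800, 'new': 500, 'deleted': 200}
```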

Skip Messages

When steps are skipped:

✓ Skipping 0_embed_source_data.py: Output is fresh and script unchanged.

Checkpoint Messages

During long operations:

Embedding 65,000 items (checkpointing every 1,000)...
  Batch 0-1000...
    ✓ Checkpoint saved (1,000 total)
  Batch 1000-2000...
    ✓ Checkpoint saved (2,000 total)
  ...


Summary

Incremental processing cuts a daily run from ~2.5 hours to about 5 minutes:

Three-layer strategy:

  • ✅ Step skipping (skip entire steps if output is fresh and script unchanged)

  • ✅ Incremental embedding (only embed new/changed items)

  • ✅ Checkpointing (save progress, resume on failure)

Performance:

  • ✅ First run: ~2.5 hours (full pipeline)

  • ✅ Daily run: ~5 minutes (incremental updates)

  • ✅ Cache hit rates: 99%+ after first run

Benefits:

  • ✅ Fast daily updates (minutes instead of hours)

  • ✅ Automatic invalidation (script changes trigger re-run)

  • ✅ Crash recovery (resume from checkpoint)

  • ✅ Memory efficient (batch processing)

This strategy enables daily pipeline runs without wasting compute on unchanged data.

