
Building Durable RAG Pipelines with Temporal: Ingestion, Embedding, and Index Management

2026-06-22 · 8 min read · Igor Bobriakov

Most RAG ingestion pipelines fail silently. Not with a stack trace. Not with an alert. With a partially indexed corpus and a search system that quietly returns incomplete results.

The failure mode is straightforward: a script loads documents, calls an embedding API, and upserts vectors to a vector database. Midway through a large batch, the embedding API times out. The script catches the exception, logs it, and exits. Some documents are now in a liminal state — their metadata records exist, their chunk IDs are registered, but their embeddings were never written to the index. Queries that should hit those chunks return nothing. The pipeline logs show a completed run. No alert fires. No one knows.

This is not an edge case. It is a common failure mode for RAG ingestion pipelines that run as cron scripts without durable execution semantics. The fix is not better error handling alone — it is making the execution itself durable, so that partial failures are detected, checkpointed, and resumed at the point of failure without re-processing what already completed.

Temporal is a strong tool for this class of problem. This post covers how to structure a RAG ingestion pipeline as a Temporal workflow: the activity decomposition, retry policies, embedding rate limiting, index consistency guarantees, and re-indexing at scale.

Why RAG Ingestion Needs Durable Execution

A RAG ingestion pipeline has three properties that make standard orchestration insufficient.

It is expensive to restart from scratch. Embedding a large corpus costs real money. If a pipeline fails near the end of a run, re-running from the beginning wastes both cost and time. A durable execution system checkpoints progress so restarts resume from the last completed activity.

It touches multiple external systems. Document stores, embedding APIs, and vector databases are each independently fallible. A network partition between any two of them creates a partial-write scenario. Without coordination, the pipeline has no way to know which writes committed.

Silent inconsistency is worse than visible failure. A crashed pipeline is visible. A pipeline that completes with a corrupted index is not. When embedding silently fails for part of the corpus, retrieval quality degrades — but the degradation is diffuse enough that it looks like a content problem, not an infrastructure problem.

Temporal addresses all three by treating the workflow as a durable state machine. Every activity completion is recorded. If the worker dies, Temporal replays history and resumes execution from the last checkpoint. The pipeline cannot silently skip steps.

This pattern surfaces most often when teams move a RAG prototype to production on a corpus that actually changes — document edits, additions, deletions — and discover that their ingestion script was never built to handle partial failures or re-indexing. The script worked fine in development against a static dataset. In production against a live SharePoint or Confluence corpus, it degrades silently.

Orchestration Approach Comparison

Before committing to Temporal, it is worth understanding where it sits relative to simpler options.

| Approach | Durability | Per-Activity Retry | Partial Failure Visibility | Operational Complexity |
|---|---|---|---|---|
| Cron + Python script | None: crash loses all state | Manual try/except only | None: silent gaps | Low to start, high to maintain |
| Airflow DAG | Task-level: DAG reruns from failed task | Task-level with configurable retries | Moderate: task status visible in UI | High: scheduler, DB, executors |
| Celery + Redis | Message-level: depends on ack settings | Yes, with backoff configuration | Low: requires external monitoring | Moderate: broker + workers + flower |
| Temporal | Full: event-sourced history survives worker death | Per-activity, configurable per call | High: full history in UI + API | Moderate: cluster + workers, local dev via Docker |

Airflow is a reasonable choice when your pipeline maps cleanly to a DAG and you already operate it. Temporal is the better choice when pipelines are long-running, involve sub-workflow fan-out, or require fine-grained retry control at the activity level without restructuring the entire DAG. The RAG ingestion case fits Temporal’s strengths: variable document counts, expensive per-step operations, and a need for partial failure detection rather than all-or-nothing reruns.

Workflow Design: Activity Decomposition

The core principle: each activity must be idempotent and independently retryable.

In practice, this means splitting the pipeline into four discrete activities:

  1. Fetch document — retrieve raw content from the source (S3, SharePoint, a CMS API)
  2. Chunk document — apply your splitting strategy, return a list of chunk dicts with text and metadata
  3. Embed chunk batch — call the embedding API for a batch of N chunks, return vectors
  4. Upsert to index — write vectors and metadata to the vector database

Never combine embedding and upsert into one activity. If the upsert fails after embedding succeeds, you want to retry only the upsert. If they are one activity, every upsert retry pays for the embedding call again.

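Here is the complete workflow with Pydantic models and activity retry policies. Alongside the four activities above, it includes two bookkeeping activities: a content-hash check that skips unchanged documents, and a state write that records each document's result: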

from datetime import timedelta
from typing import Optional
import asyncio

from pydantic import BaseModel
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.client import Client  # used by the client bootstrap (not shown)
from temporalio.worker import Worker  # used by the worker bootstrap (not shown)

# Note: passing Pydantic models to activities and workflows requires a
# Pydantic-aware data converter on the Client. Recent SDK versions ship one in
# temporalio.contrib.pydantic; check the version you run.


# --- Pydantic data models ---

class DocumentRef(BaseModel):
    doc_id: str
    source_uri: str
    content_hash: Optional[str] = None


class Chunk(BaseModel):
    chunk_id: str
    doc_id: str
    text: str
    metadata: dict


class EmbeddedChunk(BaseModel):
    chunk_id: str
    doc_id: str
    embedding: list[float]
    metadata: dict


class IngestionResult(BaseModel):
    doc_id: str
    chunks_indexed: int
    status: str  # "completed" | "skipped" | "failed"
    content_hash: Optional[str] = None  # hash of the content this result covers


class RAGIngestionInput(BaseModel):
    documents: list[DocumentRef]
    embedding_model: str = "text-embedding-3-small"
    chunk_size: int = 512
    chunk_overlap: int = 64
    batch_size: int = 32


# --- Activities ---

@activity.defn
async def fetch_document(doc_ref: DocumentRef) -> str:
    """Fetch raw document content from source URI."""
    import httpx

    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.get(doc_ref.source_uri)
        resp.raise_for_status()
        return resp.text


@activity.defn
async def chunk_document(
    doc_id: str,
    content: str,
    chunk_size: int,
    chunk_overlap: int,
) -> list[Chunk]:
    """Split document content into chunks with stable IDs."""
    import hashlib
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    texts = splitter.split_text(content)
    chunks = []
    for i, text in enumerate(texts):
        # Deterministic ID: the same document, position, and text prefix
        # always hash to the same value, so retries overwrite, not duplicate.
        chunk_id = hashlib.sha256(f"{doc_id}:{i}:{text[:64]}".encode()).hexdigest()[:16]
        chunks.append(Chunk(
            chunk_id=chunk_id,
            doc_id=doc_id,
            text=text,
            metadata={"doc_id": doc_id, "chunk_index": i},
        ))
    return chunks


@activity.defn
async def embed_chunk_batch(
    chunks: list[Chunk],
    embedding_model: str,
) -> list[EmbeddedChunk]:
    """Embed a batch of chunks. Rate-limit aware, retryable."""
    import openai

    activity.heartbeat("starting embedding batch")
    client = openai.AsyncOpenAI()
    texts = [c.text for c in chunks]
    # Small fixed delay before each call to smooth out bursts
    await asyncio.sleep(0.1)
    response = await client.embeddings.create(
        model=embedding_model,
        input=texts,
    )
    activity.heartbeat(f"embedding complete, {len(chunks)} chunks")
    # Embeddings are paired with their input chunks by position
    return [
        EmbeddedChunk(
            chunk_id=chunk.chunk_id,
            doc_id=chunk.doc_id,
            embedding=data.embedding,
            metadata=chunk.metadata,
        )
        for chunk, data in zip(chunks, response.data)
    ]


@activity.defn
async def upsert_to_index(
    embedded_chunks: list[EmbeddedChunk],
    index_name: str,
) -> int:
    """Upsert embedded chunks to vector index. Returns count upserted."""
    import pinecone

    pc = pinecone.Pinecone()
    index = pc.Index(index_name)
    vectors = [
        {
            "id": ec.chunk_id,
            "values": ec.embedding,
            "metadata": ec.metadata,
        }
        for ec in embedded_chunks
    ]
    # The client call is synchronous; run it in a thread so it does not
    # block the worker's event loop.
    response = await asyncio.to_thread(index.upsert, vectors=vectors)
    return response.upserted_count


@activity.defn
async def check_document_hash(doc_ref: DocumentRef) -> Optional[str]:
    """Return stored content hash for this doc, or None if not indexed."""
    import asyncpg

    conn = await asyncpg.connect(dsn="postgresql://...")
    try:
        row = await conn.fetchrow(
            "SELECT content_hash FROM doc_index_state WHERE doc_id = $1",
            doc_ref.doc_id,
        )
    finally:
        await conn.close()
    return row["content_hash"] if row else None


@activity.defn
async def record_ingestion_state(result: IngestionResult) -> None:
    """Persist ingestion result to metadata store."""
    import asyncpg

    conn = await asyncpg.connect(dsn="postgresql://...")
    try:
        await conn.execute(
            """
            INSERT INTO doc_index_state (doc_id, content_hash, chunks_indexed, status, indexed_at)
            VALUES ($1, $2, $3, $4, NOW())
            ON CONFLICT (doc_id) DO UPDATE
            SET content_hash = EXCLUDED.content_hash,
                chunks_indexed = EXCLUDED.chunks_indexed,
                status = EXCLUDED.status,
                indexed_at = EXCLUDED.indexed_at
            """,
            result.doc_id, result.content_hash, result.chunks_indexed, result.status,
        )
    finally:
        await conn.close()


# --- Retry policies ---
# Temporal matches non-retryable types against the exception class name,
# not the module-qualified path.

FETCH_RETRY = RetryPolicy(
    maximum_attempts=5,
    initial_interval=timedelta(seconds=2),
    maximum_interval=timedelta(seconds=30),
    backoff_coefficient=2.0,
    non_retryable_error_types=["FileNotFoundError", "PermissionError"],
)

EMBED_RETRY = RetryPolicy(
    maximum_attempts=8,
    initial_interval=timedelta(seconds=5),
    maximum_interval=timedelta(minutes=2),
    backoff_coefficient=2.0,
    non_retryable_error_types=["BadRequestError", "AuthenticationError"],
)

UPSERT_RETRY = RetryPolicy(
    maximum_attempts=10,
    initial_interval=timedelta(seconds=2),
    maximum_interval=timedelta(minutes=1),
    backoff_coefficient=2.0,
)


# --- Workflow ---

@workflow.defn
class RAGIngestionWorkflow:
    @workflow.run
    async def run(self, params: RAGIngestionInput) -> list[IngestionResult]:
        results = []
        for doc_ref in params.documents:
            # Check if document has changed since last index run
            stored_hash = await workflow.execute_activity(
                check_document_hash,
                doc_ref,
                start_to_close_timeout=timedelta(seconds=10),
                retry_policy=UPSERT_RETRY,
            )
            # Skip only on a real match; a never-indexed document with no
            # supplied hash (None == None) must still be ingested.
            if stored_hash is not None and stored_hash == doc_ref.content_hash:
                results.append(IngestionResult(
                    doc_id=doc_ref.doc_id,
                    chunks_indexed=0,
                    status="skipped",
                    content_hash=stored_hash,
                ))
                continue

            # Fetch
            content = await workflow.execute_activity(
                fetch_document,
                doc_ref,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=FETCH_RETRY,
            )

            # Chunk
            chunks = await workflow.execute_activity(
                chunk_document,
                args=[doc_ref.doc_id, content, params.chunk_size, params.chunk_overlap],
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )

            # Embed + upsert in batches
            total_upserted = 0
            for i in range(0, len(chunks), params.batch_size):
                batch = chunks[i:i + params.batch_size]
                embedded = await workflow.execute_activity(
                    embed_chunk_batch,
                    args=[batch, params.embedding_model],
                    start_to_close_timeout=timedelta(minutes=5),
                    heartbeat_timeout=timedelta(seconds=30),
                    retry_policy=EMBED_RETRY,
                )
                upserted = await workflow.execute_activity(
                    upsert_to_index,
                    args=[embedded, "rag-production"],
                    start_to_close_timeout=timedelta(minutes=2),
                    retry_policy=UPSERT_RETRY,
                )
                total_upserted += upserted

            result = IngestionResult(
                doc_id=doc_ref.doc_id,
                chunks_indexed=total_upserted,
                status="completed",
                content_hash=doc_ref.content_hash,
            )
            await workflow.execute_activity(
                record_ingestion_state,
                result,
                start_to_close_timeout=timedelta(seconds=15),
                retry_policy=UPSERT_RETRY,
            )
            results.append(result)
        return results

Embedding Activity: Retry Policies and Rate Limiting

The embedding activity deserves specific attention because its failure modes are different from generic HTTP calls.

Non-retryable errors are not obvious. A 400 Bad Request from an embeddings API often means the input text exceeds the token limit for the model. Retrying it will fail again. Include the provider’s bad-request exception in non_retryable_error_types. In the Temporal Python SDK these entries are matched against the error type, which for an ordinary raised exception is its class name (for example BadRequestError), not the module-qualified path. Authentication failures should also be non-retryable. Rate limit errors should use exponential backoff with a conservative initial interval to avoid hammering a rate-limited endpoint.
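The classification above can be sketched as a small lookup on HTTP status codes. This is an illustration only: the status sets and the `classify_embedding_error` helper are assumptions, not part of any provider SDK, and the exact semantics of each code should be checked against your provider's documentation.

```python
from dataclasses import dataclass

# Assumed groupings; verify against your embedding provider's error reference.
NON_RETRYABLE_STATUSES = {400, 401, 403, 404, 422}
RETRYABLE_STATUSES = {408, 409, 429}


@dataclass(frozen=True)
class RetryDecision:
    retryable: bool
    reason: str


def classify_embedding_error(status_code: int) -> RetryDecision:
    """Decide whether an embedding API failure is worth retrying."""
    if status_code == 429:
        return RetryDecision(True, "rate limited: back off and retry")
    if status_code in RETRYABLE_STATUSES or status_code >= 500:
        return RetryDecision(True, "transient failure: retry with backoff")
    if status_code in NON_RETRYABLE_STATUSES:
        return RetryDecision(False, "request will fail again unchanged")
    # Anything unclassified defaults to non-retryable so bad input cannot loop.
    return RetryDecision(False, "unclassified error")
```

Inside an activity, a non-retryable decision could be surfaced by raising `temporalio.exceptions.ApplicationError` with `non_retryable=True`, which stops retries regardless of the policy's type list.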

Heartbeats matter for long batches. If you embed large batches or process slow documents, the activity can legitimately run longer than ordinary HTTP defaults. Without heartbeats, Temporal cannot distinguish a slow-but-healthy activity from a stuck worker. Set a heartbeat timeout and call activity.heartbeat() after each sub-batch or processing checkpoint. This gives Temporal a liveness signal without requiring the activity to complete.
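A minimal sketch of the sub-batch heartbeat pattern follows. To keep it runnable without a Temporal server, the heartbeat is an injected callable; in a real activity it would be `activity.heartbeat`. The `embed_with_heartbeats` name, the `embed_fn` parameter, and the default sub-batch size are assumptions for illustration.

```python
from typing import Callable, Sequence


def embed_with_heartbeats(
    texts: Sequence[str],
    embed_fn: Callable[[Sequence[str]], list[list[float]]],
    heartbeat: Callable[[int], None],
    sub_batch_size: int = 8,
) -> list[list[float]]:
    """Embed texts in sub-batches, reporting progress after each one."""
    vectors: list[list[float]] = []
    for start in range(0, len(texts), sub_batch_size):
        sub_batch = texts[start:start + sub_batch_size]
        vectors.extend(embed_fn(sub_batch))
        # Liveness signal: the argument is the number of texts embedded so far.
        heartbeat(start + len(sub_batch))
    return vectors
```

The heartbeat interval is then bounded by the slowest single sub-batch, so the heartbeat timeout can be set well below the activity's start-to-close timeout.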

Principle: Rate limiting in an embedding activity should be applied before the API call, not reactively in the retry handler. A small delay or token-bucket check before each batch is cheap at normal throughput and prevents the burst pattern that triggers rate limits in the first place. Reactive retry on rate limits increases latency on every rate-limited batch.
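One way to apply the limit before the call is a token bucket, sketched below with only the standard library. The `TokenBucket` class and its parameters are hypothetical; the clock and sleep functions are injectable so the behavior can be checked without real waiting. A bucket like this lives in the worker process, so it limits one worker, not the whole fleet.

```python
import asyncio
import time
from typing import Awaitable, Callable


class TokenBucket:
    """Async token bucket: wait until enough tokens exist, then spend them."""

    def __init__(
        self,
        rate_per_sec: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    ) -> None:
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity  # start full
        self._clock = clock
        self._sleep = sleep
        self._last = clock()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = self._clock()
        self.tokens = min(self.capacity, self.tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self, cost: float = 1.0) -> None:
        if cost > self.capacity:
            raise ValueError("cost exceeds bucket capacity")
        async with self._lock:
            while True:
                self._refill()
                if self.tokens >= cost:
                    self.tokens -= cost
                    return
                # Sleep just long enough for the shortfall to refill.
                await self._sleep((cost - self.tokens) / self.rate)
```

An embedding activity would call `await bucket.acquire()` (or pass an estimated token count as `cost`) immediately before the API request.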

Batch size interacts with cost. Larger batches reduce API call overhead but create coarser retry granularity. If a large batch fails late, the whole batch may be re-embedded on retry. Choose a batch size that balances call overhead against retry cost for the embedding model and provider limits you use.
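The trade-off can be made concrete with a little arithmetic. The helper below is a hypothetical sketch: it assumes a uniform `tokens_per_chunk` and a worst case in which a failed batch is re-sent in full.

```python
import math


def batch_tradeoff(total_chunks: int, batch_size: int, tokens_per_chunk: int) -> dict:
    """Return API call count and worst-case tokens re-embedded by one failed batch."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    api_calls = math.ceil(total_chunks / batch_size)
    # A batch that fails after the provider processed it is re-sent whole on retry.
    worst_case_retry_tokens = min(batch_size, total_chunks) * tokens_per_chunk
    return {"api_calls": api_calls, "worst_case_retry_tokens": worst_case_retry_tokens}
```

For 1,000 chunks of 400 tokens, a batch size of 32 gives 32 calls with at most 12,800 tokens at risk per failed batch, while a batch size of 256 gives 4 calls with up to 102,400 tokens at risk.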

Index Management: Partial Updates and Consistency

Keeping the vector index consistent with the document corpus is the hardest part of RAG pipeline operations. Three scenarios cause index drift:

Partial upserts. If a network error interrupts an upsert, some vectors from a batch may have written and some may not. Because upsert operations are typically not transactional in vector databases, you cannot roll back the partial write. The correct approach is to make upserts idempotent: use stable chunk IDs (derived from document ID, chunk position, and chunk content, not random UUIDs), so that re-upserting the same chunk on retry overwrites rather than duplicates.
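A minimal sketch of why stable IDs make retries safe. A plain dict stands in for the vector index, and `stable_chunk_id` is an illustrative helper that hashes the full chunk text (the workflow code in this post hashes a 64-character prefix instead).

```python
import hashlib


def stable_chunk_id(doc_id: str, chunk_index: int, text: str) -> str:
    """Same document, position, and text always produce the same ID."""
    payload = f"{doc_id}:{chunk_index}:{text}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]


def upsert(index: dict[str, list[float]], chunk_id: str, vector: list[float]) -> None:
    """Stand-in for a vector DB upsert: write by ID, overwriting any existing entry."""
    index[chunk_id] = vector


index: dict[str, list[float]] = {}
cid = stable_chunk_id("doc-1", 0, "hello world")
upsert(index, cid, [0.1, 0.2])
upsert(index, cid, [0.1, 0.2])  # simulated retry of the same upsert
```

After the simulated retry the index still holds a single entry. With a random UUID per attempt it would hold two.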

Document updates. When a source document changes, the stale chunk vectors must be deleted before or alongside the new chunks being written. Do not rely on overwriting by ID alone — if a document edit reduces chunk count, orphaned chunk vectors can remain indexed and surface in retrieval. Track the full list of chunk IDs per document in a metadata store, and on update, delete the full prior set before upserting the new set.
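The update procedure can be sketched with in-memory stand-ins: a dict for the vector index and a dict mapping each document ID to its set of chunk IDs (the manifest). Both structures and the `apply_document_update` name are assumptions for illustration.

```python
def apply_document_update(
    index: dict[str, list[float]],
    manifest: dict[str, set[str]],
    doc_id: str,
    new_chunks: dict[str, list[float]],
) -> None:
    """Replace a document's vectors: delete the full prior set, then upsert the new set."""
    prior_ids = manifest.get(doc_id, set())
    for chunk_id in prior_ids:
        index.pop(chunk_id, None)  # tolerate IDs already removed by an earlier attempt
    index.update(new_chunks)
    # The manifest is updated last, so a retry after a crash still sees the prior IDs.
    manifest[doc_id] = set(new_chunks)
```

Because every step tolerates being repeated, the whole function can be retried after a partial failure without leaving orphans.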

Document deletes. When a document is removed from the source, its chunks must be purged from the index. Without a delete signal, the vectors remain indefinitely. Build a delete activity that accepts a document ID, retrieves the associated chunk IDs from the metadata store, deletes them from the vector index by ID, and removes the metadata record.

Warning: Never use a metadata filter-based delete (e.g., delete where doc_id == X) as your primary delete mechanism in production. Filter-based deletes are slower, more expensive, and not consistently atomic across vector database implementations. Delete by explicit ID list retrieved from your metadata store. This also means your metadata store is the source of truth for which chunks exist — the vector index is a derived artifact.
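A delete by explicit ID list might look like the sketch below, again using dicts as stand-ins for the vector index and the metadata store. The `delete_document` helper is hypothetical.

```python
def delete_document(
    index: dict[str, list[float]],
    manifest: dict[str, set[str]],
    doc_id: str,
) -> int:
    """Delete a document's vectors by ID, then its metadata. Returns IDs processed."""
    chunk_ids = manifest.get(doc_id)
    if chunk_ids is None:
        return 0  # already deleted, so a retried call is a no-op
    for chunk_id in chunk_ids:
        index.pop(chunk_id, None)
    # Remove metadata only after the vectors are gone; if the process dies
    # before this line, a retry can still look up which IDs to delete.
    del manifest[doc_id]
    return len(chunk_ids)
```

The ordering is the design choice that matters here: deleting the metadata record first would lose the only list of IDs that identifies the vectors to purge.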

Monitoring Pipeline Health

A Temporal workflow gives you execution history for free. Every activity start, completion, failure, and retry is recorded in the workflow history and visible in the Temporal UI. This handles the “did the pipeline run” question.

The harder monitoring question is “did the pipeline produce a correct index.” Four metrics matter:

Ingestion lag — the time between a document being created or modified in the source and its embeddings landing in the index. This is the freshness window. Measure it by comparing document updated_at timestamps in the source with indexed_at timestamps in your metadata store.
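As a sketch, the lag for one document can be computed from the two timestamps. The function name and the handling of never-indexed or stale documents (their lag keeps growing until `now`) are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def ingestion_lag(
    source_updated_at: datetime,
    indexed_at: Optional[datetime],
    now: datetime,
) -> timedelta:
    """Lag for one document; unindexed or stale documents accrue lag until `now`."""
    if indexed_at is None or indexed_at < source_updated_at:
        # The latest version of the document has not been indexed yet.
        return now - source_updated_at
    return indexed_at - source_updated_at
```

Use timezone-aware timestamps on both sides; mixing naive and aware values raises a `TypeError` on comparison.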

Chunk coverage: for each document, the ratio of confirmed-indexed chunks (from upsert counts) to expected chunks (from chunking). Any document where coverage is less than 1.0 has partial indexing. A nightly scan across your metadata store that flags coverage gaps is more reliable than inferring health from API error rates.
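A coverage scan reduces to a ratio and a filter, sketched below. The `DocState` record is hypothetical; in particular it assumes the metadata store records an expected chunk count per document, which the `doc_index_state` table shown earlier does not include.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DocState:
    doc_id: str
    expected_chunks: int  # assumed column: chunk count produced by chunking
    indexed_chunks: int   # chunk count confirmed by upsert responses


def coverage(state: DocState) -> float:
    """Indexed chunks divided by expected chunks; empty documents count as covered."""
    if state.expected_chunks == 0:
        return 1.0
    return state.indexed_chunks / state.expected_chunks


def find_coverage_gaps(states: list[DocState]) -> list[str]:
    """Return IDs of documents whose coverage is below 1.0."""
    return [s.doc_id for s in states if coverage(s) < 1.0]
```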

Embedding error rate by document type — PDFs with complex layouts, scanned documents, and very short documents (under 50 tokens) fail at higher rates. Tracking error rate by MIME type and source lets you identify structurally problematic document categories rather than treating all failures as equivalent.
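A breakdown like this can be computed from a log of embedding outcomes. The event shape, a `(mime_type, succeeded)` pair, is an assumption for illustration.

```python
from collections import Counter


def error_rate_by_type(events: list[tuple[str, bool]]) -> dict[str, float]:
    """Return the failure fraction per MIME type from (mime_type, succeeded) events."""
    totals: Counter = Counter()
    failures: Counter = Counter()
    for mime_type, succeeded in events:
        totals[mime_type] += 1
        if not succeeded:
            failures[mime_type] += 1
    return {mime_type: failures[mime_type] / totals[mime_type] for mime_type in totals}
```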

Index-metadata agreement — periodically sample a set of document IDs from your metadata store and verify that the expected chunk IDs exist in the vector index. This catches cases where the metadata write succeeded but the vector write failed silently.
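A sampling check along these lines is sketched below. The manifest shape and the `fetch_existing_ids` callable are assumptions; in practice the callable would wrap your vector database's fetch-by-ID call and return the subset of requested IDs that exist.

```python
import random
from typing import Callable, Optional


def sample_agreement(
    manifest: dict[str, set[str]],
    fetch_existing_ids: Callable[[set[str]], set[str]],
    sample_size: int,
    seed: Optional[int] = None,
) -> dict[str, set[str]]:
    """Return {doc_id: missing_chunk_ids} for sampled documents with missing vectors."""
    rng = random.Random(seed)
    doc_ids = sorted(manifest)  # sorted so a fixed seed gives a reproducible sample
    sampled = rng.sample(doc_ids, min(sample_size, len(doc_ids)))
    missing: dict[str, set[str]] = {}
    for doc_id in sampled:
        expected = manifest[doc_id]
        absent = expected - fetch_existing_ids(expected)
        if absent:
            missing[doc_id] = absent
    return missing
```

An empty result means every sampled document's chunk IDs were found in the index; any entry is a candidate for targeted re-ingestion.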

Re-indexing at Scale

Full corpus re-indexing is required when you switch embedding models, change chunking strategy, or need to recover from index corruption. For large corpora, sequential re-indexing may be too slow. The Temporal pattern for this is child workflows.

The parent workflow accepts the full document list and fans out — spawning one child workflow per document (or per batch of N documents, depending on corpus size). Each child workflow runs the full fetch-chunk-embed-upsert sequence for its slice. The parent collects results and surfaces any child failures without blocking successful children.

@workflow.defn
class RAGBulkReindexWorkflow:
    @workflow.run
    async def run(self, doc_refs: list[DocumentRef]) -> dict:
        batch_size = 50
        batches = [
            doc_refs[i:i + batch_size]
            for i in range(0, len(doc_refs), batch_size)
        ]
        child_handles = []
        for n, batch in enumerate(batches):
            handle = await workflow.start_child_workflow(
                RAGIngestionWorkflow.run,
                RAGIngestionInput(documents=batch),
                id=f"rag-reindex-batch-{n}",
                execution_timeout=timedelta(hours=2),
            )
            child_handles.append(handle)
        # Child handles are awaitable. return_exceptions=True keeps one failed
        # batch from aborting the wait on the others.
        outcomes = await asyncio.gather(*child_handles, return_exceptions=True)
        total_indexed = 0
        failed_docs = []
        for batch, outcome in zip(batches, outcomes):
            if isinstance(outcome, BaseException):
                # The child failed as a whole (for example, retries exhausted),
                # so every document in its batch needs another run.
                failed_docs.extend(d.doc_id for d in batch)
                continue
            for r in outcome:
                if r.status == "completed":
                    total_indexed += r.chunks_indexed
                elif r.status == "failed":
                    failed_docs.append(r.doc_id)
        return {"total_indexed": total_indexed, "failed_docs": failed_docs}

Child workflows run concurrently, bounded by worker capacity (for example, the worker's limits on concurrent workflow tasks and activities) rather than by the parent. Each child’s history is independent: a failure in one batch does not block others, and a failed batch can be re-run on its own without re-processing the entire corpus.

Principle: Treat re-indexing as a first-class operational procedure, not an emergency measure. Build and test the bulk re-index workflow before you need it. The worst time to debug a re-index workflow is when the production index is corrupted and retrieval is broken.

The deeper issue is that most teams treat the vector index as an immutable artifact once it is built — they think about ingestion once at setup, not as an ongoing operational concern. That framing is wrong. The index is a derived cache of your document corpus. Anything that changes the corpus — document edits, model upgrades, chunking strategy changes — invalidates some or all of that cache. A production RAG system needs an ingestion pipeline with the same operational maturity as a data warehouse ETL: idempotent, observable, and independently re-runnable on any subset of the corpus without manual intervention.

Pre-Deploy Checklist for RAG Ingestion with Temporal

  • Each activity is idempotent — re-running with the same inputs produces the same index state without duplicates
  • Chunk IDs are derived deterministically from document ID and content, not generated randomly at runtime
  • Non-retryable error types are explicitly listed in each activity's retry policy (especially for embedding API 400 and 401 responses)
  • Heartbeat calls are present in any embedding or fetch activity that may run longer than ordinary request timeouts
  • Document deletes trigger chunk vector deletion by ID list, not metadata filter scan
  • Chunk coverage monitoring is in place — a nightly scan that flags documents with coverage below 1.0
  • The bulk re-index workflow is tested against a representative corpus slice before the production run

The retry policy patterns in this post extend the approach in Temporal Activity Retry Patterns for LLM API Calls — particularly the non-retryable error classification and heartbeat timeout configuration. The ingestion architecture connects directly to the broader enterprise challenge covered in Enterprise RAG Beyond the Demo, where ACL propagation and document churn compound the consistency problems that durable execution addresses at the infrastructure layer. For a full checklist of production RAG concerns beyond ingestion, see The Production-Ready RAG Pipeline.

For a broader introduction to Temporal in AI systems, Temporal for Durable AI Agents covers the execution model and worker architecture that this ingestion design builds on.


Why do standard RAG ingestion pipelines fail silently?

Most RAG ingestion pipelines run as scripts or cron jobs with no execution durability. If a network timeout occurs mid-embedding, or a vector database upsert partially completes, there is no record of which chunks succeeded and which did not. The process exits with either no error or a generic exception, and the index is left in an inconsistent state. Documents appear indexed because their metadata was written, but the underlying embeddings for some chunks were never stored. Queries that should retrieve those chunks return nothing, with no visible signal that retrieval is broken.

What is the correct unit of work for a Temporal activity in a RAG pipeline?

Each activity should correspond to one atomic, independently retryable operation: fetching a document, chunking it, embedding a single batch, or upserting one batch to the index. Grouping multiple operations into one activity defeats Temporal's retry model — if embedding succeeds but the upsert fails, you want to retry only the upsert, not re-embed at cost. Keeping activities granular means each retry is scoped to the actual failure point. The workflow function composes these activities in sequence and tracks completion state across retries and worker restarts.

How should partial re-indexing work when source documents change?

Track a content hash per document alongside its chunk IDs in a metadata store. When a document changes, compute the new hash, identify which chunks changed, delete the stale chunk vectors from the index by ID, embed and upsert only the changed chunks, and update the stored hash and chunk manifest. This avoids full re-indexing on minor edits. For large corpora, a Temporal workflow can process changed documents in parallel child workflows, each scoped to one document, with the parent workflow coordinating overall progress and surfacing any failures.

What monitoring signals matter most for a RAG ingestion pipeline?

Four signals matter: ingestion lag (time between a document being created or updated and its embeddings landing in the index), chunk coverage (ratio of indexed chunks to expected chunks per document; values below 1.0 indicate silent partial failures), embedding error rate by document type and source, and index consistency check results (periodic scans that verify metadata and vector counts agree). Temporal's built-in workflow history gives you retry counts and activity failure details without custom instrumentation. For the index, a nightly coverage scan that writes results to a monitoring table is more reliable than inferring health from API error rates alone.


The decision rule

Building a RAG pipeline that breaks silently is not a code quality problem; it is an architecture choice. If your current ingestion pipeline runs as a script with no durable execution guarantee, partial failures are invisible by design. Review ingestion durability, index consistency guarantees, embedding failure paths, and recovery visibility before retrieval quality incidents make the gap visible. The Enterprise Agentic Assessment Kit can structure that first review.


About the author

Igor Bobriakov

AI Architect. Author of Production-Ready AI Agents. 15 years deploying production AI platforms and agentic systems for enterprise clients and deep-tech startups.