Most RAG ingestion pipelines fail silently. Not with a stack trace. Not with an alert. With a partially indexed corpus and a search system that quietly returns incomplete results.
The failure mode is straightforward: a script loads documents, calls an embedding API, and upserts vectors to a vector database. Midway through a large batch, the embedding API times out. The script catches the exception, logs it, and exits. Some documents are now in a liminal state — their metadata records exist, their chunk IDs are registered, but their embeddings were never written to the index. Queries that should hit those chunks return nothing. The pipeline logs show a completed run. No alert fires. No one knows.
This is not an edge case. It is a common failure mode for RAG ingestion pipelines that run as cron scripts without durable execution semantics. The fix is not better error handling alone — it is making the execution itself durable, so that partial failures are detected, checkpointed, and resumed at the point of failure without re-processing what already completed.
Temporal is a strong tool for this class of problem. This post covers how to structure a RAG ingestion pipeline as a Temporal workflow: the activity decomposition, retry policies, embedding rate limiting, index consistency guarantees, and re-indexing at scale.
Why RAG Ingestion Needs Durable Execution
A RAG ingestion pipeline has three properties that make standard orchestration insufficient.
It is expensive to restart from scratch. Embedding a large corpus costs real money. If a pipeline fails near the end of a run, re-running from the beginning wastes both cost and time. A durable execution system checkpoints progress so restarts resume from the last completed activity.
It touches multiple external systems. Document stores, embedding APIs, and vector databases are each independently fallible. A network partition between any two of them creates a partial-write scenario. Without coordination, the pipeline has no way to know which writes committed.
Silent inconsistency is worse than visible failure. A crashed pipeline is visible. A pipeline that completes with a corrupted index is not. When embedding silently fails for part of the corpus, retrieval quality degrades — but the degradation is diffuse enough that it looks like a content problem, not an infrastructure problem.
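Temporal addresses all three by treating the workflow as a durable state machine. Every activity completion is recorded in the workflow's event history. If a worker dies, another worker replays that history, reuses the recorded results of activities that already completed instead of re-running them, and continues from the first step that had not finished. An activity that exhausts its retries fails visibly in that history rather than being skipped, so the pipeline cannot silently drop a step.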
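A toy model makes the replay idea concrete. The sketch below is plain Python, not Temporal's implementation: history is a dict standing in for the durable event history, and run_pipeline is a hypothetical helper that reuses any recorded result instead of re-executing that step.

```python
from typing import Callable

Step = tuple[str, Callable[[], str]]


def run_pipeline(steps: list[Step], history: dict[str, str]) -> list[str]:
    """Run steps in order, replaying results already recorded in history."""
    results = []
    for name, fn in steps:
        if name in history:
            # Completed before the crash: reuse the recorded result.
            results.append(history[name])
            continue
        value = fn()  # may raise, which models a crash mid-run
        history[name] = value  # record completion before moving on
        results.append(value)
    return results


calls = {"fetch": 0, "embed": 0, "upsert": 0}
index_up = False


def fetch() -> str:
    calls["fetch"] += 1
    return "doc"


def embed() -> str:
    calls["embed"] += 1
    return "vectors"


def upsert() -> str:
    calls["upsert"] += 1
    if not index_up:
        raise ConnectionError("vector DB unreachable")
    return "ok"


steps: list[Step] = [("fetch", fetch), ("embed", embed), ("upsert", upsert)]
history: dict[str, str] = {}
try:
    run_pipeline(steps, history)  # first run dies at the upsert
except ConnectionError:
    pass
index_up = True
print(run_pipeline(steps, history))  # ['doc', 'vectors', 'ok']
print(calls)  # {'fetch': 1, 'embed': 1, 'upsert': 2}
```

The property that matters is that the log outlives the process. In Temporal it lives on the server, which is why a replacement worker can continue where a dead one stopped.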
This pattern surfaces most often when teams move a RAG prototype to production on a corpus that actually changes — document edits, additions, deletions — and discover that their ingestion script was never built to handle partial failures or re-indexing. The script worked fine in development against a static dataset. In production against a live SharePoint or Confluence corpus, it degrades silently.
Orchestration Approach Comparison
Before committing to Temporal, it is worth understanding where it sits relative to simpler options.
| Approach | Durability | Per-Activity Retry | Partial Failure Visibility | Operational Complexity |
|---|---|---|---|---|
| Cron + Python script | None — crash loses all state | Manual try/except only | None — silent gaps | Low to start, high to maintain |
| Airflow DAG | Task-level — DAG reruns from failed task | Task-level with configurable retries | Moderate — task status visible in UI | High — scheduler, DB, executors |
| Celery + Redis | Message-level — depends on ack settings | Yes, with backoff configuration | Low — requires external monitoring | Moderate — broker + workers + flower |
| Temporal | Full — event-sourced history survives worker death | Per-activity, configurable per call | High — full history in UI + API | Moderate — cluster + workers, local dev via Docker |
Airflow is a reasonable choice when your pipeline maps cleanly to a DAG and you already operate it. Temporal is the better choice when pipelines are long-running, involve sub-workflow fan-out, or require fine-grained retry control at the activity level without restructuring the entire DAG. The RAG ingestion case fits Temporal’s strengths: variable document counts, expensive per-step operations, and a need for partial failure detection rather than all-or-nothing reruns.
Workflow Design: Activity Decomposition
The core principle: each activity must be idempotent and independently retryable.
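In practice, this means splitting the pipeline into four core activities (the full example below adds two small bookkeeping activities that read and write per-document index state):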
- Fetch document — retrieve raw content from the source (S3, SharePoint, a CMS API)
- Chunk document — apply your splitting strategy, return a list of chunk dicts with text and metadata
- Embed chunk batch — call the embedding API for a batch of N chunks, return vectors
- Upsert to index — write vectors and metadata to the vector database
Never combine embedding and upsert into one activity. If the upsert fails after embedding succeeds, you want to retry only the upsert; if both live in one activity, every upsert retry re-embeds the batch and pays for it again.
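The cost difference shows up in a small simulation. FlakyIndex, retry, and the two ingest functions below are hypothetical stand-ins (retry has no backoff, unlike a real RetryPolicy); the point is only to count how many times the billed embedding call runs when the upsert fails twice.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class FlakyIndex:
    """Stand-in vector index whose first `failures` upserts time out."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.rows: dict[str, list[float]] = {}

    def upsert(self, rows: dict[str, list[float]]) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise TimeoutError("upsert timed out")
        self.rows.update(rows)


def retry(fn: Callable[[], T], attempts: int) -> T:
    """Re-run fn until it succeeds or attempts run out (no backoff)."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except TimeoutError:
            if attempt == attempts:
                raise
    raise ValueError("attempts must be >= 1")


embed_calls: list[int] = []


def embed(chunks: list[str]) -> dict[str, list[float]]:
    embed_calls.append(len(chunks))  # every call here would be billed
    return {c: [float(len(c))] for c in chunks}


def ingest_combined(chunks: list[str], index: FlakyIndex) -> None:
    # One activity: each retry repeats the embedding call.
    retry(lambda: index.upsert(embed(chunks)), attempts=3)


def ingest_split(chunks: list[str], index: FlakyIndex) -> None:
    # Two activities: the embedding result is kept; only the upsert retries.
    vectors = retry(lambda: embed(chunks), attempts=3)
    retry(lambda: index.upsert(vectors), attempts=3)


ingest_combined(["a", "bb"], FlakyIndex(failures=2))
print(len(embed_calls))  # 3
embed_calls.clear()
ingest_split(["a", "bb"], FlakyIndex(failures=2))
print(len(embed_calls))  # 1
```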
Here is the complete workflow with Pydantic models and activity retry policies:
```python
import asyncio
import hashlib
from datetime import timedelta
from typing import Optional

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.exceptions import ActivityError, ApplicationError
from temporalio.worker import Worker

# Pydantic is safe inside the workflow sandbox; passing it through avoids
# re-importing it for every workflow run.
with workflow.unsafe.imports_passed_through():
    from pydantic import BaseModel

# --- Pydantic data models ---


class DocumentRef(BaseModel):
    doc_id: str
    source_uri: str
    content_hash: Optional[str] = None


class Chunk(BaseModel):
    chunk_id: str
    doc_id: str
    text: str
    metadata: dict


class EmbeddedChunk(BaseModel):
    chunk_id: str
    doc_id: str
    embedding: list[float]
    metadata: dict


class IngestionResult(BaseModel):
    doc_id: str
    chunks_indexed: int
    status: str  # "completed" | "skipped" | "failed"
    content_hash: Optional[str] = None  # persisted so the next run can skip unchanged docs


class RAGIngestionInput(BaseModel):
    documents: list[DocumentRef]
    embedding_model: str = "text-embedding-3-small"
    chunk_size: int = 512  # characters, as measured by RecursiveCharacterTextSplitter
    chunk_overlap: int = 64
    # Each batch's vectors travel through workflow history, so keep
    # batch_size x embedding dimensions well under Temporal's payload size limit.
    batch_size: int = 32


# --- Activities ---


@activity.defn
async def fetch_document(doc_ref: DocumentRef) -> str:
    """Fetch raw document content from source URI."""
    import httpx

    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.get(doc_ref.source_uri)
    # 403/404 will not fix themselves on retry: fail fast instead of burning attempts.
    if resp.status_code in (403, 404):
        raise ApplicationError(
            f"{doc_ref.source_uri} returned {resp.status_code}",
            type="DocumentUnavailable",
            non_retryable=True,
        )
    resp.raise_for_status()
    return resp.text


@activity.defn
async def chunk_document(
    doc_id: str,
    content: str,
    chunk_size: int,
    chunk_overlap: int,
) -> list[Chunk]:
    """Split document content into chunks with stable, content-derived IDs."""
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    chunks = []
    for i, text in enumerate(splitter.split_text(content)):
        # Same doc, position, and text -> same ID, so a retried upsert overwrites
        # instead of duplicating. Hashing the full text (not a prefix) also gives
        # the chunk a new ID whenever its content changes.
        chunk_id = hashlib.sha256(f"{doc_id}:{i}:{text}".encode()).hexdigest()[:16]
        chunks.append(Chunk(
            chunk_id=chunk_id,
            doc_id=doc_id,
            text=text,
            metadata={"doc_id": doc_id, "chunk_index": i},
        ))
    return chunks


@activity.defn
async def embed_chunk_batch(
    chunks: list[Chunk],
    embedding_model: str,
) -> list[EmbeddedChunk]:
    """Embed one batch of chunks. Temporal owns the retries."""
    import openai

    activity.heartbeat("starting embedding batch")

    # max_retries=0: let EMBED_RETRY govern backoff instead of stacking
    # the client's built-in retries on top of Temporal's.
    client = openai.AsyncOpenAI(max_retries=0)

    # Crude pacing only; real rate limiting needs a limiter shared across workers.
    await asyncio.sleep(0.1)

    response = await client.embeddings.create(
        model=embedding_model,
        input=[c.text for c in chunks],
    )

    activity.heartbeat(f"embedding complete, {len(chunks)} chunks")

    # Pair each vector with its chunk by the index the API reports.
    ordered = sorted(response.data, key=lambda d: d.index)
    return [
        EmbeddedChunk(
            chunk_id=c.chunk_id,
            doc_id=c.doc_id,
            embedding=d.embedding,
            metadata=c.metadata,
        )
        for c, d in zip(chunks, ordered)
    ]


@activity.defn
async def upsert_to_index(
    embedded_chunks: list[EmbeddedChunk],
    index_name: str,
) -> int:
    """Upsert embedded chunks to vector index. Returns count upserted."""
    import pinecone

    index = pinecone.Pinecone().Index(index_name)
    vectors = [
        {"id": ec.chunk_id, "values": ec.embedding, "metadata": ec.metadata}
        for ec in embedded_chunks
    ]
    # The Pinecone client is synchronous; run it off the event loop so this
    # async activity does not block other activities on the same worker.
    response = await asyncio.to_thread(index.upsert, vectors=vectors)
    return response.upserted_count


@activity.defn
async def check_document_hash(doc_ref: DocumentRef) -> Optional[str]:
    """Return stored content hash for this doc, or None if not indexed."""
    import asyncpg

    conn = await asyncpg.connect(dsn="postgresql://...")
    try:
        row = await conn.fetchrow(
            "SELECT content_hash FROM doc_index_state WHERE doc_id = $1",
            doc_ref.doc_id,
        )
    finally:
        await conn.close()
    return row["content_hash"] if row else None


@activity.defn
async def record_ingestion_state(result: IngestionResult) -> None:
    """Persist ingestion result to metadata store."""
    import asyncpg

    conn = await asyncpg.connect(dsn="postgresql://...")
    try:
        await conn.execute(
            """
            INSERT INTO doc_index_state (doc_id, content_hash, chunks_indexed, status, indexed_at)
            VALUES ($1, $2, $3, $4, NOW())
            ON CONFLICT (doc_id) DO UPDATE SET
                content_hash = EXCLUDED.content_hash,
                chunks_indexed = EXCLUDED.chunks_indexed,
                status = EXCLUDED.status,
                indexed_at = EXCLUDED.indexed_at
            """,
            result.doc_id,
            result.content_hash,  # $2 must be the hash, or the skip check never matches
            result.chunks_indexed,
            result.status,
        )
    finally:
        await conn.close()


# --- Retry policies ---

FETCH_RETRY = RetryPolicy(
    maximum_attempts=5,
    initial_interval=timedelta(seconds=2),
    maximum_interval=timedelta(seconds=30),
    backoff_coefficient=2.0,
    non_retryable_error_types=["DocumentUnavailable"],
)

# Entries are matched against the exception's class name, not its module path.
EMBED_RETRY = RetryPolicy(
    maximum_attempts=8,
    initial_interval=timedelta(seconds=5),
    maximum_interval=timedelta(minutes=2),
    backoff_coefficient=2.0,
    non_retryable_error_types=["BadRequestError", "AuthenticationError"],
)

UPSERT_RETRY = RetryPolicy(
    maximum_attempts=10,
    initial_interval=timedelta(seconds=2),
    maximum_interval=timedelta(minutes=1),
    backoff_coefficient=2.0,
)


# --- Workflow ---


@workflow.defn
class RAGIngestionWorkflow:
    @workflow.run
    async def run(self, params: RAGIngestionInput) -> list[IngestionResult]:
        results: list[IngestionResult] = []
        for doc_ref in params.documents:
            try:
                results.append(await self._ingest_document(doc_ref, params))
            except ActivityError as err:
                # An activity exhausted its retries or hit a non-retryable error.
                # Record the failure and continue with the remaining documents.
                workflow.logger.warning("Ingestion failed for %s: %s", doc_ref.doc_id, err)
                results.append(IngestionResult(
                    doc_id=doc_ref.doc_id,
                    chunks_indexed=0,
                    status="failed",
                ))
        return results

    async def _ingest_document(
        self, doc_ref: DocumentRef, params: RAGIngestionInput
    ) -> IngestionResult:
        # Check if document has changed since last index run
        stored_hash = await workflow.execute_activity(
            check_document_hash,
            doc_ref,
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=UPSERT_RETRY,
        )

        # Skip only on a real match: with no hash supplied, None == None
        # would otherwise skip documents that were never indexed.
        if doc_ref.content_hash is not None and stored_hash == doc_ref.content_hash:
            return IngestionResult(
                doc_id=doc_ref.doc_id,
                chunks_indexed=0,
                status="skipped",
                content_hash=stored_hash,
            )

        # Fetch
        content = await workflow.execute_activity(
            fetch_document,
            doc_ref,
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=FETCH_RETRY,
        )

        # Chunk
        chunks = await workflow.execute_activity(
            chunk_document,
            args=[doc_ref.doc_id, content, params.chunk_size, params.chunk_overlap],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Embed + upsert in batches, as separate activities
        total_upserted = 0
        for i in range(0, len(chunks), params.batch_size):
            batch = chunks[i:i + params.batch_size]

            embedded = await workflow.execute_activity(
                embed_chunk_batch,
                args=[batch, params.embedding_model],
                start_to_close_timeout=timedelta(minutes=5),
                heartbeat_timeout=timedelta(seconds=30),
                retry_policy=EMBED_RETRY,
            )

            total_upserted += await workflow.execute_activity(
                upsert_to_index,
                args=[embedded, "rag-production"],
                start_to_close_timeout=timedelta(minutes=2),
                retry_policy=UPSERT_RETRY,
            )

        result = IngestionResult(
            doc_id=doc_ref.doc_id,
            chunks_indexed=total_upserted,
            status="completed",
            content_hash=doc_ref.content_hash,
        )

        await workflow.execute_activity(
            record_ingestion_state,
            result,
            start_to_close_timeout=timedelta(seconds=15),
            retry_policy=UPSERT_RETRY,
        )
        return result


# --- Worker ---


async def main() -> None:
    # Assumes a local dev server and a "rag-ingestion" task queue. Pydantic models
    # need the Pydantic data converter; the worker inherits it from the client.
    client = await Client.connect("localhost:7233", data_converter=pydantic_data_converter)
    worker = Worker(
        client,
        task_queue="rag-ingestion",
        workflows=[RAGIngestionWorkflow],
        activities=[
            check_document_hash,
            fetch_document,
            chunk_document,
            embed_chunk_batch,
            upsert_to_index,
            record_ingestion_state,
        ],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

Embedding Activity: Retry Policies and Rate Limiting
The embedding activity deserves specific attention because its failure modes are different from generic HTTP calls.
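Non-retryable errors are not obvious. A 400 Bad Request from an embeddings API often means the input text exceeds the model’s token limit, and retrying it will fail again. Include the provider’s bad-request exception in non_retryable_error_types; in the Temporal Python SDK, entries are matched against the exception’s class name (for example "BadRequestError"), not a module-qualified path such as "openai.BadRequestError". Authentication failures should also be non-retryable. Rate limit errors should stay retryable, with exponential backoff and a conservative initial interval so retries do not hammer an endpoint that is already throttling you. If the embeddings client has its own built-in retries (the OpenAI Python client does by default), consider turning them off so the two retry layers do not multiply.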
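To see what EMBED_RETRY means in wall-clock terms, the helper below computes the waits between attempts, assuming Temporal's exponential formula (each wait is initial_interval times backoff_coefficient raised to the retry number minus one, capped at maximum_interval). retry_delays is a hypothetical helper, and the total ignores the time the attempts themselves take.

```python
def retry_delays(
    max_attempts: int, initial_s: float, coefficient: float, max_interval_s: float
) -> list[float]:
    """Wait before retry k (k = 1, 2, ...) is initial * coefficient**(k - 1), capped."""
    # N attempts means N - 1 waits: there is no wait before the first attempt.
    return [min(initial_s * coefficient**n, max_interval_s) for n in range(max_attempts - 1)]


# EMBED_RETRY: 8 attempts, 5 s initial interval, coefficient 2.0, 2-minute cap.
delays = retry_delays(8, 5.0, 2.0, 120.0)
print(delays)       # [5.0, 10.0, 20.0, 40.0, 80.0, 120.0, 120.0]
print(sum(delays))  # 395.0
```

About six and a half minutes of backoff is a reasonable budget for riding out transient rate limiting; for a 400 it is pure waste, which is why that error belongs on the non-retryable list.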
Heartbeats matter for long batches. If you embed large batches or process slow documents, an activity can legitimately run for minutes. Without heartbeats, Temporal cannot distinguish a slow-but-healthy activity from a stuck or crashed worker until the start-to-close timeout expires. Set a heartbeat timeout and call activity.heartbeat() after each sub-batch or processing checkpoint; this gives Temporal a liveness signal without requiring the activity to complete. A heartbeat cannot fire in the middle of a single awaited API call, so the heartbeat timeout must be longer than the slowest individual call.
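Inside an activity, the sub-batch pattern might look like the sketch below. embed_in_sub_batches is hypothetical, and heartbeat and embed are injected so it runs without Temporal or an API key; in a worker you would pass activity.heartbeat as heartbeat and a small wrapper around the embeddings client as embed (both assumptions about your wiring).

```python
import asyncio
from typing import Awaitable, Callable

Embedder = Callable[[list[str]], Awaitable[list[list[float]]]]


async def embed_in_sub_batches(
    texts: list[str],
    sub_batch_size: int,
    embed: Embedder,
    heartbeat: Callable[..., None],
) -> list[list[float]]:
    """Embed texts in sub-batches, heartbeating after each one completes."""
    vectors: list[list[float]] = []
    for start in range(0, len(texts), sub_batch_size):
        vectors.extend(await embed(texts[start:start + sub_batch_size]))
        # The longest silent gap is now one sub-batch call, not the whole batch.
        heartbeat(f"{len(vectors)}/{len(texts)} embedded")
    return vectors


async def fake_embed(batch: list[str]) -> list[list[float]]:
    await asyncio.sleep(0)  # stands in for network latency
    return [[float(len(t))] for t in batch]


beats: list[str] = []
vecs = asyncio.run(
    embed_in_sub_batches(["a", "bb", "ccc", "dddd", "e"], 2, fake_embed, beats.append)
)
print(beats)  # ['2/5 embedded', '4/5 embedded', '5/5 embedded']
```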
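Batch size interacts with cost. Larger batches reduce API call overhead but create coarser retry granularity: if a large batch fails late, the whole batch is re-embedded on retry. Because the activity returns its vectors, each batch's output is also stored in workflow history, so very large batches can run into Temporal's payload size limits. Choose a batch size that balances call overhead against retry cost within the request limits of the embedding model and provider you use.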
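One way to keep batches bounded on both axes is greedy batching with an item cap and a token cap. make_batches is a hypothetical helper, and the default token estimate (roughly four characters per token) is a crude assumption; substitute the tokenizer that matches your embedding model. A single text that alone exceeds max_tokens still gets its own batch, where the API will reject it with the non-retryable 400 described above.

```python
from typing import Callable


def rough_token_count(text: str) -> int:
    """Crude estimate (about 4 characters per token); replace with a real tokenizer."""
    return len(text) // 4 + 1


def make_batches(
    texts: list[str],
    max_items: int,
    max_tokens: int,
    count_tokens: Callable[[str], int] = rough_token_count,
) -> list[list[str]]:
    """Greedy batching that respects both an item cap and a token cap."""
    batches: list[list[str]] = []
    current: list[str] = []
    current_tokens = 0
    for text in texts:
        tokens = count_tokens(text)
        # Close the current batch if adding this text would break either cap.
        if current and (len(current) == max_items or current_tokens + tokens > max_tokens):
            batches.append(current)
            current, current_tokens = [], 0
        current.append(text)
        current_tokens += tokens
    if current:
        batches.append(current)
    return batches


# Using len as the "tokenizer" to keep the arithmetic visible.
batches = make_batches(["aaaa", "bb", "cccccc", "d"], max_items=2, max_tokens=6, count_tokens=len)
print(batches)  # [['aaaa', 'bb'], ['cccccc'], ['d']]
```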
Index Management: Partial Updates and Consistency
Keeping the vector index consistent with the document corpus is the hardest part of RAG pipeline operations. Three scenarios cause index drift:
Partial upserts. If a network error interrupts an upsert, some vectors from a batch may have written and some may not. Because upsert operations are typically not transactional in vector databases, you cannot roll back the partial write. The correct approach is to make upserts idempotent: use stable chunk IDs (derived from document ID and chunk position, not random UUIDs), so that re-upserting the same chunk on retry overwrites rather than duplicates.
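The effect of stable IDs is easy to demonstrate with a dict standing in for the vector index, where an upsert overwrites by key. The helpers below are hypothetical; stable_chunk_id hashes the document ID, position, and full chunk text, a variant of the scheme used by chunk_document.

```python
import hashlib
import uuid
from typing import Callable

IdFn = Callable[[str, int, str], str]


def stable_chunk_id(doc_id: str, position: int, text: str) -> str:
    """Same document, position, and text always produce the same ID."""
    return hashlib.sha256(f"{doc_id}:{position}:{text}".encode()).hexdigest()[:16]


def random_chunk_id(doc_id: str, position: int, text: str) -> str:
    """The anti-pattern: a fresh ID on every call."""
    return uuid.uuid4().hex


def upsert_chunks(index: dict[str, str], doc_id: str, chunks: list[str], make_id: IdFn) -> None:
    """Upsert into a dict standing in for the vector index: same ID overwrites."""
    for position, text in enumerate(chunks):
        index[make_id(doc_id, position, text)] = text


chunks = ["alpha", "beta"]
stable: dict[str, str] = {}
random_ids: dict[str, str] = {}
for _ in range(2):  # first attempt, then a retry after a partial write
    upsert_chunks(stable, "doc-1", chunks, stable_chunk_id)
    upsert_chunks(random_ids, "doc-1", chunks, random_chunk_id)
print(len(stable), len(random_ids))  # 2 4
```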
Document updates. When a source document changes, the stale chunk vectors must be deleted before or alongside the new chunks being written. Do not rely on overwriting by ID alone — if a document edit reduces chunk count, orphaned chunk vectors can remain indexed and surface in retrieval. Track the full list of chunk IDs per document in a metadata store, and on update, delete the full prior set before upserting the new set.
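A minimal sketch of that update path, with two dicts standing in for the vector index and the metadata store (reindex_document and both dicts are hypothetical). Deleting the recorded IDs before writing leaves a short window in which the document has no vectors; if that window matters, upsert the new set first and then delete only the IDs absent from it.

```python
def reindex_document(
    index: dict[str, str],
    manifests: dict[str, list[str]],
    doc_id: str,
    new_chunks: dict[str, str],
) -> None:
    """Replace a document's chunks without leaving orphans behind.

    index: chunk_id -> text (stand-in for the vector database)
    manifests: doc_id -> chunk IDs written last time (stand-in for the metadata store)
    """
    for old_id in manifests.get(doc_id, []):
        index.pop(old_id, None)  # missing IDs are fine, so a retry is harmless
    index.update(new_chunks)
    manifests[doc_id] = list(new_chunks)  # update the manifest only after the writes


index = {"a-0": "intro", "a-1": "body", "a-2": "appendix", "b-0": "other doc"}
manifests = {"a": ["a-0", "a-1", "a-2"], "b": ["b-0"]}
# The edit shrank document "a" from three chunks to two.
reindex_document(index, manifests, "a", {"a-0x": "intro v2", "a-1x": "body v2"})
print(sorted(index))  # ['a-0x', 'a-1x', 'b-0']
```

Overwriting by ID alone would have left "a-2" in the index; deleting by manifest removes it.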
Document deletes. When a document is removed from the source, its chunks must be purged from the index. Without a delete signal, the vectors remain indefinitely. Build a delete activity that accepts a document ID, retrieves the associated chunk IDs from the metadata store, deletes them from the vector index by ID, and removes the metadata record.
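The delete path in the same in-memory style. delete_document is hypothetical; against Pinecone, the vector step would be a call such as index.delete(ids=chunk_ids). The order is deliberate: vectors go first and the metadata record last, so a crash in between leaves the chunk IDs recorded and a retry can finish the job.

```python
def delete_document(index: dict[str, str], manifests: dict[str, list[str]], doc_id: str) -> int:
    """Purge a removed document's vectors, then its metadata record. Returns vectors removed."""
    removed = 0
    for chunk_id in manifests.get(doc_id, []):
        if index.pop(chunk_id, None) is not None:
            removed += 1
    manifests.pop(doc_id, None)  # metadata goes last
    return removed


index = {"a-0": "intro", "a-1": "body", "b-0": "other doc"}
manifests = {"a": ["a-0", "a-1"], "b": ["b-0"]}
print(delete_document(index, manifests, "a"))  # 2
print(delete_document(index, manifests, "a"))  # 0: safe to retry
print(index)  # {'b-0': 'other doc'}
```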
Monitoring Pipeline Health
A Temporal workflow gives you execution history for free. Every activity start, completion, failure, and retry is recorded in the workflow history and visible in the Temporal UI. This handles the “did the pipeline run” question.
The harder monitoring question is “did the pipeline produce a correct index.” Four metrics matter:
Ingestion lag — the time between a document being created or modified in the source and its embeddings landing in the index. This is the freshness window. Measure it by comparing document updated_at timestamps in the source with indexed_at timestamps in your metadata store.
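A per-document version of that comparison, assuming timezone-aware UTC timestamps: updated_at from the source system and indexed_at from the metadata store (the ingestion_lag helper is hypothetical). A document whose last index run predates its latest edit is still waiting, so its lag keeps growing until it is re-indexed.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def ingestion_lag(updated_at: datetime, indexed_at: Optional[datetime], now: datetime) -> timedelta:
    """How far a document's index entry trails its latest source edit."""
    if indexed_at is None or indexed_at < updated_at:
        # Never indexed, or indexed before the latest edit: still waiting.
        return now - updated_at
    return indexed_at - updated_at


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute, tzinfo=timezone.utc)


now = at(12, 0)
print(ingestion_lag(at(10, 0), at(10, 5), now))  # 0:05:00 (indexed 5 minutes after the edit)
print(ingestion_lag(at(11, 0), at(10, 5), now))  # 1:00:00 (edited after the last index run)
print(ingestion_lag(at(11, 30), None, now))      # 0:30:00 (never indexed)
```

Fleet-wide, the maximum or a high percentile of these values is the freshness window worth alerting on.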
Chunk coverage: for each document, the ratio of confirmed-indexed chunks (from upsert counts) to expected chunks (from chunking). Any document where coverage is less than 1.0 has partial indexing. A nightly scan across your metadata store that flags coverage gaps is more reliable than inferring health from API error rates.
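The nightly scan reduces to a filter over metadata rows. The doc_index_state table in the example above records only chunks_indexed, so this sketch assumes an additional, hypothetical chunks_expected value written per document (len(chunks) after chunking); DocIndexState and coverage_gaps are illustrative names.

```python
from dataclasses import dataclass


@dataclass
class DocIndexState:
    doc_id: str
    chunks_expected: int  # hypothetical column: len(chunks) after chunking
    chunks_indexed: int   # sum of upsert counts for the last run


def coverage_gaps(rows: list[DocIndexState]) -> dict[str, float]:
    """Return {doc_id: coverage} for every document below full coverage."""
    gaps: dict[str, float] = {}
    for row in rows:
        if row.chunks_expected == 0:
            continue  # empty document: nothing to cover
        coverage = row.chunks_indexed / row.chunks_expected
        if coverage < 1.0:
            gaps[row.doc_id] = coverage
    return gaps


rows = [
    DocIndexState("a", chunks_expected=4, chunks_indexed=4),
    DocIndexState("b", chunks_expected=4, chunks_indexed=3),
    DocIndexState("c", chunks_expected=0, chunks_indexed=0),
    DocIndexState("d", chunks_expected=2, chunks_indexed=0),
]
print(coverage_gaps(rows))  # {'b': 0.75, 'd': 0.0}
```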
Embedding error rate by document type — PDFs with complex layouts, scanned documents, and very short documents (under 50 tokens) fail at higher rates. Tracking error rate by MIME type and source lets you identify structurally problematic document categories rather than treating all failures as equivalent.
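Computing that breakdown only requires recording the MIME type next to each document's outcome. DocumentRef in the example carries no MIME type, so this sketch assumes it is captured at fetch time; failure_rate_by_type is a hypothetical helper.

```python
from collections import Counter


def failure_rate_by_type(outcomes: list[tuple[str, bool]]) -> dict[str, float]:
    """outcomes: (mime_type, succeeded) per document -> failure rate per MIME type."""
    totals: Counter[str] = Counter()
    failures: Counter[str] = Counter()
    for mime_type, succeeded in outcomes:
        totals[mime_type] += 1
        if not succeeded:
            failures[mime_type] += 1
    return {mime_type: failures[mime_type] / n for mime_type, n in totals.items()}


outcomes = [
    ("application/pdf", False),
    ("application/pdf", True),
    ("text/html", True),
    ("text/html", True),
]
print(failure_rate_by_type(outcomes))  # {'application/pdf': 0.5, 'text/html': 0.0}
```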
Index-metadata agreement — periodically sample a set of document IDs from your metadata store and verify that the expected chunk IDs exist in the vector index. This catches cases where the metadata write succeeded but the vector write failed silently.
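A sketch of the sampling check. index_ids is a set standing in for looking IDs up in the vector database (with Pinecone, for example, an index.fetch(ids=...) call per sampled document), and missing_chunks is a hypothetical helper. Passing a seed makes a given scan reproducible; leaving it unset draws a different sample each run.

```python
import random
from typing import Optional


def missing_chunks(
    manifests: dict[str, list[str]],
    index_ids: set[str],
    sample_size: int,
    seed: Optional[int] = None,
) -> dict[str, list[str]]:
    """Sample documents and list chunk IDs the metadata store expects but the index lacks."""
    rng = random.Random(seed)
    doc_ids = sorted(manifests)
    sample = rng.sample(doc_ids, min(sample_size, len(doc_ids)))
    report: dict[str, list[str]] = {}
    for doc_id in sample:
        absent = [cid for cid in manifests[doc_id] if cid not in index_ids]
        if absent:
            report[doc_id] = absent
    return report


manifests = {"a": ["a-0", "a-1"], "b": ["b-0"], "c": ["c-0", "c-1"]}
index_ids = {"a-0", "b-0", "c-0", "c-1"}
print(missing_chunks(manifests, index_ids, sample_size=10))  # {'a': ['a-1']}
```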
Re-indexing at Scale
Full corpus re-indexing is required when you switch embedding models, change chunking strategy, or need to recover from index corruption. For large corpora, sequential re-indexing may be too slow. The Temporal pattern for this is child workflows.
The parent workflow accepts the full document list and fans out — spawning one child workflow per document (or per batch of N documents, depending on corpus size). Each child workflow runs the full fetch-chunk-embed-upsert sequence for its slice. The parent collects results and surfaces any child failures without blocking successful children.
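```python
@workflow.defn
class RAGBulkReindexWorkflow:
    """Fan out re-indexing across child workflows. Register it on the same worker."""

    @workflow.run
    async def run(self, doc_refs: list[DocumentRef]) -> dict:
        batch_size = 50
        parent_id = workflow.info().workflow_id
        child_handles = []

        for i in range(0, len(doc_refs), batch_size):
            batch = doc_refs[i:i + batch_size]
            handle = await workflow.start_child_workflow(
                RAGIngestionWorkflow.run,  # pass the run method so the result type is known
                RAGIngestionInput(documents=batch),
                # Scope child IDs to this parent so concurrent re-index runs don't collide.
                id=f"{parent_id}-batch-{i // batch_size}",
                execution_timeout=timedelta(hours=2),
            )
            child_handles.append(handle)

        # Child handles are awaitable; return_exceptions=True keeps one failed
        # child from hiding the results of the others.
        outcomes = await asyncio.gather(*child_handles, return_exceptions=True)

        total_indexed = 0
        failed_docs: list[str] = []
        failed_batches: list[int] = []
        for batch_no, outcome in enumerate(outcomes):
            if isinstance(outcome, BaseException):
                failed_batches.append(batch_no)
                continue
            for r in outcome:
                if r.status == "completed":
                    total_indexed += r.chunks_indexed
                elif r.status == "failed":
                    failed_docs.append(r.doc_id)

        return {
            "total_indexed": total_indexed,
            "failed_docs": failed_docs,
            "failed_batches": failed_batches,
        }
```

Every child is started up front, and each child's history is independent: a failure in one batch does not block the others, and a failed child can be re-run on its own without re-processing the entire corpus. How many children make progress at once is bounded by worker capacity (for example, the max_concurrent_activities setting on each Worker) rather than by a per-workflow parallelism setting. Each child also adds events to the parent's history, so for very large corpora keep the number of children per parent bounded.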
The deeper issue is that most teams treat the vector index as an immutable artifact once it is built — they think about ingestion once at setup, not as an ongoing operational concern. That framing is wrong. The index is a derived cache of your document corpus. Anything that changes the corpus — document edits, model upgrades, chunking strategy changes — invalidates some or all of that cache. A production RAG system needs an ingestion pipeline with the same operational maturity as a data warehouse ETL: idempotent, observable, and independently re-runnable on any subset of the corpus without manual intervention.
Pre-Deploy Checklist for RAG Ingestion with Temporal
- Each activity is idempotent — re-running with the same inputs produces the same index state without duplicates
- Chunk IDs are derived deterministically from document ID and content, not generated randomly at runtime
- Non-retryable error types are explicitly listed in each activity's retry policy (especially for embedding API 400 and 401 responses)
- Heartbeat calls are present in any embedding or fetch activity that may run longer than ordinary request timeouts
- Document deletes trigger chunk vector deletion by ID list, not metadata filter scan
- Chunk coverage monitoring is in place — a nightly scan that flags documents with coverage below 1.0
- The bulk re-index workflow is tested against a representative corpus slice before the production run
Related Reading
The retry policy patterns in this post extend the approach in Temporal Activity Retry Patterns for LLM API Calls — particularly the non-retryable error classification and heartbeat timeout configuration. The ingestion architecture connects directly to the broader enterprise challenge covered in Enterprise RAG Beyond the Demo, where ACL propagation and document churn compound the consistency problems that durable execution addresses at the infrastructure layer. For a full checklist of production RAG concerns beyond ingestion, see The Production-Ready RAG Pipeline.
For a broader introduction to Temporal in AI systems, Temporal for Durable AI Agents covers the execution model and worker architecture that this ingestion design builds on.
Why do standard RAG ingestion pipelines fail silently?
Most RAG ingestion pipelines run as scripts or cron jobs with no execution durability. If a network timeout occurs mid-embedding, or a vector database upsert partially completes, there is no record of which chunks succeeded and which did not. The process exits with either no error or a generic exception, and the index is left in an inconsistent state. Documents appear indexed because their metadata was written, but the underlying embeddings for some chunks were never stored. Queries that should retrieve those chunks return nothing, with no visible signal that retrieval is broken.
What is the correct unit of work for a Temporal activity in a RAG pipeline?
Each activity should correspond to one atomic, independently retryable operation: fetching a document, chunking it, embedding a single batch, or upserting one batch to the index. Grouping multiple operations into one activity defeats Temporal's retry model — if embedding succeeds but the upsert fails, you want to retry only the upsert, not re-embed at cost. Keeping activities granular means each retry is scoped to the actual failure point. The workflow function composes these activities in sequence and tracks completion state across retries and worker restarts.
How should partial re-indexing work when source documents change?
Track a content hash per document alongside its chunk IDs in a metadata store. When a document's hash changes, re-chunk it and compare the new chunk IDs with the stored manifest: embed and upsert only the chunks whose IDs are new, delete the IDs that no longer appear, and then update the stored hash and chunk manifest. This avoids full re-indexing on minor edits, provided chunk IDs are derived from each chunk's full text. Because the IDs also encode chunk position, an edit that adds or removes a chunk early in the document shifts every later chunk, and those chunks will be re-embedded. For large corpora, a Temporal workflow can process changed documents in parallel child workflows, each scoped to one document, with the parent workflow coordinating overall progress and surfacing any failures.
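The diff step can be sketched as a pure function over ID sets. plan_incremental_update is hypothetical and assumes chunk IDs hash the document ID, position, and full chunk text, so an unchanged chunk at an unchanged position keeps its ID. Upserting to_embed before deleting to_delete avoids a window in which the document has no vectors at all.

```python
def plan_incremental_update(
    old_ids: set[str], new_chunks: dict[str, str]
) -> tuple[set[str], dict[str, str]]:
    """Return (IDs to delete, chunks to embed and upsert) for a changed document."""
    new_ids = set(new_chunks)
    to_delete = old_ids - new_ids  # stale chunks no longer produced by chunking
    to_embed = {cid: text for cid, text in new_chunks.items() if cid not in old_ids}
    return to_delete, to_embed


old_ids = {"c0", "c1", "c2"}
new_chunks = {"c0": "unchanged text", "c3": "edited text"}
to_delete, to_embed = plan_incremental_update(old_ids, new_chunks)
print(sorted(to_delete))  # ['c1', 'c2']
print(to_embed)           # {'c3': 'edited text'}
```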
What monitoring signals matter most for a RAG ingestion pipeline?
Four signals matter: ingestion lag (time between a document being created or updated and its embeddings landing in the index), chunk coverage (ratio of indexed chunks to expected chunks per document; values below 1.0 indicate silent partial failures), embedding error rate by document type and source, and index consistency check results (periodic scans that verify metadata and vector counts agree). Temporal's built-in workflow history gives you retry counts and activity failure details without custom instrumentation. For the index, a nightly coverage scan that writes results to a monitoring table is more reliable than inferring health from API error rates alone.
The decision rule
Building a RAG pipeline that breaks silently is not a code quality problem; it is an architecture choice. If your current ingestion pipeline runs as a script with no durable execution guarantee, partial failures are invisible by design. Review ingestion durability, index consistency guarantees, embedding failure paths, and recovery visibility before retrieval quality incidents make the gap visible. The Enterprise Agentic Assessment Kit can structure that first review.