The Sequential Pipeline pattern processes work through a series of ordered stages, where each stage receives input from its predecessor and produces output for its successor. This linear flow ensures that data is progressively refined, validated, and enriched as it moves through the pipeline. No stage operates without the full context of everything that came before it.
Each agent in the pipeline has a narrowly defined responsibility: extract, validate, transform, enrich, or load. This specialization means each agent can be tuned for its specific task without trading off against concerns handled elsewhere. The extraction agent does not worry about business logic validation; the transformation agent does not concern itself with source connectivity.
The strict ordering also provides built-in error isolation. If corrupted data enters the pipeline, the validation stage catches it before it propagates to transformation and enrichment. Each stage can reject or quarantine problematic records, producing a clean stream for downstream stages and a separate error stream for human review.
Data processing is defined by dependencies between stages. You cannot validate data you have not yet extracted. You cannot apply business transformations to data that has not been validated. You cannot enrich records that have not been standardized. Each stage fundamentally depends on the completed output of the previous one.
Attempting to parallelize these stages creates data integrity risks. If transformation runs concurrently with validation, some records may be transformed before their quality is confirmed, leading to corrupted outputs that are difficult to trace. The Sequential Pipeline eliminates this class of errors by ensuring every record passes through every gate in order.
The pattern also maps naturally to how data engineering teams think about pipelines. ETL (Extract, Transform, Load) and ELT (Extract, Load, Transform) are inherently sequential concepts. By assigning each stage to a specialized agent, you get the reliability of a sequential pipeline with the flexibility of AI-powered processing that can handle schema variations, unstructured data, and complex transformation logic without brittle hard-coded rules.
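The control flow behind the pattern is small enough to sketch directly. The snippet below is a minimal Python illustration, assuming each stage is a plain function that accepts its predecessor's output; the stage names are placeholders mirroring the five agents described next, not a prescribed implementation.

```python
from typing import Any, Callable, Iterable

Stage = Callable[[Any], Any]

def run_pipeline(stages: Iterable[Stage], payload: Any = None) -> Any:
    """Apply each stage to the output of its predecessor, strictly in order."""
    for stage in stages:
        payload = stage(payload)  # no stage starts before the previous one has finished
    return payload

# Placeholder stages standing in for the five agents described below.
def ingest(_: Any) -> dict:
    return {"records": [{"id": "opp-1", "amount": 100.0}], "stages_run": ["ingest"]}

def validate(data: dict) -> dict:
    return {**data, "stages_run": data["stages_run"] + ["validate"]}

def transform(data: dict) -> dict:
    return {**data, "stages_run": data["stages_run"] + ["transform"]}

def enrich(data: dict) -> dict:
    return {**data, "stages_run": data["stages_run"] + ["enrich"]}

def load(data: dict) -> dict:
    return {**data, "stages_run": data["stages_run"] + ["load"]}

result = run_pipeline([ingest, validate, transform, enrich, load])
print(result["stages_run"])  # ['ingest', 'validate', 'transform', 'enrich', 'load']
```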
Ingestion Agent — Connects to source systems and extracts raw data in its original format. This agent handles connection management, pagination, rate limiting, and retry logic. It supports multiple source types (APIs, databases, file uploads, streaming feeds) and produces a standardized raw data package with source metadata including extraction timestamp, record count, and source system version.
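As a rough sketch of these ingestion concerns, the following paginates a hypothetical REST endpoint, backs off on rate limits and transient failures, and wraps the result in a raw data package with extraction metadata. The endpoint, pagination parameters, and retry policy are illustrative assumptions rather than a real connector.

```python
import time
from datetime import datetime, timezone

import requests  # third-party HTTP client; assumed available

def extract_page(url: str, token: str, page: int) -> requests.Response:
    """Fetch one page, retrying rate limits and transient failures with backoff."""
    for attempt in range(5):
        resp = requests.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            params={"page": page, "page_size": 500},  # hypothetical pagination params
            timeout=30,
        )
        if resp.status_code in (429, 500, 502, 503):  # rate limited or transient error
            time.sleep(2 ** attempt)
            continue
        resp.raise_for_status()
        return resp
    raise RuntimeError(f"page {page} failed after retries")

def ingest(url: str, token: str) -> dict:
    """Extract all pages and return a raw data package with source metadata."""
    records, page = [], 1
    while True:
        batch = extract_page(url, token, page).json()
        if not batch:
            break
        records.extend(batch)
        page += 1
    return {
        "records": records,
        "metadata": {
            "source": url,
            "extracted_at": datetime.now(timezone.utc).isoformat(),
            "record_count": len(records),
        },
    }
```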
Validation Agent — Receives the raw data package and applies comprehensive quality checks. This agent verifies schema conformance (required fields present, data types correct), checks referential integrity (foreign keys resolve, enum values are valid), detects anomalies (values outside expected ranges, sudden volume changes), and flags duplicates. It produces a validated dataset with a quality report detailing pass/fail rates per check and quarantined records with failure reasons.
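One way to structure the clean-versus-quarantine split might look like the following, where each check returns a failure reason or None. The specific checks and field names are illustrative stand-ins for the fuller rule set.

```python
from typing import Callable, Optional

Check = Callable[[dict], Optional[str]]  # returns a failure reason, or None if the record passes

ALLOWED_STAGES = {"Prospecting", "Negotiation", "Closed Won", "Closed Lost"}  # illustrative

def check_stage(r: dict) -> Optional[str]:
    return None if r.get("stage") in ALLOWED_STAGES else "invalid stage value"

def check_amount(r: dict) -> Optional[str]:
    amount = r.get("amount")
    return None if isinstance(amount, (int, float)) and amount >= 0 else "amount missing or negative"

def check_account(r: dict) -> Optional[str]:
    return None if r.get("account_id") else "missing account reference"

CHECKS: list[Check] = [check_stage, check_amount, check_account]

def validate(raw: dict) -> dict:
    """Split the raw package into clean records and quarantined records with failure reasons."""
    clean, quarantined = [], []
    for record in raw["records"]:
        reasons = [reason for check in CHECKS if (reason := check(record)) is not None]
        if reasons:
            quarantined.append({"record": record, "failures": reasons})
        else:
            clean.append(record)
    return {
        "records": clean,
        "quarantine": quarantined,
        "quality_report": {
            "checked": len(raw["records"]),
            "passed": len(clean),
            "quarantine_rate": len(quarantined) / max(len(raw["records"]), 1),
        },
    }
```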
Transformation Agent — Takes the validated dataset and applies business logic transformations. This agent normalizes units and currencies, maps source-specific codes to standard taxonomies, calculates derived fields (profit margins from revenue and cost, tenure from hire date), applies conditional logic (customer segmentation rules, risk tier assignments), and restructures records to match the target schema. It documents every transformation applied to maintain an audit trail.
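A small sketch of how that audit trail could be maintained: every rule records its before-and-after values as it runs. The exchange rates and stage mapping below are placeholders, not real business rules.

```python
EXCHANGE_RATES = {"EUR": 1.08, "GBP": 1.27, "USD": 1.0}      # placeholder daily rates
STAGE_MAP = {"Prospecting": "early", "Negotiation": "late"}   # placeholder taxonomy mapping

def transform(validated: dict) -> dict:
    """Apply business rules to each record, logging before-and-after values for the audit trail."""
    audit_log, transformed = [], []
    for record in validated["records"]:
        rec = dict(record)

        # Normalize all amounts to USD.
        before = rec["amount"]
        rec["amount_usd"] = round(before * EXCHANGE_RATES.get(rec.get("currency", "USD"), 1.0), 2)
        audit_log.append({"field": "amount", "rule": "currency_to_usd",
                          "before": before, "after": rec["amount_usd"]})

        # Map the source-specific stage code onto the internal taxonomy.
        before = rec.get("stage")
        rec["stage_internal"] = STAGE_MAP.get(before, "unmapped")
        audit_log.append({"field": "stage", "rule": "stage_taxonomy",
                          "before": before, "after": rec["stage_internal"]})

        transformed.append(rec)
    return {"records": transformed, "audit_log": audit_log}
```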
Enrichment Agent — Receives the transformed dataset and augments it with additional context from reference data and external sources. This agent resolves company names to canonical identifiers, appends geographic data from postal codes, adds industry classifications from company registries, attaches historical context (prior period values for trend calculation), and computes aggregate metrics. Each enrichment is tagged with its source and confidence level.
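The per-enrichment source and confidence tagging might be implemented along these lines; the reference tables and match logic below are stand-ins for real registries and lookup services.

```python
# Placeholder reference data standing in for company registries and geo lookups.
COMPANY_REGISTRY = {"acct-001": {"canonical_id": "C-9917", "industry": "Software"}}
POSTAL_REGIONS = {"94105": "US-West"}

def enrich(transformed: dict) -> dict:
    """Augment each record with reference data, tagging each enrichment's source and confidence."""
    enriched = []
    for record in transformed["records"]:
        rec = dict(record)
        rec["enrichments"] = []

        company = COMPANY_REGISTRY.get(rec.get("account_id"))
        if company:
            rec["canonical_company_id"] = company["canonical_id"]
            rec["industry"] = company["industry"]
            rec["enrichments"].append(
                {"type": "company_resolution", "source": "company_registry", "confidence": 0.99}
            )

        region = POSTAL_REGIONS.get(rec.get("postal_code", ""))
        if region:
            rec["region"] = region
            rec["enrichments"].append(
                {"type": "geo_tagging", "source": "postal_lookup", "confidence": 1.0}
            )

        enriched.append(rec)
    return {"records": enriched}
```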
Load and Index Agent — Takes the enriched dataset and writes it to the target systems. This agent handles schema migration if the target structure has changed, manages upsert logic (insert new records, update existing ones), updates indexes and materialized views, triggers downstream notifications, and produces a load confirmation report with record counts, load duration, and any write failures.
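A compact sketch of the upsert behaviour, using SQLite as a stand-in for the real warehouse; schema migration, view refreshes, and downstream notifications are omitted for brevity, and the table and column names are assumptions.

```python
import sqlite3  # requires SQLite 3.24+ for ON CONFLICT upserts

def load(enriched: dict, db_path: str = ":memory:") -> dict:
    """Upsert enriched records into a target table and return a load confirmation."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS opportunities (id TEXT PRIMARY KEY, amount_usd REAL, region TEXT)"
    )
    inserted = updated = 0
    for rec in enriched["records"]:
        exists = conn.execute(
            "SELECT 1 FROM opportunities WHERE id = ?", (rec["id"],)
        ).fetchone()
        conn.execute(
            "INSERT INTO opportunities (id, amount_usd, region) VALUES (?, ?, ?) "
            "ON CONFLICT(id) DO UPDATE SET amount_usd = excluded.amount_usd, region = excluded.region",
            (rec["id"], rec.get("amount_usd"), rec.get("region")),
        )
        updated += 1 if exists else 0
        inserted += 0 if exists else 1
    conn.commit()
    conn.close()
    return {"inserted": inserted, "updated": updated, "total": inserted + updated}
```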
Step 1 — Source extraction. The Ingestion Agent connects to the configured data sources and pulls the latest data. For an API source, it handles authentication, paginates through all available records, respects rate limits, and retries transient failures. It packages the raw response data with metadata: 12,847 records extracted from the Salesforce Opportunities API at 2026-05-02T08:00:00Z, covering modifications since the last extraction checkpoint.
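That extraction checkpoint can be tracked by persisting the last successful window end and using it as the lower bound of the next pull. The sketch below assumes a local JSON file and an `extract_fn` callable; the path, field names, and parameters are illustrative.

```python
import json
from datetime import datetime, timezone
from pathlib import Path

CHECKPOINT_FILE = Path("checkpoints/salesforce_opportunities.json")  # illustrative path

def read_checkpoint(default: str = "1970-01-01T00:00:00Z") -> str:
    """Return the timestamp of the last successful extraction, or a default for the first run."""
    if CHECKPOINT_FILE.exists():
        return json.loads(CHECKPOINT_FILE.read_text())["last_extracted_at"]
    return default

def write_checkpoint(timestamp: str) -> None:
    """Persist the new checkpoint only after extraction has completed successfully."""
    CHECKPOINT_FILE.parent.mkdir(parents=True, exist_ok=True)
    CHECKPOINT_FILE.write_text(json.dumps({"last_extracted_at": timestamp}))

def run_incremental_extraction(extract_fn) -> list:
    """Pull only records modified since the last checkpoint, then advance the checkpoint."""
    window_start = read_checkpoint()
    window_end = datetime.now(timezone.utc).isoformat()
    records = extract_fn(modified_since=window_start, modified_before=window_end)
    write_checkpoint(window_end)
    return records
```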
Step 2 — Quality verification. The Validation Agent runs the raw dataset through thirty-two quality checks. It verifies that every opportunity record has a required stage value from the allowed set, that close dates are not in the past for open opportunities, that amount fields are non-negative, and that each account ID references an existing account record. It passes 12,614 clean records downstream and quarantines 233 records with a detailed failure report.
Step 3 — Business transformation. The Transformation Agent applies the business rules to the validated dataset. It converts all currency amounts to USD using the daily exchange rate, maps Salesforce stage names to the internal pipeline stage taxonomy, calculates weighted pipeline value (amount multiplied by stage probability), derives the days-in-stage metric, and segments opportunities by deal size tier. Each transformation is logged with before-and-after values.
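For concreteness, the derived fields in this step might be computed roughly as follows; the probability table and tier boundaries are illustrative, not the actual business rules.

```python
from datetime import date

STAGE_PROBABILITY = {"early": 0.2, "late": 0.6, "commit": 0.9}   # illustrative probabilities
TIER_BOUNDARIES = [(50_000, "SMB"), (250_000, "Mid-Market")]     # upper bounds in USD

def derive_fields(rec: dict, as_of: date) -> dict:
    """Compute weighted pipeline value, days in stage, and deal size tier for one record."""
    rec = dict(rec)
    rec["weighted_value"] = rec["amount_usd"] * STAGE_PROBABILITY.get(rec["stage_internal"], 0.0)
    rec["days_in_stage"] = (as_of - date.fromisoformat(rec["stage_entered_at"])).days
    rec["size_tier"] = next(
        (tier for bound, tier in TIER_BOUNDARIES if rec["amount_usd"] < bound), "Enterprise"
    )
    return rec

example = derive_fields(
    {"amount_usd": 120_000.0, "stage_internal": "late", "stage_entered_at": "2026-04-18"},
    as_of=date(2026, 5, 2),
)
print(example["weighted_value"], example["days_in_stage"], example["size_tier"])
# 72000.0 14 Mid-Market
```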
Step 4 — Contextual enrichment. The Enrichment Agent augments each record with reference data. It resolves account IDs to canonical company profiles with industry and size classifications, appends geographic region from the account's billing address, attaches the historical win rate for each sales representative, adds the average deal cycle length for the opportunity's segment, and flags any account with overdue invoices in the finance system.
Step 5 — Target loading. The Load and Index Agent writes the enriched dataset to the analytics data warehouse. It detects that two new columns were added by the enrichment stage and applies the schema migration automatically. It upserts all 12,614 records, updates the pipeline analytics materialized views, refreshes the executive dashboard cache, and sends a completion webhook to the reporting system.
Step 6 — Pipeline completion report. The pipeline produces a comprehensive run report: total records extracted (12,847), records quarantined (233, 1.8%), transformations applied (six rules across all records), enrichments added (five per record), records loaded (12,614), load duration (47 seconds), and downstream systems notified (three).
The final data processing pipeline report would contain the following (a sketch of the report structure follows the list):
Run Summary — Pipeline execution ID, start and end timestamps, total duration (4 minutes 12 seconds), source system (Salesforce Production), target system (Snowflake Analytics Warehouse), and overall status (Completed with Warnings).
Extraction Report — Records extracted by object type, API calls made, rate limit utilization (72% of quota), data volume transferred (18.4 MB), and extraction window (records modified between 2026-05-01T08:00:00Z and 2026-05-02T08:00:00Z).
Quality Report — Thirty-two checks executed with pass rates. Schema conformance: 99.8% (24 records missing required fields). Referential integrity: 99.2% (102 records with orphaned account IDs). Anomaly detection: 99.1% (107 records with amounts exceeding three standard deviations from the mean). Overall quarantine rate: 1.8%.
Transformation Log — Six transformations applied with statistics. Currency normalization: 3,241 non-USD records converted. Stage mapping: all records mapped successfully with zero unmapped values. Weighted value calculation: mean weighted value $127,400, median $84,200. Size tier segmentation: 42% SMB, 35% Mid-Market, 23% Enterprise.
Enrichment Summary — Five enrichment types applied. Company resolution: 99.4% match rate (76 unresolved accounts flagged for manual review). Geographic tagging: 100% coverage. Historical win rate: attached for 98.7% of records (16 new reps without history). Overdue invoice flag: 847 opportunities linked to accounts with outstanding balances.
Load Confirmation — Schema migration applied (two columns added), 12,614 records upserted (8,903 updates, 3,711 inserts), three materialized views refreshed, downstream webhook delivered successfully to reporting, alerting, and CRM sync systems.
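Taken together, these sections map naturally onto a structured report object. The sketch below shows one possible shape, with illustrative field names rather than a mandated schema.

```python
from dataclasses import dataclass, field

@dataclass
class RunSummary:
    execution_id: str
    source_system: str
    target_system: str
    started_at: str
    finished_at: str
    status: str  # e.g. "Completed with Warnings"

@dataclass
class PipelineRunReport:
    summary: RunSummary
    extraction: dict = field(default_factory=dict)        # records by object type, API calls, volume
    quality: dict = field(default_factory=dict)           # per-check pass rates, quarantine counts
    transformations: dict = field(default_factory=dict)   # per-rule statistics
    enrichment: dict = field(default_factory=dict)        # match rates, coverage per enrichment type
    load: dict = field(default_factory=dict)              # upsert counts, migrations, notifications
```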