Building a Production-Grade ETL Pipeline in Python
Extract data from a REST API, transform it with Python, load it into PostgreSQL, schedule with cron, and handle errors with retries and dead letter queues.
ETL (Extract, Transform, Load) pipelines are the backbone of data engineering. They pull raw data from sources, clean and reshape it, and load it into a destination where analysts and applications can use it. In this tutorial, you'll build a production-grade ETL pipeline in Python that extracts data from a REST API, transforms it with validation and enrichment, loads it into PostgreSQL, runs on a schedule, and handles failures gracefully with retries and dead letter logging.
What you'll learn:
The pipeline extracts user activity data from a mock API, normalizes timestamps, computes derived metrics, and loads everything into a reporting table.
Create the project structure:
Set up .env:
For cron-based scheduling, add to your crontab:
Run the pipeline manually with python run.py run. Check dead letters with python run.py dead-letters. Start the built-in scheduler with python run.py schedule 300 for a five-minute interval. The pipeline tracks every run, retries transient failures automatically, and preserves unprocessable records for inspection. From here, extend it with parallel extraction, incremental loading using watermarks, or a web dashboard querying the pipeline_runs table.
etl-pipeline/
├── .env
├── config.py
├── extract.py
├── transform.py
├── load.py
├── pipeline.py
├── retry.py
├── dead_letter.py
└── scheduler.py

DATABASE_URL=postgresql://user:password@localhost:5432/etl_db
API_BASE_URL=https://jsonplaceholder.typicode.com
BATCH_SIZE=100
MAX_RETRIES=3

mkdir etl-pipeline && cd etl-pipeline
python -m venv venv
source venv/bin/activate
pip install psycopg2-binary requests python-dotenv

-- schema.sql
-- Target reporting table; the UNIQUE constraint backs the loader's upsert
-- (ON CONFLICT (user_id, post_id, activity_type) in load.py).
CREATE TABLE IF NOT EXISTS user_activities (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    post_id INTEGER NOT NULL,
    activity_type VARCHAR(50) NOT NULL,
    title TEXT,
    body TEXT,
    word_count INTEGER,
    extracted_at TIMESTAMP NOT NULL,
    loaded_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(user_id, post_id, activity_type)
);

-- One row per pipeline execution, for observability / dashboards.
CREATE TABLE IF NOT EXISTS pipeline_runs (
    id SERIAL PRIMARY KEY,
    started_at TIMESTAMP NOT NULL,
    completed_at TIMESTAMP,
    status VARCHAR(20) NOT NULL,
    records_extracted INTEGER DEFAULT 0,
    records_loaded INTEGER DEFAULT 0,
    errors INTEGER DEFAULT 0,
    error_message TEXT
);

-- Records that failed transformation, kept verbatim (JSONB) for inspection.
CREATE TABLE IF NOT EXISTS dead_letters (
    id SERIAL PRIMARY KEY,
    pipeline_run_id INTEGER REFERENCES pipeline_runs(id),
    raw_data JSONB NOT NULL,
    error_message TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);
-- IF NOT EXISTS so ensure_schema() can re-run this file on every pipeline
-- start: a bare CREATE INDEX would error out on the second run.
CREATE INDEX IF NOT EXISTS idx_activities_user ON user_activities(user_id);
CREATE INDEX IF NOT EXISTS idx_pipeline_status ON pipeline_runs(status);

# Run ETL every hour
0 * * * * cd /path/to/etl-pipeline && /path/to/venv/bin/python run.py run >> /var/log/etl.log 2>&1

# config.py
# Central configuration, loaded once from the environment at import time.
import os
from dotenv import load_dotenv

# Pull variables from a local .env file into os.environ (no-op if absent).
load_dotenv()

# Required settings: fail fast with KeyError if they are missing.
DATABASE_URL = os.environ["DATABASE_URL"]
API_BASE_URL = os.environ["API_BASE_URL"]
# Optional settings with defaults; values arrive as strings, so convert.
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
# extract.py
import time
import requests
from typing import Generator
from config import API_BASE_URL
def extract_posts(page_size: int = 20) -> Generator[dict, None, None]:
    """Extract posts from the API with pagination and rate limiting.

    Yields raw post dicts page by page. Pagination stops when the server
    returns an empty page, or when the ``x-total-count`` header shows that
    every record has been fetched.

    Raises:
        requests.HTTPError: on non-2xx responses (via raise_for_status).
    """
    page = 1
    while True:
        url = f"{API_BASE_URL}/posts"
        params = {"_page": page, "_limit": page_size}
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        if not data:
            break
        for record in data:
            yield record
        # Stop early only when the server actually tells us the total.
        # The previous code defaulted a missing x-total-count header to 0,
        # which made `page * page_size >= 0` always true and silently
        # truncated extraction to a single page.
        total_header = response.headers.get("x-total-count")
        try:
            total = int(total_header) if total_header is not None else None
        except ValueError:
            total = None  # unparseable header: fall back to empty-page detection
        if total is not None and page * page_size >= total:
            break
        page += 1
        time.sleep(0.5)  # Respect rate limits between page fetches
def extract_comments_for_post(post_id: int) -> list[dict]:
    """Fetch every comment attached to a single post from the API."""
    endpoint = f"{API_BASE_URL}/posts/{post_id}/comments"
    resp = requests.get(endpoint, timeout=30)
    resp.raise_for_status()
    return resp.json()
def extract_all() -> list[dict]:
    """Extract all posts, enriching each with its comment count."""
    enriched: list[dict] = []
    for post in extract_posts():
        # One extra API call per post; sleep briefly to stay polite.
        post["comment_count"] = len(extract_comments_for_post(post["id"]))
        enriched.append(post)
        time.sleep(0.2)
    return enriched
# transform.py
from datetime import datetime, timezone
from typing import Optional
class TransformError(Exception):
    """Raised when a raw record cannot be transformed.

    Carries the offending record so callers can dead-letter it.
    """
    def __init__(self, message: str, record: dict):
        super().__init__(message)
        # Keep the raw record for later inspection (dead-letter table).
        self.record = record
def validate_record(record: dict) -> bool:
    """Return True when every field the transform step needs is present."""
    for field in ("id", "userId", "title", "body"):
        if field not in record:
            return False
    return True
def compute_word_count(text: str) -> int:
    """Return the number of whitespace-separated words; 0 for empty input."""
    if not text:
        return 0
    return len(text.split())
def normalize_text(text: str) -> str:
    """Trim the ends and collapse internal whitespace runs to single spaces."""
    # str.split() with no argument already drops leading/trailing whitespace.
    return " ".join(text.split())
def transform_record(record: dict) -> Optional[dict]:
    """Transform a raw API record into the target schema.

    Returns None if the record should be skipped.
    Raises TransformError for invalid records.
    """
    if not validate_record(record):
        raise TransformError(f"Missing required fields in record {record.get('id')}", record)
    title = normalize_text(record["title"])
    body = normalize_text(record["body"])
    if len(title) < 3:
        raise TransformError(f"Title too short: '{title}'", record)
    # Wrap the id conversions: a non-numeric userId/id previously raised a
    # raw ValueError/TypeError, which escaped transform_batch's
    # `except TransformError` and crashed the whole run instead of
    # dead-lettering just this record.
    try:
        user_id = int(record["userId"])
        post_id = int(record["id"])
    except (TypeError, ValueError) as exc:
        raise TransformError(f"Non-numeric id fields in record {record.get('id')}", record) from exc
    return {
        "user_id": user_id,
        "post_id": post_id,
        "activity_type": "post",
        "title": title.title(),  # normalize casing for the reporting table
        "body": body,
        "word_count": compute_word_count(body),
        "extracted_at": datetime.now(timezone.utc).isoformat(),
    }
def transform_batch(records: list[dict]) -> tuple[list[dict], list[tuple[dict, str]]]:
    """Transform a batch of records.

    Returns (transformed_records, failed_records_with_errors); failures are
    (original_record, error_message) pairs destined for the dead-letter table.
    """
    ok: list[dict] = []
    bad: list[tuple[dict, str]] = []
    for raw in records:
        try:
            row = transform_record(raw)
        except TransformError as exc:
            bad.append((exc.record, str(exc)))
        else:
            if row is not None:
                ok.append(row)
    return ok, bad
# load.py
import psycopg2
from psycopg2.extras import execute_values
from config import DATABASE_URL, BATCH_SIZE
def get_connection():
    """Open a new connection to the configured PostgreSQL database."""
    return psycopg2.connect(DATABASE_URL)
def ensure_schema(conn) -> None:
    """Create the pipeline tables if they don't exist by executing schema.sql.

    NOTE(review): the path is relative to the working directory — the cron
    entry cd's into the project first; confirm for other launchers.
    """
    with open("schema.sql", "r") as schema_file:
        ddl = schema_file.read()
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()
def load_records(conn, records: list[dict]) -> int:
    """Bulk upsert records into the target table.

    Rows are written in BATCH_SIZE chunks; the whole load is committed once
    at the end. Returns the number of rows affected.
    """
    if not records:
        return 0
    sql = """
    INSERT INTO user_activities (user_id, post_id, activity_type, title, body, word_count, extracted_at)
    VALUES %s
    ON CONFLICT (user_id, post_id, activity_type)
    DO UPDATE SET
    title = EXCLUDED.title,
    body = EXCLUDED.body,
    word_count = EXCLUDED.word_count,
    extracted_at = EXCLUDED.extracted_at,
    loaded_at = NOW()
    """
    columns = ("user_id", "post_id", "activity_type", "title", "body", "word_count", "extracted_at")
    loaded_count = 0
    with conn.cursor() as cur:
        for start in range(0, len(records), BATCH_SIZE):
            chunk = records[start : start + BATCH_SIZE]
            rows = [tuple(rec[col] for col in columns) for rec in chunk]
            execute_values(cur, sql, rows)
            loaded_count += len(chunk)
    conn.commit()
    return loaded_count
# retry.py
import time
import functools
from typing import TypeVar, Callable
T = TypeVar("T")

def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_exceptions: tuple = (Exception,),
) -> Callable:
    """Decorator factory: retry the wrapped callable with exponential backoff.

    The delay doubles each attempt (base_delay * 2**(attempt-1)), capped at
    max_delay. After max_attempts failures the last exception is re-raised.
    Only exceptions listed in retryable_exceptions trigger a retry.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            attempt = 0
            while True:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt >= max_attempts:
                        raise  # out of attempts: surface the failure
                    delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
                    print(f"[retry] Attempt {attempt}/{max_attempts} failed: {e}")
                    print(f"[retry] Retrying in {delay:.1f}s...")
                    time.sleep(delay)
        return wrapper
    return decorator
# dead_letter.py
import json
from load import get_connection
def write_dead_letters(pipeline_run_id: int, failures: list[tuple[dict, str]]) -> int:
    """Persist unprocessable records so they can be inspected later.

    Each failure is stored as (run id, raw JSON payload, error message).
    Returns the number of rows written.
    """
    if not failures:
        return 0
    insert_sql = "INSERT INTO dead_letters (pipeline_run_id, raw_data, error_message) VALUES (%s, %s, %s)"
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            for failed_record, reason in failures:
                cur.execute(insert_sql, (pipeline_run_id, json.dumps(failed_record), reason))
        conn.commit()
        return len(failures)
    finally:
        conn.close()
def get_dead_letters(limit: int = 50) -> list[dict]:
    """Fetch the most recent dead-letter rows as plain dicts for debugging."""
    query = "SELECT id, pipeline_run_id, raw_data, error_message, created_at FROM dead_letters ORDER BY created_at DESC LIMIT %s"
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, (limit,))
            rows = cur.fetchall()
        columns = ("id", "pipeline_run_id", "raw_data", "error_message", "created_at")
        result = []
        for row in rows:
            entry = dict(zip(columns, row))
            entry["created_at"] = str(entry["created_at"])  # JSON/print friendly
            result.append(entry)
        return result
    finally:
        conn.close()
# pipeline.py
from datetime import datetime, timezone

import requests

from config import MAX_RETRIES
from dead_letter import write_dead_letters
from extract import extract_all
from load import get_connection, ensure_schema, load_records
from retry import with_retry
from transform import transform_batch
@with_retry(
    max_attempts=MAX_RETRIES,
    # requests failures derive from requests.exceptions.RequestException
    # (an IOError subclass), NOT the builtin ConnectionError/TimeoutError,
    # so the previous tuple never actually triggered a retry for network
    # errors raised by extract_all().
    retryable_exceptions=(requests.exceptions.RequestException, ConnectionError, TimeoutError),
)
def run_extraction() -> list[dict]:
    """Extract all records, retrying transient network failures with backoff."""
    return extract_all()
def run_pipeline() -> dict:
    """Execute the full ETL pipeline with run tracking.

    Returns a summary dict: status, extracted, loaded, errors,
    duration_seconds. Any failure is recorded on the pipeline_runs row
    and then re-raised to the caller.
    """
    conn = get_connection()
    ensure_schema(conn)
    # Create pipeline run record
    run_id = create_run(conn)
    started_at = datetime.now(timezone.utc)
    try:
        # Extract
        print("[extract] Starting extraction...")
        raw_records = run_extraction()
        print(f"[extract] Got {len(raw_records)} records")
        update_run(conn, run_id, records_extracted=len(raw_records))
        # Transform
        print("[transform] Transforming records...")
        transformed, failures = transform_batch(raw_records)
        print(f"[transform] Transformed: {len(transformed)}, Failed: {len(failures)}")
        # Load
        print("[load] Loading into database...")
        loaded = load_records(conn, transformed)
        print(f"[load] Loaded {loaded} records")
        # Dead letters: preserve unprocessable records for inspection
        if failures:
            dl_count = write_dead_letters(run_id, failures)
            print(f"[dead-letter] Wrote {dl_count} failed records")
        # Finalize
        complete_run(conn, run_id, "success", loaded, len(failures))
        return {
            "status": "success",
            "extracted": len(raw_records),
            "loaded": loaded,
            "errors": len(failures),
            "duration_seconds": (datetime.now(timezone.utc) - started_at).total_seconds(),
        }
    except Exception as e:
        # A failed SQL statement leaves the psycopg2 connection in an
        # aborted transaction; without a rollback the bookkeeping UPDATE in
        # complete_run would itself fail (InFailedSqlTransaction) and mask
        # the original error.
        try:
            conn.rollback()
        except Exception:
            pass  # best effort — the connection itself may be gone
        complete_run(conn, run_id, "failed", 0, 1, str(e))
        raise
    finally:
        conn.close()
def create_run(conn) -> int:
    """Insert a new 'running' pipeline_runs row and return its id."""
    insert_sql = "INSERT INTO pipeline_runs (started_at, status) VALUES (%s, %s) RETURNING id"
    with conn.cursor() as cur:
        cur.execute(insert_sql, (datetime.now(timezone.utc), "running"))
        new_id = cur.fetchone()[0]
    conn.commit()
    return new_id
def update_run(conn, run_id: int, **kwargs) -> None:
    """Apply arbitrary column updates to one pipeline_runs row.

    Column names come from code (kwargs keys), not user input, so the
    f-string SET clause is safe here; values still go through parameters.
    """
    assignments = ", ".join(f"{column} = %s" for column in kwargs)
    params = list(kwargs.values()) + [run_id]
    with conn.cursor() as cur:
        cur.execute(f"UPDATE pipeline_runs SET {assignments} WHERE id = %s", params)
    conn.commit()
def complete_run(conn, run_id: int, status: str, loaded: int, errors: int, error_msg: str = None) -> None:
    """Mark a pipeline run finished, recording status, counts and any error."""
    update_sql = "UPDATE pipeline_runs SET completed_at = %s, status = %s, records_loaded = %s, errors = %s, error_message = %s WHERE id = %s"
    params = (datetime.now(timezone.utc), status, loaded, errors, error_msg, run_id)
    with conn.cursor() as cur:
        cur.execute(update_sql, params)
    conn.commit()
# scheduler.py
import time
import signal
import sys
from datetime import datetime
from pipeline import run_pipeline
class PipelineScheduler:
    """Run the ETL pipeline forever at a fixed interval.

    SIGINT/SIGTERM set a flag instead of killing the process, so an
    in-flight run finishes before the loop exits.
    """

    def __init__(self, interval_seconds: int = 3600):
        self.interval = interval_seconds
        self.running = True
        # Install handlers so Ctrl-C / kill request a graceful stop.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._shutdown)

    def _shutdown(self, signum, frame):
        """Signal handler: flag the loop to stop after the current run."""
        print(f"\n[scheduler] Received signal {signum}, shutting down...")
        self.running = False

    def start(self) -> None:
        """Loop: run the pipeline, then sleep until the next interval."""
        print(f"[scheduler] Starting with {self.interval}s interval")
        while self.running:
            print(f"\n[scheduler] Pipeline run starting at {datetime.now().isoformat()}")
            try:
                result = run_pipeline()
                print(f"[scheduler] Complete: {result}")
            except Exception as e:
                print(f"[scheduler] Pipeline failed: {e}")
            # Sleep one second at a time so a shutdown signal is honored quickly.
            remaining = self.interval
            while remaining > 0 and self.running:
                time.sleep(1)
                remaining -= 1
        print("[scheduler] Stopped")
# Allow running the scheduler directly: python scheduler.py [interval_seconds]
if __name__ == "__main__":
    # Optional first CLI argument overrides the default hourly interval.
    interval = int(sys.argv[1]) if len(sys.argv) > 1 else 3600
    scheduler = PipelineScheduler(interval)
    scheduler.start()
# run.py
import sys
from pipeline import run_pipeline
from dead_letter import get_dead_letters
def main() -> None:
    """CLI dispatcher: run | dead-letters | schedule [interval_seconds]."""
    command = sys.argv[1] if len(sys.argv) > 1 else "run"
    if command == "run":
        result = run_pipeline()
        print(f"\nPipeline result: {result}")
    elif command == "dead-letters":
        for dl in get_dead_letters(limit=20):
            print(f"[{dl['created_at']}] Run #{dl['pipeline_run_id']}: {dl['error_message']}")
            print(f" Data: {dl['raw_data']}\n")
    elif command == "schedule":
        # Imported lazily so plain runs don't register signal handlers.
        from scheduler import PipelineScheduler
        interval = int(sys.argv[2]) if len(sys.argv) > 2 else 3600
        PipelineScheduler(interval).start()
    else:
        print("Usage: python run.py [run|dead-letters|schedule [interval_seconds]]")
# Script entry point: dispatch on the first CLI argument.
if __name__ == "__main__":
    main()