# Build a Streaming Data Processor in Node.js

Create a streaming data processor with event ingestion, windowed aggregation, backpressure handling, multiple output sinks, and real-time metrics.
Batch processing waits until all data arrives before computing results. Streaming processors handle data as it arrives — event by event, in real time. This matters for dashboards, alerting, fraud detection, and any system where latency is the enemy. In this tutorial, you'll build a streaming data processor in Node.js that ingests events from multiple sources, performs windowed aggregation, handles backpressure when consumers can't keep up, routes results to multiple output sinks, and exposes real-time metrics.
What you'll build: a system that processes a continuous stream of simulated sensor readings, computing averages, maximums, and alert counts per time window.
Define the core event types in src/types.ts:
When push() returns false, the stream's internal buffer is full. We stop generating events until the consumer calls _read() again — that's backpressure in action.
A tumbling window collects events for a fixed duration, then emits one aggregated result per sensor per window. Windows don't overlap — when one closes, the next opens immediately.
Route window results to multiple sinks simultaneously:
Start the processor:
You'll see windowed aggregation results streaming to the console every 5 seconds. The metrics endpoint at http://localhost:9090/metrics returns live processing stats. Results are also appended to output.jsonl for offline analysis.
Test backpressure by increasing EVENTS_PER_SECOND to 5000 — watch the dropped event count rise in the metrics as the filter sheds load. Decrease the window size to 1000ms for faster feedback.
To query the output file:
The pipeline demonstrates every core concept in stream processing: sourcing, filtering, windowing, fan-out, backpressure, and observability. From here, you could add sliding windows (overlapping time intervals), event-time watermarks for handling late data, exactly-once processing with checkpointing, or a WebSocket sink for real-time dashboard updates.
mkdir stream-processor && cd stream-processor
npm init -y
npm install -D typescript @types/node tsx

// tsconfig.json
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"strict": true,
"outDir": "dist",
"rootDir": "src"
},
"include": ["src"]
}
// src/types.ts

/** A single reading emitted by one simulated sensor. */
export interface SensorEvent {
  sensorId: string;
  /** Event time in epoch milliseconds (producer clock). */
  timestamp: number;
  value: number;
  unit: string;
}

/** Aggregate computed over one tumbling window for one sensor. */
export interface WindowResult {
  windowStart: number;
  windowEnd: number;
  sensorId: string;
  count: number;
  sum: number;
  avg: number;
  min: number;
  max: number;
  /** Number of readings above the alert threshold in this window. */
  alertCount: number;
}

/** Counters exposed by the HTTP metrics endpoint. */
export interface ProcessorMetrics {
  eventsReceived: number;
  eventsProcessed: number;
  eventsDropped: number;
  windowsEmitted: number;
  backpressureEvents: number;
  /** Derived at read time; not a settable counter. */
  uptimeMs: number;
}
// src/source.ts
import { Readable, ReadableOptions } from "node:stream";
import { SensorEvent } from "./types.js";
/**
 * Object-mode Readable that emits simulated SensorEvent readings.
 * Demonstrates cooperative backpressure: production pauses when push()
 * signals the internal buffer is full, and resumes on the next _read().
 */
export class SensorSource extends Readable {
  private sensorIds: string[];
  private intervalMs: number;
  private timer: NodeJS.Timeout | null = null;

  /**
   * @param sensorIds pool of ids sampled uniformly at random per event
   * @param eventsPerSecond target emission rate across all sensors
   */
  constructor(sensorIds: string[], eventsPerSecond: number, opts?: ReadableOptions) {
    super({ ...opts, objectMode: true, highWaterMark: 64 });
    this.sensorIds = sensorIds;
    this.intervalMs = 1000 / eventsPerSecond;
  }

  _read(): void {
    // Already producing; _read is called repeatedly while the stream flows.
    if (this.timer) return;
    this.timer = setInterval(() => {
      const sensorId = this.sensorIds[Math.floor(Math.random() * this.sensorIds.length)];
      const event: SensorEvent = {
        sensorId,
        timestamp: Date.now(),
        // Random noise on top of a slow sine drift.
        value: Math.random() * 100 + Math.sin(Date.now() / 10000) * 20,
        unit: "celsius",
      };
      // push() returns false when the internal buffer is full — backpressure signal.
      // Stop the timer; Node calls _read() again once the consumer drains.
      const canContinue = this.push(event);
      if (!canContinue) {
        clearInterval(this.timer!);
        this.timer = null;
      }
    }, this.intervalMs);
  }

  // BUG FIX: _destroy must invoke its callback, otherwise destroy() never
  // completes and 'close' is never emitted. Also stop the interval so the
  // process can exit cleanly.
  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    callback(error);
  }
}
// src/window.ts
import { Transform, TransformCallback } from "node:stream";
import { SensorEvent, WindowResult } from "./types.js";
interface WindowState {
  /** Events collected so far for this (sensor, window) pair. */
  events: SensorEvent[];
  /** Window start, aligned to a multiple of windowMs. */
  start: number;
  /** Owning sensor id — stored here so ids containing ':' cannot corrupt parsing. */
  sensorId: string;
}

/**
 * Tumbling-window aggregator: buckets SensorEvents into fixed,
 * non-overlapping windows per sensor and emits one WindowResult
 * when each window closes.
 */
export class TumblingWindow extends Transform {
  private windowMs: number;
  private windows: Map<string, WindowState> = new Map();
  private alertThreshold: number;
  private flushTimer: NodeJS.Timeout;

  constructor(windowMs: number, alertThreshold: number = 80) {
    super({ objectMode: true, highWaterMark: 32 });
    this.windowMs = windowMs;
    this.alertThreshold = alertThreshold;
    // Periodically check for windows that should close.
    this.flushTimer = setInterval(() => this.flushExpiredWindows(), windowMs / 2);
  }

  _transform(event: SensorEvent, _encoding: string, callback: TransformCallback): void {
    // Align the event to its window by truncating the timestamp.
    const windowStart = Math.floor(event.timestamp / this.windowMs) * this.windowMs;
    const key = `${event.sensorId}:${windowStart}`;
    let window = this.windows.get(key);
    if (!window) {
      // BUG FIX: keep the sensorId on the state rather than re-parsing it out
      // of the map key with split(":") — that broke for ids containing ':'.
      window = { events: [], start: windowStart, sensorId: event.sensorId };
      this.windows.set(key, window);
    }
    window.events.push(event);
    callback();
  }

  private flushExpiredWindows(): void {
    const now = Date.now();
    const cutoff = Math.floor(now / this.windowMs) * this.windowMs;
    for (const [key, window] of this.windows) {
      // A window is closed once the current window boundary passed its end.
      if (window.start + this.windowMs <= cutoff) {
        this.push(this.aggregate(window));
        this.windows.delete(key);
      }
    }
  }

  /** Reduce one window's events to its summary statistics. */
  private aggregate(window: WindowState): WindowResult {
    const values = window.events.map((e) => e.value);
    const sum = values.reduce((a, b) => a + b, 0);
    const alertCount = values.filter((v) => v > this.alertThreshold).length;
    return {
      windowStart: window.start,
      windowEnd: window.start + this.windowMs,
      sensorId: window.sensorId,
      count: values.length,
      sum,
      avg: sum / values.length,
      min: Math.min(...values),
      max: Math.max(...values),
      alertCount,
    };
  }

  _flush(callback: TransformCallback): void {
    clearInterval(this.flushTimer);
    // Emit all partial windows so no buffered events are lost on end().
    for (const window of this.windows.values()) {
      this.push(this.aggregate(window));
    }
    this.windows.clear();
    callback();
  }

  // BUG FIX: if the stream is destroyed without ending cleanly, _flush never
  // runs and the interval would keep the process alive forever. Clear it here too.
  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    clearInterval(this.flushTimer);
    callback(error);
  }
}
// src/filter.ts
import { Transform, TransformCallback } from "node:stream";
import { SensorEvent } from "./types.js";
/**
 * Validation and load-shedding stage: drops malformed readings, and sheds
 * events outright when its own write queue grows past a configured cap.
 */
export class EventFilter extends Transform {
  private droppedCount: number = 0;
  private maxQueueSize: number;

  constructor(maxQueueSize: number = 1000) {
    super({ objectMode: true, highWaterMark: 16 });
    this.maxQueueSize = maxQueueSize;
  }

  _transform(event: SensorEvent, _encoding: string, callback: TransformCallback): void {
    // Drop events with invalid data rather than crashing.
    const invalid = typeof event.value !== "number" || isNaN(event.value);
    // A deep write queue means downstream stages cannot keep up — shed load.
    const overloaded = this.writableLength > this.maxQueueSize;
    if (invalid || overloaded) {
      this.droppedCount += 1;
      callback();
      return;
    }
    callback(null, event);
  }

  /** Total events dropped so far (invalid or shed under load). */
  getDroppedCount(): number {
    return this.droppedCount;
  }
}
// src/sinks.ts
import { appendFile, appendFileSync } from "node:fs";
import { Writable } from "node:stream";
import { WindowResult } from "./types.js";
/** Human-readable sink: prints one summary line per window result. */
export class ConsoleSink extends Writable {
  constructor() {
    super({ objectMode: true });
  }

  _write(result: WindowResult, _encoding: string, callback: (error?: Error | null) => void): void {
    const time = new Date(result.windowStart).toISOString();
    const alert = result.alertCount > 0 ? ` [${result.alertCount} ALERTS]` : "";
    const line = `[${time}] ${result.sensorId}: avg=${result.avg.toFixed(1)} min=${result.min.toFixed(1)} max=${result.max.toFixed(1)} count=${result.count}${alert}`;
    console.log(line);
    callback();
  }
}
/** Appends each WindowResult as one JSON line (JSONL) to a file. */
export class FileSink extends Writable {
  private filePath: string;

  constructor(filePath: string) {
    super({ objectMode: true });
    this.filePath = filePath;
  }

  _write(result: WindowResult, _encoding: string, callback: (error?: Error | null) => void): void {
    // BUG FIX: appendFileSync blocked the event loop on every window result.
    // The async variant keeps the pipeline responsive; the stream machinery
    // serializes writes via the callback, so ordering is preserved.
    appendFile(this.filePath, JSON.stringify(result) + "\n", (err) => callback(err));
  }
}
/**
 * Batches window results and POSTs them as a JSON array to a webhook URL.
 * Failed batches are re-queued for a later attempt, up to a bounded backlog.
 */
export class WebhookSink extends Writable {
  private url: string;
  private batchSize: number;
  private maxBuffered: number;
  private buffer: WindowResult[] = [];

  /**
   * @param url destination endpoint
   * @param batchSize number of results per POST
   * @param maxBuffered cap on results retained across failed sends
   *                    (oldest dropped first; keeps memory bounded)
   */
  constructor(url: string, batchSize: number = 10, maxBuffered: number = 1000) {
    super({ objectMode: true });
    this.url = url;
    this.batchSize = batchSize;
    this.maxBuffered = maxBuffered;
  }

  _write(result: WindowResult, _encoding: string, callback: (error?: Error | null) => void): void {
    this.buffer.push(result);
    if (this.buffer.length >= this.batchSize) {
      // flush() never rejects today (send errors are logged and re-queued),
      // but keep the catch so a future change can't leak an unhandled rejection.
      this.flush()
        .then(() => callback())
        .catch((err) => callback(err));
    } else {
      callback();
    }
  }

  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer.splice(0);
    try {
      await fetch(this.url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(batch),
      });
    } catch (err) {
      console.error(`[webhook] Failed to send batch: ${(err as Error).message}`);
      // Put events back for retry.
      this.buffer.unshift(...batch);
      // BUG FIX: an unreachable endpoint previously grew the retry buffer
      // without bound. Cap the backlog, dropping the oldest results first.
      if (this.buffer.length > this.maxBuffered) {
        this.buffer.splice(0, this.buffer.length - this.maxBuffered);
      }
    }
  }

  _final(callback: (error?: Error | null) => void): void {
    // Send any partial batch before the stream finishes.
    this.flush()
      .then(() => callback())
      .catch((err) => callback(err));
  }
}
// src/fanout.ts
import { Writable } from "node:stream";
/**
 * Writable that duplicates every chunk to all child sinks.
 * A chunk is acknowledged only after every sink has accepted it, so the
 * slowest sink's backpressure propagates upstream through the pipeline.
 */
export class FanOut extends Writable {
  private sinks: Writable[];

  constructor(sinks: Writable[]) {
    super({ objectMode: true });
    this.sinks = sinks;
  }

  // NOTE: encoding is typed BufferEncoding (not string) so forwarding it to
  // sink.write type-checks under strict mode; the base-class hook passes it.
  _write(chunk: unknown, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
    // BUG FIX: with an empty sink list, `pending` started at 0 and the
    // callback was never invoked, permanently wedging the pipeline.
    if (this.sinks.length === 0) {
      callback();
      return;
    }
    let pending = this.sinks.length;
    let hasErrored = false;
    for (const sink of this.sinks) {
      sink.write(chunk, encoding, (err) => {
        if (err && !hasErrored) {
          hasErrored = true;
          // Report the first failure; later sink results are ignored.
          callback(err);
          return;
        }
        pending--;
        if (pending === 0 && !hasErrored) {
          callback();
        }
      });
    }
  }

  _final(callback: (error?: Error | null) => void): void {
    // BUG FIX: same empty-list guard as _write, otherwise end() never finishes.
    if (this.sinks.length === 0) {
      callback();
      return;
    }
    let pending = this.sinks.length;
    for (const sink of this.sinks) {
      sink.end(() => {
        pending--;
        if (pending === 0) callback();
      });
    }
  }
}
// src/metrics.ts
import { ProcessorMetrics } from "./types.js";
/** In-memory counters for pipeline observability, plus derived uptime/rate. */
export class MetricsCollector {
  // Wall-clock start used to compute uptime.
  private startTime: number = Date.now();
  private counters: ProcessorMetrics = {
    eventsReceived: 0,
    eventsProcessed: 0,
    eventsDropped: 0,
    windowsEmitted: 0,
    backpressureEvents: 0,
    uptimeMs: 0,
  };

  /** Increment a counter by one (uptimeMs is derived, hence excluded). */
  increment(key: keyof Omit<ProcessorMetrics, "uptimeMs">): void {
    this.counters[key]++;
  }

  /** Add an arbitrary amount to a counter (e.g. a batch of dropped events). */
  add(key: keyof Omit<ProcessorMetrics, "uptimeMs">, value: number): void {
    this.counters[key] += value;
  }

  /** Snapshot of all counters with the uptime computed at read time. */
  getMetrics(): ProcessorMetrics {
    return {
      ...this.counters,
      uptimeMs: Date.now() - this.startTime,
    };
  }

  /** Multi-line human-readable summary for periodic console reporting. */
  report(): string {
    const m = this.getMetrics();
    const uptime = (m.uptimeMs / 1000).toFixed(0);
    // BUG FIX: guard zero uptime so the rate prints 0.0 instead of
    // NaN/Infinity when report() is called immediately after startup.
    const rate = m.uptimeMs > 0 ? m.eventsProcessed / (m.uptimeMs / 1000) : 0;
    return [
      `=== Metrics (${uptime}s uptime) ===`,
      ` Received: ${m.eventsReceived}`,
      ` Processed: ${m.eventsProcessed}`,
      ` Dropped: ${m.eventsDropped}`,
      ` Windows: ${m.windowsEmitted}`,
      ` Backpressure: ${m.backpressureEvents}`,
      ` Rate: ${rate.toFixed(1)} events/sec`,
    ].join("\n");
  }
}
// src/metrics-server.ts
import { createServer } from "node:http";
import { MetricsCollector } from "./metrics.js";
/**
 * Serves the collector's counters as JSON at /metrics on the given port.
 * Any other path returns 404.
 */
export function startMetricsServer(collector: MetricsCollector, port: number): void {
  const server = createServer((req, res) => {
    if (req.url === "/metrics") {
      res.writeHead(200, { "Content-Type": "application/json" });
      res.end(JSON.stringify(collector.getMetrics()));
    } else {
      res.writeHead(404);
      res.end("Not found");
    }
  });
  // BUG FIX: without an 'error' listener, a bind failure (e.g. EADDRINUSE)
  // is thrown as an uncaught exception and kills the whole pipeline.
  server.on("error", (err) => {
    console.error(`[metrics] Server error: ${err.message}`);
  });
  server.listen(port, () => {
    console.log(`[metrics] HTTP endpoint at http://localhost:${port}/metrics`);
  });
}
// src/pipeline.ts
import { SensorSource } from "./source.js";
import { EventFilter } from "./filter.js";
import { TumblingWindow } from "./window.js";
import { FanOut } from "./fanout.js";
import { ConsoleSink, FileSink } from "./sinks.js";
import { MetricsCollector } from "./metrics.js";
import { startMetricsServer } from "./metrics-server.js";
import { Transform, TransformCallback } from "node:stream";
import { SensorEvent, WindowResult } from "./types.js";
// Simulated sensor fleet; each event picks one id at random.
const SENSORS = ["sensor-a", "sensor-b", "sensor-c", "sensor-d"];
// Target ingest rate across all sensors; raise (e.g. to 5000) to exercise backpressure.
const EVENTS_PER_SECOND = 50;
// Tumbling window length in milliseconds.
const WINDOW_MS = 5000;
const metrics = new MetricsCollector();
// Metrics-tracking transform for incoming events
/** Pass-through stage that counts every event entering the pipeline. */
class IngestCounter extends Transform {
  constructor(private collector: MetricsCollector) {
    super({ objectMode: true });
  }

  _transform(event: SensorEvent, _encoding: string, callback: TransformCallback): void {
    this.collector.increment("eventsReceived");
    callback(null, event);
  }
}
/** Pass-through stage that counts events surviving the filter. */
class ProcessedCounter extends Transform {
  constructor(private collector: MetricsCollector) {
    super({ objectMode: true });
  }

  _transform(event: SensorEvent, _encoding: string, callback: TransformCallback): void {
    this.collector.increment("eventsProcessed");
    callback(null, event);
  }
}
/** Pass-through stage that counts each emitted window result. */
class WindowCounter extends Transform {
  constructor(private collector: MetricsCollector) {
    super({ objectMode: true });
  }

  _transform(result: WindowResult, _encoding: string, callback: TransformCallback): void {
    this.collector.increment("windowsEmitted");
    callback(null, result);
  }
}
// Build the pipeline:
// source -> ingest counter -> filter/shed -> processed counter
//        -> tumbling window -> window counter -> fan-out to sinks
const source = new SensorSource(SENSORS, EVENTS_PER_SECOND);
const ingestCounter = new IngestCounter(metrics);
const filter = new EventFilter(500);
const processedCounter = new ProcessedCounter(metrics);
const window = new TumblingWindow(WINDOW_MS, 80);
const windowCounter = new WindowCounter(metrics);
const fanout = new FanOut([new ConsoleSink(), new FileSink("./output.jsonl")]);
// pipe() propagates backpressure automatically between each stage.
source
  .pipe(ingestCounter)
  .pipe(filter)
  .pipe(processedCounter)
  .pipe(window)
  .pipe(windowCounter)
  .pipe(fanout);
startMetricsServer(metrics, 9090);
// Print metrics every 10 seconds
setInterval(() => {
  console.log("\n" + metrics.report() + "\n");
}, 10000);
console.log(
  `[pipeline] Started: ${SENSORS.length} sensors, ${EVENTS_PER_SECOND} events/sec, ${WINDOW_MS}ms windows`
);

npx tsx src/pipeline.ts

# Count alerts per sensor
cat output.jsonl | jq -s 'group_by(.sensorId) | map({sensor: .[0].sensorId, alerts: [.[].alertCount] | add})'