Loading
Build a log aggregation system that ingests logs via HTTP, parses structured data, supports full-text search, time-range filtering, and displays metrics on a dashboard.
Every production system generates logs — HTTP access logs, application errors, audit trails, performance metrics. Without aggregation, these logs are scattered across servers, unsearchable, and effectively useless when you need them most: during an incident at 2 AM.
In this tutorial, you will build a log aggregation system from scratch. It ingests logs via HTTP from any service, parses them into structured fields, stores them with full-text search capability, supports time-range filtering and field-based queries, and presents metrics on a dashboard with charts. Think of it as a simplified version of the ELK stack (Elasticsearch, Logstash, Kibana) that you can run locally and understand completely.
The architecture follows the standard log pipeline: ingest (receive logs) → parse (extract structure) → store (index for search) → query (find and filter) → visualize (charts and dashboards).
Initialize the project with SQLite for storage and Express for the HTTP layer.
Define the core types:
Create parsers that extract structure from common log formats.
The parser is intentionally flexible. It handles JSON logs (from structured logging libraries like Pino or Winston), bracket-format logs, and plain text. In production, you would add parsers for syslog, Apache access logs, and nginx logs.
Create a SQLite-backed store with full-text search using FTS5.
SQLite's FTS5 (Full-Text Search 5) extension provides fast, relevant text search. The virtual table is kept in sync with the main table via a trigger that fires on every insert. This is the same approach used by many desktop applications for local search.
Create endpoints for receiving logs from applications.
The /ingest/text endpoint accepts raw text with one log entry per line — useful for piping output from docker logs or journalctl directly to your aggregator with curl.
Create endpoints for searching and filtering logs.
Create a server-rendered dashboard that displays log metrics and search results.
Implement automatic cleanup of old logs to prevent unbounded storage growth.
You will need to add a deleteOlderThan method to the LogStorage class:
Create a lightweight client that applications use to send logs to your aggregator.
The client buffers logs in memory and flushes them in batches to reduce HTTP overhead. If a flush fails, the batch is put back at the front of the buffer and retried on the next flush, so a transient network failure does not drop logs. (Logs still buffered when the process exits are lost — durable delivery would require a local spill file.)
Implement simple alerting that triggers when error rates exceed thresholds.
Usage example that alerts when error rate exceeds 10% in the last hour:
Wire everything together into the final application.
Test the full pipeline:
You now have a working log aggregation system. The architecture — HTTP ingestion, structured parsing, indexed storage, full-text search, time-series metrics, alerting — mirrors exactly what production systems like Datadog, Splunk, and Grafana Loki do at scale. The difference is scale: for production use, you would swap SQLite for ClickHouse or Elasticsearch, add a message broker like Kafka for ingestion buffering, and deploy the dashboard as a proper React application with streaming updates.
mkdir log-aggregator && cd log-aggregator
npm init -y
npm install express better-sqlite3
npm install -D typescript @types/node @types/express @types/better-sqlite3 tsx
npx tsc --init --target ES2022 --module NodeNext --moduleResolution NodeNext// src/types.ts
export type LogLevel = "debug" | "info" | "warn" | "error" | "fatal";
export interface LogEntry {
id: string;
timestamp: string;
level: LogLevel;
service: string;
message: string;
fields: Record<string, string>;
raw: string;
}
export interface LogQuery {
search?: string;
level?: LogLevel;
service?: string;
from?: string;
to?: string;
limit?: number;
offset?: number;
}
export interface LogStats {
totalLogs: number;
byLevel: Record<LogLevel, number>;
byService: Record<string, number>;
recentErrors: number;
}// src/parser.ts
import { LogEntry, LogLevel } from "./types.js";
const VALID_LEVELS: LogLevel[] = ["debug", "info", "warn", "error", "fatal"];
export function parseLogLine(raw: string, defaultService = "unknown"): Omit<LogEntry, "id"> {
// Try JSON format first
try {
const parsed = JSON.parse(raw);
if (typeof parsed === "object" && parsed !== null) {
return parseJsonLog(parsed, raw, defaultService);
}
} catch {
// Not JSON, try other formats
}
// Try common log format: [TIMESTAMP] LEVEL [SERVICE] Message
const clfMatch = raw.match(
/^\[([^\]]+)\]\s+(debug|info|warn|error|fatal)\s+\[([^\]]+)\]\s+(.+)$/i
);
if (clfMatch) {
return {
timestamp: normalizeTimestamp(clfMatch[1]),
level: clfMatch[2].toLowerCase() as LogLevel,
service: clfMatch[3],
message: clfMatch[4],
fields: {},
raw,
};
}
// Fallback: treat entire line as message
return {
timestamp: new Date().toISOString(),
level: "info",
service: defaultService,
message: raw.trim(),
fields: {},
raw,
};
}
function parseJsonLog(
obj: Record<string, unknown>,
raw: string,
defaultService: string
): Omit<LogEntry, "id"> {
const level = normalizeLevel(String(obj.level ?? obj.severity ?? "info"));
const message = String(obj.message ?? obj.msg ?? obj.text ?? "");
const service = String(obj.service ?? obj.app ?? obj.source ?? defaultService);
const timestamp = normalizeTimestamp(
String(obj.timestamp ?? obj.time ?? obj.ts ?? new Date().toISOString())
);
// Extract remaining fields
const reserved = new Set([
"level",
"severity",
"message",
"msg",
"text",
"service",
"app",
"source",
"timestamp",
"time",
"ts",
]);
const fields: Record<string, string> = {};
for (const [key, value] of Object.entries(obj)) {
if (!reserved.has(key)) {
fields[key] = String(value);
}
}
return { timestamp, level, service, message, fields, raw };
}
function normalizeLevel(level: string): LogLevel {
const lower = level.toLowerCase();
if (VALID_LEVELS.includes(lower as LogLevel)) return lower as LogLevel;
if (lower === "warning") return "warn";
if (lower === "critical" || lower === "emergency") return "fatal";
if (lower === "trace" || lower === "verbose") return "debug";
return "info";
}
function normalizeTimestamp(input: string): string {
const date = new Date(input);
return isNaN(date.getTime()) ? new Date().toISOString() : date.toISOString();
}// src/storage.ts
import Database from "better-sqlite3";
import { LogEntry, LogQuery, LogLevel, LogStats } from "./types.js";
export class LogStorage {
private db: Database.Database;
constructor(dbPath = "logs.db") {
this.db = new Database(dbPath);
this.db.pragma("journal_mode = WAL");
this.initialize();
}
private initialize(): void {
this.db.exec(`
CREATE TABLE IF NOT EXISTS logs (
id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
level TEXT NOT NULL,
service TEXT NOT NULL,
message TEXT NOT NULL,
fields TEXT NOT NULL DEFAULT '{}',
raw TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level);
CREATE INDEX IF NOT EXISTS idx_logs_service ON logs(service);
CREATE VIRTUAL TABLE IF NOT EXISTS logs_fts USING fts5(
message,
content='logs',
content_rowid='rowid'
);
CREATE TRIGGER IF NOT EXISTS logs_ai AFTER INSERT ON logs BEGIN
INSERT INTO logs_fts(rowid, message) VALUES (new.rowid, new.message);
END;
`);
}
insert(entry: LogEntry): void {
this.db
.prepare(
`
INSERT INTO logs (id, timestamp, level, service, message, fields, raw)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
)
.run(
entry.id,
entry.timestamp,
entry.level,
entry.service,
entry.message,
JSON.stringify(entry.fields),
entry.raw
);
}
insertBatch(entries: LogEntry[]): void {
const stmt = this.db.prepare(`
INSERT INTO logs (id, timestamp, level, service, message, fields, raw)
VALUES (?, ?, ?, ?, ?, ?, ?)
`);
const transaction = this.db.transaction((items: LogEntry[]) => {
for (const entry of items) {
stmt.run(
entry.id,
entry.timestamp,
entry.level,
entry.service,
entry.message,
JSON.stringify(entry.fields),
entry.raw
);
}
});
transaction(entries);
}
query(params: LogQuery): LogEntry[] {
let sql = "SELECT * FROM logs WHERE 1=1";
const bindings: unknown[] = [];
if (params.search) {
sql += " AND rowid IN (SELECT rowid FROM logs_fts WHERE logs_fts MATCH ?)";
bindings.push(params.search);
}
if (params.level) {
sql += " AND level = ?";
bindings.push(params.level);
}
if (params.service) {
sql += " AND service = ?";
bindings.push(params.service);
}
if (params.from) {
sql += " AND timestamp >= ?";
bindings.push(params.from);
}
if (params.to) {
sql += " AND timestamp <= ?";
bindings.push(params.to);
}
sql += " ORDER BY timestamp DESC LIMIT ? OFFSET ?";
bindings.push(params.limit ?? 100, params.offset ?? 0);
const rows = this.db.prepare(sql).all(...bindings) as Array<Record<string, unknown>>;
return rows.map((row) => this.rowToEntry(row));
}
getStats(): LogStats {
const total = this.db.prepare("SELECT COUNT(*) as count FROM logs").get() as { count: number };
const levelRows = this.db
.prepare("SELECT level, COUNT(*) as count FROM logs GROUP BY level")
.all() as Array<{ level: LogLevel; count: number }>;
const serviceRows = this.db
.prepare(
"SELECT service, COUNT(*) as count FROM logs GROUP BY service ORDER BY count DESC LIMIT 10"
)
.all() as Array<{ service: string; count: number }>;
const oneHourAgo = new Date(Date.now() - 3600000).toISOString();
const recentErrors = this.db
.prepare(
"SELECT COUNT(*) as count FROM logs WHERE level IN ('error', 'fatal') AND timestamp >= ?"
)
.get(oneHourAgo) as { count: number };
const byLevel: Record<string, number> = { debug: 0, info: 0, warn: 0, error: 0, fatal: 0 };
for (const row of levelRows) byLevel[row.level] = row.count;
const byService: Record<string, number> = {};
for (const row of serviceRows) byService[row.service] = row.count;
return {
totalLogs: total.count,
byLevel: byLevel as Record<LogLevel, number>,
byService,
recentErrors: recentErrors.count,
};
}
getTimeSeries(
intervalMinutes: number,
hours = 24
): Array<{ bucket: string; count: number; errors: number }> {
const since = new Date(Date.now() - hours * 3600000).toISOString();
const rows = this.db
.prepare(
`
SELECT
strftime('%Y-%m-%dT%H:', timestamp) ||
printf('%02d', (CAST(strftime('%M', timestamp) AS INTEGER) / ?) * ?) || ':00Z' as bucket,
COUNT(*) as count,
SUM(CASE WHEN level IN ('error', 'fatal') THEN 1 ELSE 0 END) as errors
FROM logs
WHERE timestamp >= ?
GROUP BY bucket
ORDER BY bucket
`
)
.all(intervalMinutes, intervalMinutes, since) as Array<{
bucket: string;
count: number;
errors: number;
}>;
return rows;
}
getServices(): string[] {
const rows = this.db
.prepare("SELECT DISTINCT service FROM logs ORDER BY service")
.all() as Array<{ service: string }>;
return rows.map((r) => r.service);
}
private rowToEntry(row: Record<string, unknown>): LogEntry {
return {
id: row.id as string,
timestamp: row.timestamp as string,
level: row.level as LogLevel,
service: row.service as string,
message: row.message as string,
fields: JSON.parse(row.fields as string),
raw: row.raw as string,
};
}
}// src/ingest.ts
import { Router, Request, Response } from "express";
import { LogStorage } from "./storage.js";
import { parseLogLine } from "./parser.js";
export function createIngestRouter(storage: LogStorage): Router {
const router = Router();
// Single log entry
router.post("/ingest", (req: Request, res: Response) => {
try {
const service = (req.headers["x-service"] as string) ?? "unknown";
if (typeof req.body === "string") {
const parsed = parseLogLine(req.body, service);
const entry = { ...parsed, id: crypto.randomUUID() };
storage.insert(entry);
res.status(202).json({ id: entry.id });
return;
}
const parsed = parseLogLine(JSON.stringify(req.body), service);
const entry = { ...parsed, id: crypto.randomUUID() };
storage.insert(entry);
res.status(202).json({ id: entry.id });
} catch (error) {
res.status(400).json({ error: `Failed to ingest log: ${error}` });
}
});
// Batch ingestion
router.post("/ingest/batch", (req: Request, res: Response) => {
try {
const service = (req.headers["x-service"] as string) ?? "unknown";
const lines: string[] = Array.isArray(req.body)
? req.body.map((item: unknown) => (typeof item === "string" ? item : JSON.stringify(item)))
: [];
const entries = lines.map((line) => ({
...parseLogLine(line, service),
id: crypto.randomUUID(),
}));
storage.insertBatch(entries);
res.status(202).json({ ingested: entries.length });
} catch (error) {
res.status(400).json({ error: `Batch ingestion failed: ${error}` });
}
});
// Plain text ingestion (one log per line)
router.post("/ingest/text", (req: Request, res: Response) => {
try {
const service = (req.headers["x-service"] as string) ?? "unknown";
const text = typeof req.body === "string" ? req.body : JSON.stringify(req.body);
const lines = text.split("\n").filter((line: string) => line.trim());
const entries = lines.map((line: string) => ({
...parseLogLine(line, service),
id: crypto.randomUUID(),
}));
storage.insertBatch(entries);
res.status(202).json({ ingested: entries.length });
} catch (error) {
res.status(400).json({ error: `Text ingestion failed: ${error}` });
}
});
return router;
}// src/query.ts
import { Router, Request, Response } from "express";
import { LogStorage } from "./storage.js";
import { LogLevel } from "./types.js";
export function createQueryRouter(storage: LogStorage): Router {
const router = Router();
router.get("/logs", (req: Request, res: Response) => {
const query = {
search: req.query.q as string | undefined,
level: req.query.level as LogLevel | undefined,
service: req.query.service as string | undefined,
from: req.query.from as string | undefined,
to: req.query.to as string | undefined,
limit: Number(req.query.limit) || 100,
offset: Number(req.query.offset) || 0,
};
const results = storage.query(query);
res.json({
results,
count: results.length,
query,
});
});
router.get("/logs/:id", (req: Request, res: Response) => {
const results = storage.query({ search: req.params.id, limit: 1 });
if (results.length === 0) {
res.status(404).json({ error: "Log not found" });
return;
}
res.json(results[0]);
});
router.get("/stats", (_: Request, res: Response) => {
res.json(storage.getStats());
});
router.get("/timeseries", (req: Request, res: Response) => {
const interval = Number(req.query.interval) || 15;
const hours = Number(req.query.hours) || 24;
res.json(storage.getTimeSeries(interval, hours));
});
router.get("/services", (_: Request, res: Response) => {
res.json(storage.getServices());
});
return router;
}// src/dashboard.ts
import { Router, Request, Response } from "express";
import { LogStorage } from "./storage.js";
import { LogLevel } from "./types.js";
export function createDashboardRouter(storage: LogStorage): Router {
const router = Router();
router.get("/dashboard", (req: Request, res: Response) => {
const stats = storage.getStats();
const timeseries = storage.getTimeSeries(15, 24);
const query = {
search: req.query.q as string | undefined,
level: req.query.level as LogLevel | undefined,
service: req.query.service as string | undefined,
limit: 50,
};
const logs = storage.query(query);
const levelColors: Record<string, string> = {
debug: "#6b7280",
info: "#3b82f6",
warn: "#f59e0b",
error: "#ef4444",
fatal: "#dc2626",
};
const html = `<!DOCTYPE html>
<html><head><title>Log Aggregator</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: system-ui, sans-serif; background: #0f172a; color: #e2e8f0; }
.container { max-width: 1200px; margin: 0 auto; padding: 24px; }
.grid { display: grid; grid-template-columns: repeat(4, 1fr); gap: 16px; margin-bottom: 24px; }
.card { background: #1e293b; border-radius: 8px; padding: 16px; }
.card h3 { font-size: 12px; color: #94a3b8; text-transform: uppercase; }
.card .value { font-size: 28px; font-weight: 700; margin-top: 4px; }
.search { width: 100%; padding: 12px; border-radius: 8px; border: 1px solid #334155; background: #1e293b; color: #e2e8f0; font-size: 14px; margin-bottom: 16px; }
table { width: 100%; border-collapse: collapse; }
th { text-align: left; padding: 8px; border-bottom: 1px solid #334155; font-size: 12px; color: #94a3b8; }
td { padding: 8px; border-bottom: 1px solid #1e293b; font-size: 13px; font-family: monospace; }
.level { padding: 2px 6px; border-radius: 4px; font-size: 11px; font-weight: 600; }
</style></head>
<body><div class="container">
<h1 style="margin-bottom: 24px;">Log Aggregator</h1>
<div class="grid">
<div class="card"><h3>Total Logs</h3><div class="value">${stats.totalLogs.toLocaleString()}</div></div>
<div class="card"><h3>Errors (1h)</h3><div class="value" style="color: #ef4444;">${stats.recentErrors}</div></div>
<div class="card"><h3>Services</h3><div class="value">${Object.keys(stats.byService).length}</div></div>
<div class="card"><h3>Error Rate</h3><div class="value">${stats.totalLogs > 0 ? (((stats.byLevel.error + stats.byLevel.fatal) / stats.totalLogs) * 100).toFixed(1) : 0}%</div></div>
</div>
<form method="GET" action="/dashboard">
<input class="search" name="q" placeholder="Search logs..." value="${query.search ?? ""}" />
</form>
<div class="card" style="overflow-x: auto;">
<table>
<thead><tr><th>Time</th><th>Level</th><th>Service</th><th>Message</th></tr></thead>
<tbody>
${logs
.map(
(log) => `<tr>
<td>${new Date(log.timestamp).toLocaleString()}</td>
<td><span class="level" style="background: ${levelColors[log.level]}20; color: ${levelColors[log.level]}">${log.level}</span></td>
<td>${log.service}</td>
<td>${escapeHtml(log.message.substring(0, 200))}</td>
</tr>`
)
.join("")}
</tbody>
</table>
</div>
</div></body></html>`;
res.setHeader("Content-Type", "text/html");
res.send(html);
});
return router;
}
function escapeHtml(text: string): string {
return text.replace(/&/g, "&").replace(/</g, "<").replace(/>/g, ">");
}// src/retention.ts
import { LogStorage } from "./storage.js";
export class RetentionManager {
private intervalId: ReturnType<typeof setInterval> | null = null;
constructor(
private storage: LogStorage,
private retentionDays: number = 30
) {}
start(checkIntervalMs = 3600000): void {
this.intervalId = setInterval(() => {
this.cleanup();
}, checkIntervalMs);
// Run immediately on start
this.cleanup();
}
stop(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}
cleanup(): number {
const cutoff = new Date(Date.now() - this.retentionDays * 86400000).toISOString();
return this.storage.deleteOlderThan(cutoff);
}
}deleteOlderThan(timestamp: string): number {
const result = this.db.prepare(
"DELETE FROM logs WHERE timestamp < ?"
).run(timestamp);
return result.changes;
}// src/client.ts
export class LogClient {
private buffer: string[] = [];
private flushInterval: ReturnType<typeof setInterval> | null = null;
constructor(
private endpoint: string,
private service: string,
private batchSize = 50,
private flushMs = 5000
) {
this.flushInterval = setInterval(() => this.flush(), flushMs);
}
log(level: string, message: string, fields: Record<string, unknown> = {}): void {
const entry = JSON.stringify({
level,
message,
service: this.service,
timestamp: new Date().toISOString(),
...fields,
});
this.buffer.push(entry);
if (this.buffer.length >= this.batchSize) {
this.flush();
}
}
async flush(): Promise<void> {
if (this.buffer.length === 0) return;
const batch = this.buffer.splice(0);
try {
await fetch(`${this.endpoint}/ingest/batch`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Service": this.service,
},
body: JSON.stringify(batch.map((b) => JSON.parse(b))),
});
} catch (error) {
// Put logs back in buffer on failure
this.buffer.unshift(...batch);
console.error(`Failed to ship logs: ${error}`);
}
}
destroy(): void {
if (this.flushInterval) clearInterval(this.flushInterval);
this.flush();
}
}// src/alerts.ts
import { LogStorage } from "./storage.js";
interface AlertRule {
name: string;
condition: (storage: LogStorage) => boolean;
action: (rule: AlertRule) => void;
cooldownMs: number;
lastTriggered: number;
}
export class AlertManager {
private rules: AlertRule[] = [];
private intervalId: ReturnType<typeof setInterval> | null = null;
constructor(private storage: LogStorage) {}
addRule(
name: string,
condition: (storage: LogStorage) => boolean,
action: (rule: AlertRule) => void,
cooldownMs = 300000
): void {
this.rules.push({ name, condition, action, cooldownMs, lastTriggered: 0 });
}
start(checkIntervalMs = 60000): void {
this.intervalId = setInterval(() => this.evaluate(), checkIntervalMs);
}
stop(): void {
if (this.intervalId) clearInterval(this.intervalId);
}
private evaluate(): void {
const now = Date.now();
for (const rule of this.rules) {
if (now - rule.lastTriggered < rule.cooldownMs) continue;
try {
if (rule.condition(this.storage)) {
rule.lastTriggered = now;
rule.action(rule);
}
} catch (error) {
console.error(`Alert rule "${rule.name}" evaluation failed: ${error}`);
}
}
}
}const alertManager = new AlertManager(storage);
alertManager.addRule(
"High error rate",
(store) => {
const stats = store.getStats();
if (stats.totalLogs === 0) return false;
const errorRate = (stats.byLevel.error + stats.byLevel.fatal) / stats.totalLogs;
return errorRate > 0.1;
},
(rule) => {
console.warn(`ALERT: ${rule.name} triggered!`);
// In production: send to Slack, PagerDuty, or email
}
);// src/index.ts
import express from "express";
import { LogStorage } from "./storage.js";
import { createIngestRouter } from "./ingest.js";
import { createQueryRouter } from "./query.js";
import { createDashboardRouter } from "./dashboard.js";
import { RetentionManager } from "./retention.js";
import { AlertManager } from "./alerts.js";
const storage = new LogStorage();
const retention = new RetentionManager(storage, 30);
const alertManager = new AlertManager(storage);
// Configure alerts
alertManager.addRule(
"High error rate",
(store) => store.getStats().recentErrors > 50,
(rule) => console.warn(`ALERT: ${rule.name}`),
300000
);
const app = express();
app.use(express.json());
app.use(express.text());
app.use(createIngestRouter(storage));
app.use(createQueryRouter(storage));
app.use(createDashboardRouter(storage));
app.listen(3000, () => {
console.log("Log aggregator running on port 3000");
retention.start();
alertManager.start();
});# Send a structured log
curl -X POST http://localhost:3000/ingest \
-H "Content-Type: application/json" \
-H "X-Service: api-server" \
-d '{"level": "info", "message": "User logged in", "userId": "123"}'
# Send a batch of logs
curl -X POST http://localhost:3000/ingest/batch \
-H "Content-Type: application/json" \
-H "X-Service: worker" \
-d '[{"level": "error", "message": "Job failed: timeout"}, {"level": "info", "message": "Job completed"}]'
# Search logs
curl "http://localhost:3000/logs?q=failed&level=error"
# View the dashboard
open http://localhost:3000/dashboard