Ingestion Pipeline
The Ingestion service is a standalone Node.js process that continuously reads device telemetry from Redis Streams and persists it to TimescaleDB. It has no user-facing REST API — only internal health and metrics endpoints.
Why a Separate Service?
Running ingestion separately from the Cloud API has three benefits:
- The Cloud API stays fast — it receives MQTT messages and queues telemetry batches into Redis immediately, without waiting for the database write to complete.
- The Ingestion service owns the write rate to TimescaleDB, applying batching, deduplication, and backpressure so the database is never overwhelmed during traffic spikes.
- Multiple Ingestion pods can join the same Redis consumer group, so consumption scales out horizontally.
Data Flow
Agent
  │
  │  MQTT publish
  │  i/{tenant}/a/{agent}/endpoints/{protocol}
  ▼
MQTT Broker (Mosquitto)
  │
  │  MQTT subscribe
  ▼
Cloud API (MQTT subscriber)
  │  deserialize: DEFLATE → msgpack → JSON
  │  expand key-compaction dictionary (if enabled)
  │  deduplicate via Redis SETNX
  │
  ├──▶ Redis Pub/Sub   ← real-time state / metrics (dashboard)
  │
  └──▶ Redis Stream    tenant:{id}:agent:devices:ingestion
            │
            │  XREADGROUP (consumer group)
            ▼
     Ingestion Worker
            │
  ┌─────────┴──────────────┐
  │ 1. Decode payload      │
  │ 2. Normalize readings  │
  │ 3. Deduplicate batch   │
  │ 4. Bulk INSERT         │
  └─────────┬──────────────┘
            │
       TimescaleDB
   readings hypertable
Redis Streams
Stream keys
| Key | Direction | Content |
|---|---|---|
| tenant:{id}:agent:devices:ingestion | Consumer reads | Device telemetry batches from MQTT |
| tenant:{id}:agent:devices:dlq | Worker writes on failure | Failed message batches |
| tenant:{id}:agent:logs | Consumer reads | Agent log batches |
| tenant:{id}:device:{uuid}:metrics | Pub/Sub (publish only) | Real-time metrics for WebSocket clients |
Consumer group
The Ingestion service creates the consumer group {tenantId}:device-writers on startup if it does not already exist. Each worker reads with XREADGROUP, which delivers each new message to only one consumer in the group. Processed messages are acknowledged with XACK and trimmed from the stream. Messages left unacknowledged (for example, after a worker crash) are reclaimed with XAUTOCLAIM once they have been pending longer than a configurable idle timeout.
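A minimal sketch of the Redis commands behind this lifecycle follows. Each command is written as a plain argument array that could be handed to a Redis client's raw-command API. Only the key and group patterns come from this page. The helper names, the consumer names, the 0 start ID and the COUNT/BLOCK/idle values are illustrative assumptions.

```typescript
// Sketch only: builds Redis command argument arrays for one tenant's ingest stream.
// Helper names and numeric values are illustrative assumptions.
type RedisCommand = string[];

function ingestStreamKey(tenantId: string): string {
  return `tenant:${tenantId}:agent:devices:ingestion`;
}

function groupName(tenantId: string): string {
  return `${tenantId}:device-writers`;
}

// Create the group (and the stream, via MKSTREAM) starting from ID 0 so an existing
// backlog is not skipped. Redis replies BUSYGROUP if the group already exists;
// a caller would treat that reply as success.
function createGroupCommand(tenantId: string): RedisCommand {
  return ["XGROUP", "CREATE", ingestStreamKey(tenantId), groupName(tenantId), "0", "MKSTREAM"];
}

// ">" requests entries never delivered to any consumer in this group.
function readCommand(tenantId: string, consumer: string, count: number, blockMs: number): RedisCommand {
  return [
    "XREADGROUP", "GROUP", groupName(tenantId), consumer,
    "COUNT", String(count), "BLOCK", String(blockMs),
    "STREAMS", ingestStreamKey(tenantId), ">",
  ];
}

// Acknowledge entries once they have been processed.
function ackCommand(tenantId: string, ids: string[]): RedisCommand {
  return ["XACK", ingestStreamKey(tenantId), groupName(tenantId), ...ids];
}

// Transfer entries pending longer than minIdleMs (e.g. from a crashed worker) to
// `consumer`, scanning the group's pending list from the start ("0-0").
function reclaimCommand(tenantId: string, consumer: string, minIdleMs: number, count: number): RedisCommand {
  return [
    "XAUTOCLAIM", ingestStreamKey(tenantId), groupName(tenantId), consumer,
    String(minIdleMs), "0-0", "COUNT", String(count),
  ];
}
```

BATCH_SIZE (max messages per worker cycle) is a natural fit for the COUNT argument.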
Processing Pipeline
1. Decode payload
The Cloud API MQTT handler has already deserialized the payload before writing it to Redis, so the Ingestion worker receives an already-decoded JS object. Payloads on the MQTT path may have gone through:
- DEFLATE decompression (outer wrapper)
- MessagePack decoding (binary format)
- Key-compaction dictionary expansion (integer indices → field names)
Unrecognized formats are moved to the DLQ rather than crashing the worker.
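The worker side of this step can be sketched as a validate-or-dead-letter function. This sketch assumes each stream entry carries the decoded payload as a JSON string. It checks only the fields shown in the normalization example. The DecodeResult type and the reason strings are hypothetical.

```typescript
// Sketch: validate one stream entry; anything unrecognized is routed to the DLQ
// instead of throwing. Assumes the entry's payload is a JSON string.
interface Reading {
  name: string;
  value: number;
  unit?: string;
  quality?: string;
}

interface EndpointsPayload {
  agentUuid: string;
  deviceName: string;
  protocol: string;
  readings: Reading[];
}

type DecodeResult =
  | { kind: "ok"; payload: EndpointsPayload }
  | { kind: "dlq"; reason: string; raw: string };

function isReading(x: unknown): x is Reading {
  if (typeof x !== "object" || x === null) return false;
  const r = x as Record<string, unknown>;
  return typeof r["name"] === "string" && typeof r["value"] === "number";
}

function decodeEntry(raw: string): DecodeResult {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return { kind: "dlq", reason: "payload is not valid JSON", raw };
  }
  if (typeof parsed !== "object" || parsed === null) {
    return { kind: "dlq", reason: "payload is not an object", raw };
  }
  const { agentUuid, deviceName, protocol, readings } = parsed as Record<string, unknown>;
  if (typeof agentUuid !== "string" || typeof deviceName !== "string" || typeof protocol !== "string") {
    return { kind: "dlq", reason: "missing agentUuid, deviceName or protocol", raw };
  }
  if (!Array.isArray(readings) || !readings.every(isReading)) {
    return { kind: "dlq", reason: "readings is not a list of { name, value }", raw };
  }
  return { kind: "ok", payload: { agentUuid, deviceName, protocol, readings } };
}
```

The worker would forward the dlq result, together with the raw entry, to the DLQ stream and then continue with the next message.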
2. Normalization
The normalizer expands each device payload into one row per reading in the readings hypertable. A single MQTT endpoints message typically carries several metrics from one sensor device:
Input (from Redis — decoded MQTT endpoints payload):
{
  agentUuid: "abc-123",
  deviceName: "modbus-plc",
  protocol: "modbus",
  readings: [
    { name: "temperature", value: 72.4, unit: "°C", quality: "good" },
    { name: "pressure", value: 1013, unit: "hPa", quality: "good" }
  ]
}
Output (rows inserted into TimescaleDB):
{ time, agent_uuid, metric_name: "modbus-plc.temperature", value: 72.4, protocol: "modbus", ... }
{ time, agent_uuid, metric_name: "modbus-plc.pressure", value: 1013, protocol: "modbus", ... }
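The expansion above amounts to a map over the readings array. In this sketch the caller supplies the timestamp, because the example payload carries none. Deriving it from the Redis stream entry ID is one option, and that is an assumption. Only the columns visible in the example are produced. The function and type names are hypothetical.

```typescript
interface Reading {
  name: string;
  value: number;
  unit?: string;
  quality?: string;
}

interface EndpointsPayload {
  agentUuid: string;
  deviceName: string;
  protocol: string;
  readings: Reading[];
}

// Only the columns visible in the example output; the elided "..." columns are not modeled.
interface ReadingRow {
  time: Date;
  agent_uuid: string;
  metric_name: string;
  value: number;
  protocol: string;
}

// One row per reading, with metric_name = "<deviceName>.<readingName>".
function normalize(payload: EndpointsPayload, time: Date): ReadingRow[] {
  return payload.readings.map((r) => ({
    time,
    agent_uuid: payload.agentUuid,
    metric_name: `${payload.deviceName}.${r.name}`,
    value: r.value,
    protocol: payload.protocol,
  }));
}

// Auto-generated Redis stream IDs look like "<unix-ms>-<sequence>".
function timeFromStreamId(id: string): Date {
  return new Date(Number(id.split("-")[0]));
}
```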
3. Deduplication
Within each batch, if the same (agent_uuid, metric_name, time) tuple appears more than once, the last value wins. Rows are then sorted by primary key before the database insert, so concurrent workers acquire row locks in the same order, which avoids deadlocks between them.
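The following is a sketch of last-value-wins deduplication followed by a deterministic sort. It assumes the primary key is (agent_uuid, metric_name, time) in that column order. The helper names are hypothetical.

```typescript
interface ReadingRow {
  time: Date;
  agent_uuid: string;
  metric_name: string;
  value: number;
  protocol: string;
}

// Composite key; NUL separators keep "a" + "b.c" distinct from "a.b" + "c".
function keyOf(r: ReadingRow): string {
  return `${r.agent_uuid}\u0000${r.metric_name}\u0000${r.time.getTime()}`;
}

// Any consistent total order prevents lock-order inversions between workers;
// it does not have to match the database's collation.
function compareByKey(a: ReadingRow, b: ReadingRow): number {
  if (a.agent_uuid !== b.agent_uuid) return a.agent_uuid < b.agent_uuid ? -1 : 1;
  if (a.metric_name !== b.metric_name) return a.metric_name < b.metric_name ? -1 : 1;
  return a.time.getTime() - b.time.getTime();
}

function dedupeAndSort(rows: ReadingRow[]): ReadingRow[] {
  const latest = new Map<string, ReadingRow>();
  for (const r of rows) {
    latest.set(keyOf(r), r); // a later row overwrites an earlier one: last value wins
  }
  return Array.from(latest.values()).sort(compareByKey);
}
```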
At the MQTT layer, the Cloud API already deduplicates by msgId using Redis SETNX (24-hour TTL) to drop duplicate MQTT deliveries. The in-batch deduplication in the Ingestion worker is a second-pass safeguard for overlapping windows during reconnects.
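SETNX by itself sets no expiry. Pairing it with a separate EXPIRE takes two commands and is not atomic. SET with the NX and EX options does both in a single command. The sketch below builds that command. It then uses a small in-memory stand-in to show the first-delivery-wins behaviour. The key prefix and the FakeRedis class are hypothetical and are not taken from the Cloud API.

```typescript
const DEDUP_TTL_SECONDS = 24 * 60 * 60; // the 24-hour TTL from the text

// Hypothetical key layout for the msgId marker.
function dedupKey(msgId: string): string {
  return `mqtt:dedup:${msgId}`;
}

// One atomic command: set the marker only if absent (NX), with an expiry in seconds (EX).
function dedupCommand(msgId: string): string[] {
  return ["SET", dedupKey(msgId), "1", "NX", "EX", String(DEDUP_TTL_SECONDS)];
}

// In-memory stand-in mimicking SET key value NX EX ttl: "OK" when the key was absent
// (or expired), null otherwise, matching Redis's replies.
class FakeRedis {
  private expiresAt = new Map<string, number>();

  setNxEx(key: string, ttlSeconds: number, nowMs: number): "OK" | null {
    const exp = this.expiresAt.get(key);
    if (exp !== undefined && exp > nowMs) return null;
    this.expiresAt.set(key, nowMs + ttlSeconds * 1000);
    return "OK";
  }
}

// True only for the first delivery of a msgId within the TTL window.
function isFirstDelivery(redis: FakeRedis, msgId: string, nowMs: number): boolean {
  return redis.setNxEx(dedupKey(msgId), DEDUP_TTL_SECONDS, nowMs) === "OK";
}
```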
4. Database Insert
Rows are inserted using a parameterized INSERT … ON CONFLICT DO NOTHING or PostgreSQL COPY (configurable via DB_INSERT_METHOD). The insert target is the readings hypertable — TimescaleDB routes each row to the correct 1-day chunk automatically.
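For the insert method, the statement can be built as one multi-row INSERT with numbered placeholders. This is the parameter style node-postgres uses, and the result could be passed to a pg client as query(text, values). The sketch assumes the column list from the normalization example. The COPY path is not shown.

```typescript
interface ReadingRow {
  time: Date;
  agent_uuid: string;
  metric_name: string;
  value: number;
  protocol: string;
}

const COLUMNS = ["time", "agent_uuid", "metric_name", "value", "protocol"] as const;

// Parameterized multi-row INSERT; rows whose primary key already exists are skipped.
function buildInsert(rows: ReadingRow[]): { text: string; values: unknown[] } {
  if (rows.length === 0) throw new Error("buildInsert needs at least one row");
  const values: unknown[] = [];
  const tuples = rows.map((r, i) => {
    values.push(r.time, r.agent_uuid, r.metric_name, r.value, r.protocol);
    const base = i * COLUMNS.length;
    // Placeholders are 1-based: row 0 uses $1..$5, row 1 uses $6..$10, and so on.
    const placeholders = COLUMNS.map((_, j) => `$${base + j + 1}`);
    return `(${placeholders.join(", ")})`;
  });
  const text =
    `INSERT INTO readings (${COLUMNS.join(", ")}) VALUES ${tuples.join(", ")} ` +
    "ON CONFLICT DO NOTHING";
  return { text, values };
}
```

PostgreSQL accepts at most 65,535 bind parameters per statement. With five columns, one statement therefore tops out at 13,107 rows, so larger batches would need splitting.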
After a successful batch, a background task updates last_telemetry_at on the relevant endpoint rows (fire-and-forget, non-blocking).
Resilience
Circuit Breaker
The circuit breaker monitors database insert failures. If errors exceed the threshold within a rolling window, it opens — workers stop attempting inserts and immediately route messages to the disk spool instead. The breaker polls the database connection in the background and closes automatically when the database recovers.
CLOSED (normal) ──errors exceed threshold──▶ OPEN (fault)
       ▲                                             │
       └────── DB healthy, spool drained ────────────┘
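The two-state machine above can be sketched as a small class. The failure threshold, the window length and the method names are assumptions, since this page gives no values. Closing requires both a healthy database and an empty spool, which follows the diagram's return edge.

```typescript
type BreakerState = "closed" | "open";

class CircuitBreaker {
  private state: BreakerState = "closed";
  private failureTimes: number[] = []; // ms timestamps of recent insert failures
  private readonly threshold: number;
  private readonly windowMs: number;

  constructor(threshold: number, windowMs: number) {
    this.threshold = threshold;
    this.windowMs = windowMs;
  }

  currentState(): BreakerState {
    return this.state;
  }

  // Workers check this before inserting; false means "write to the disk spool instead".
  allowInsert(): boolean {
    return this.state === "closed";
  }

  recordFailure(nowMs: number): void {
    this.failureTimes.push(nowMs);
    // Keep only failures inside the rolling window.
    this.failureTimes = this.failureTimes.filter((t) => nowMs - t < this.windowMs);
    if (this.state === "closed" && this.failureTimes.length > this.threshold) {
      this.state = "open";
    }
  }

  // Called by the background database poller.
  onHealthCheck(dbHealthy: boolean, spoolDrained: boolean): void {
    if (this.state === "open" && dbHealthy && spoolDrained) {
      this.state = "closed";
      this.failureTimes = [];
    }
  }
}
```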
Disk Spool
When the circuit is open (or Redis itself is unreachable), the spool writes raw message payloads to local disk (/tmp/iotistic-spool by default, configurable via DISK_SPOOL_PATH). The spool is capped at 500 MB (DISK_SPOOL_MAX_SIZE_MB). Once the database is healthy again, a replayer drains the spool back into the normal insert pipeline in the order the messages were written.
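The spool's ordering and size cap can be illustrated with an in-memory stand-in for the on-disk files. The page does not say what happens when the cap is reached, so rejecting new payloads is an assumption. Treating MB as 1024 × 1024 bytes is also an assumption, as are the class and method names.

```typescript
// UTF-8 byte length of a string, counted per code point.
function utf8Length(s: string): number {
  let bytes = 0;
  for (const ch of s) {
    const cp = ch.codePointAt(0) ?? 0;
    bytes += cp < 0x80 ? 1 : cp < 0x800 ? 2 : cp < 0x10000 ? 3 : 4;
  }
  return bytes;
}

// In-memory model of the disk spool: FIFO order plus a byte budget.
class Spool {
  private entries: string[] = [];
  private bytes = 0;
  private readonly maxBytes: number;

  constructor(maxSizeMb: number) {
    this.maxBytes = maxSizeMb * 1024 * 1024;
  }

  // Returns false instead of exceeding the cap (assumed behaviour).
  append(payload: string): boolean {
    const size = utf8Length(payload);
    if (this.bytes + size > this.maxBytes) return false;
    this.entries.push(payload);
    this.bytes += size;
    return true;
  }

  // Replays oldest-first and stops at the first failed insert,
  // so a later payload never overtakes an earlier one.
  drain(insert: (payload: string) => boolean): number {
    let replayed = 0;
    for (;;) {
      const next = this.entries[0];
      if (next === undefined || !insert(next)) break;
      this.entries.shift();
      this.bytes -= utf8Length(next);
      replayed++;
    }
    return replayed;
  }

  sizeBytes(): number {
    return this.bytes;
  }

  pendingCount(): number {
    return this.entries.length;
  }
}
```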
Dead-Letter Queue
Messages that fail parsing, or whose inserts still fail after MAX_RETRIES attempts, are written to the DLQ stream (tenant:{id}:agent:devices:dlq). The DLQ can be inspected and replayed manually with iotctl, or read directly with redis-cli XRANGE.
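Below is a synchronous sketch of the retry-then-dead-letter rule. It treats MAX_RETRIES as the total number of attempts, matching the "attempts before DLQ" comment in the configuration. The DLQ entry's field names (payload, error) are hypothetical. XADD with * lets Redis assign the entry ID. An actual worker would likely be async and back off between attempts.

```typescript
type InsertOutcome =
  | { kind: "inserted"; attempts: number }
  | { kind: "dlq"; attempts: number; error: string };

// Try `insert` up to maxRetries times; keep the last error for the DLQ entry.
function insertWithRetries(insert: () => void, maxRetries: number): InsertOutcome {
  let lastError = "unknown error";
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      insert();
      return { kind: "inserted", attempts: attempt };
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  return { kind: "dlq", attempts: maxRetries, error: lastError };
}

// XADD to the tenant's DLQ stream; "*" asks Redis to generate the entry ID.
function dlqCommand(tenantId: string, rawPayload: string, error: string): string[] {
  return ["XADD", `tenant:${tenantId}:agent:devices:dlq`, "*", "payload", rawPayload, "error", error];
}
```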
Autoscaling
The Ingestion service auto-adjusts its worker count (1–20) based on four signals sampled every 5 seconds:
| Signal | Scale up when | Scale down when |
|---|---|---|
| Consumer lag | > 5 000 ms behind real-time | < 500 ms |
| DB pool saturation | > 80 % of connections in use | < 40 % |
| Redis stream depth | > 60 % of MAXLEN | < 20 % |
| Redis memory pressure | > 75 % of maxmemory | < 50 % |
The worker count changes by at most one per cycle, so a burst of new workers does not flood the database connection pool with simultaneous connection requests (a thundering herd).
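The table does not say how the four signals are combined. The sketch below assumes three rules:
- Any signal above its scale-up threshold adds a worker.
- Scaling down requires every signal to be below its scale-down threshold.
- In all other cases the count holds.

Percentages are expressed as fractions of 1. The names are hypothetical.

```typescript
interface ScalingSignals {
  lagMs: number;       // consumer lag behind real time, in ms
  dbPoolUsage: number; // fraction of DB pool connections in use (0..1)
  streamDepth: number; // stream length as a fraction of MAXLEN (0..1)
  redisMemory: number; // used memory as a fraction of maxmemory (0..1)
}

const MIN_WORKERS = 1;
const MAX_WORKERS = 20;

// Called once per 5-second sampling cycle; returns the next worker count.
function nextWorkerCount(current: number, s: ScalingSignals): number {
  const scaleUp =
    s.lagMs > 5000 || s.dbPoolUsage > 0.8 || s.streamDepth > 0.6 || s.redisMemory > 0.75;
  const scaleDown =
    s.lagMs < 500 && s.dbPoolUsage < 0.4 && s.streamDepth < 0.2 && s.redisMemory < 0.5;
  const step = scaleUp ? 1 : scaleDown ? -1 : 0; // never more than one worker per cycle
  return Math.min(MAX_WORKERS, Math.max(MIN_WORKERS, current + step));
}
```

Between the two thresholds of each signal there is a dead band, which keeps the count from oscillating when load hovers near a single cut-off.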
Metrics
The Ingestion service exposes Prometheus metrics at GET /metrics (port 3003):
| Metric | Description |
|---|---|
| ingestion_messages_processed_total | Total device batches consumed from Redis |
| ingestion_readings_inserted_total | Individual metric rows written to TimescaleDB |
| ingestion_dlq_length | Current dead-letter queue depth |
| ingestion_stream_length | Pending messages in the ingest stream |
| ingestion_worker_lag_ms | Consumer group lag (ms behind real-time) |
| ingestion_worker_count | Current active worker count |
| ingestion_batch_duration_p95_ms | 95th-percentile batch processing time |
| ingestion_db_insert_duration_p95_ms | 95th-percentile database insert time |
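The /metrics response uses the Prometheus text exposition format: a HELP line, a TYPE line, then the sample. A client library such as prom-client would normally produce it. The hand-rolled renderer below is only a sketch. It does not escape backslashes or newlines in help text. It also treats the p95 metrics as precomputed gauges, which is an assumption.

```typescript
type MetricType = "counter" | "gauge";

interface Metric {
  name: string;
  help: string;
  type: MetricType;
  value: number;
}

// HELP line, TYPE line, then one unlabeled sample per metric; output ends with a newline.
function renderMetrics(metrics: Metric[]): string {
  return metrics
    .map((m) => `# HELP ${m.name} ${m.help}\n# TYPE ${m.name} ${m.type}\n${m.name} ${m.value}\n`)
    .join("");
}
```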
Configuration
# Database
DB_HOST=localhost
DB_PORT=5432
DB_NAME=iotistic
DB_USER=postgres
DB_PASSWORD=postgres
DB_SSL=true
DB_POOL_SIZE=50
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
# Ingestion tuning
WORKER_COUNT=2 # initial worker count (autoscaling adjusts this)
BATCH_SIZE=100 # max messages per worker cycle
MAX_RETRIES=3 # attempts before DLQ
DB_INSERT_METHOD=insert # "insert" or "copy"
# Resilience
DISK_SPOOL_ENABLED=true
DISK_SPOOL_PATH=/tmp/iotistic-spool
DISK_SPOOL_MAX_SIZE_MB=500
# Port (health + metrics only)
PORT=3003
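The following is a sketch of reading the ingestion tuning, spool and port variables with validation. It treats the sample values above as defaults, although the page states a default only for the spool path. The database and Redis settings are left out for brevity. The function and field names are hypothetical.

```typescript
type Env = Record<string, string | undefined>;

interface IngestionConfig {
  workerCount: number;
  batchSize: number;
  maxRetries: number;
  insertMethod: "insert" | "copy";
  spoolEnabled: boolean;
  spoolPath: string;
  spoolMaxSizeMb: number;
  port: number;
}

// Parse a non-negative integer, falling back when the variable is unset or empty.
function intVar(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const n = Number(raw);
  if (!Number.isInteger(n) || n < 0) {
    throw new Error(`${name} must be a non-negative integer, got "${raw}"`);
  }
  return n;
}

function insertMethodVar(env: Env): "insert" | "copy" {
  const raw = env["DB_INSERT_METHOD"] ?? "insert";
  if (raw === "insert" || raw === "copy") return raw;
  throw new Error(`DB_INSERT_METHOD must be "insert" or "copy", got "${raw}"`);
}

function loadConfig(env: Env): IngestionConfig {
  return {
    workerCount: intVar(env, "WORKER_COUNT", 2),
    batchSize: intVar(env, "BATCH_SIZE", 100),
    maxRetries: intVar(env, "MAX_RETRIES", 3),
    insertMethod: insertMethodVar(env),
    spoolEnabled: (env["DISK_SPOOL_ENABLED"] ?? "true") === "true",
    spoolPath: env["DISK_SPOOL_PATH"] ?? "/tmp/iotistic-spool",
    spoolMaxSizeMb: intVar(env, "DISK_SPOOL_MAX_SIZE_MB", 500),
    port: intVar(env, "PORT", 3003),
  };
}
```

In the service, this would be called as loadConfig(process.env). Suppose a loader left an inline "# ..." comment attached to a value. That value then fails validation at startup instead of silently becoming NaN.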
Health Check
GET /health
{
  "status": "ok",
  "uptime": 3847,
  "workers": 3,
  "streamLag": 120,
  "circuitBreaker": "closed"
}
A status of degraded indicates the circuit breaker is open or the disk spool is active. A status of error means the service cannot reach either Redis or PostgreSQL.