Ingestion Pipeline

The Ingestion service is a standalone Node.js process that continuously reads device telemetry from Redis Streams and persists it to TimescaleDB. It has no user-facing REST API — only internal health and metrics endpoints.


Why a Separate Service?

Separating ingestion from the API keeps message intake and database writes independent, and lets each side scale on its own:

  • The Cloud API stays fast — it receives MQTT messages and queues telemetry batches into Redis immediately, without waiting for the database write to complete.
  • The Ingestion service owns the write rate to TimescaleDB, applying batching, deduplication, and backpressure so the database is never overwhelmed during traffic spikes.
  • Multiple Ingestion pods can consume the same Redis consumer group for horizontal scale-out.

Data Flow

Agent
  │  MQTT publish
  │  i/{tenant}/a/{agent}/endpoints/{protocol}
  ▼
MQTT Broker (Mosquitto)
  │  MQTT subscribe
  ▼
Cloud API (MQTT subscriber)
  │  deserialize: DEFLATE → msgpack → JSON
  │  expand key-compaction dictionary (if enabled)
  │  deduplicate via Redis SETNX
  │
  ├──▶ Redis Pub/Sub   ← real-time state / metrics (dashboard)
  │
  └──▶ Redis Stream    tenant:{id}:agent:devices:ingestion
         │  XREADGROUP (consumer group)
         ▼
       Ingestion Worker
         │
       ┌─┴──────────────────────┐
       │ 1. Decode payload      │
       │ 2. Normalize readings  │
       │ 3. Deduplicate batch   │
       │ 4. Bulk INSERT         │
       └─┬──────────────────────┘
         ▼
       TimescaleDB
       readings hypertable

Redis Streams

Stream keys

Key                                  | Direction                | Content
-------------------------------------+--------------------------+----------------------------------------
tenant:{id}:agent:devices:ingestion  | Consumer reads           | Device telemetry batches from MQTT
tenant:{id}:agent:devices:dlq        | Worker writes on failure | Failed message batches
tenant:{id}:agent:logs               | Consumer reads           | Agent log batches
tenant:{id}:device:{uuid}:metrics    | Pub/Sub (publish only)   | Real-time metrics for WebSocket clients
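
The key patterns in the table can be kept in one place with a few small helpers. This is a minimal TypeScript sketch; the object and function names are hypothetical and only the key patterns themselves come from this page.

```typescript
// Hypothetical helpers that build the Redis keys listed in the table.
// The names are illustrative; only the key patterns are documented.
const streamKeys = {
  ingestion: (tenantId: string): string =>
    `tenant:${tenantId}:agent:devices:ingestion`,
  dlq: (tenantId: string): string =>
    `tenant:${tenantId}:agent:devices:dlq`,
  logs: (tenantId: string): string =>
    `tenant:${tenantId}:agent:logs`,
  deviceMetrics: (tenantId: string, deviceUuid: string): string =>
    `tenant:${tenantId}:device:${deviceUuid}:metrics`,
};
```

Building keys through one module avoids typos that would silently create a second, unread stream.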

Consumer group

The Ingestion service creates a consumer group {tenantId}:device-writers on startup if it does not already exist. Each worker in the group uses XREADGROUP to claim exclusive ownership of messages. Processed messages are acknowledged with XACK and trimmed from the stream. Messages that remain unacknowledged, for example after a worker crash, are reclaimed via XAUTOCLAIM after a configurable idle timeout.
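
The read, process, acknowledge cycle described above might look roughly like the sketch below. The StreamClient interface is hypothetical: it stands in for whatever Redis client the service uses, with one method per Redis command. The default batch size and idle timeout are assumptions, and stream trimming is left out.

```typescript
interface StreamEntry {
  id: string;
  payload: string;
}

// Hypothetical wrapper around a Redis client; each method maps to one command.
interface StreamClient {
  // XGROUP CREATE ... MKSTREAM, ignoring "group already exists" errors.
  createGroupIfMissing(stream: string, group: string): Promise<void>;
  // XREADGROUP GROUP <group> <consumer> COUNT <count> STREAMS <stream> >
  readGroup(stream: string, group: string, consumer: string, count: number): Promise<StreamEntry[]>;
  // XAUTOCLAIM <stream> <group> <consumer> <minIdleMs> 0
  autoClaim(stream: string, group: string, consumer: string, minIdleMs: number): Promise<StreamEntry[]>;
  // XACK <stream> <group> <id...>
  ack(stream: string, group: string, ids: string[]): Promise<void>;
}

// Runs one consume cycle and returns the number of entries acknowledged.
async function consumeOnce(
  client: StreamClient,
  tenantId: string,
  consumer: string,
  handle: (entries: StreamEntry[]) => Promise<void>,
  opts: { batchSize: number; claimIdleMs: number } = { batchSize: 100, claimIdleMs: 60000 },
): Promise<number> {
  const stream = `tenant:${tenantId}:agent:devices:ingestion`;
  const group = `${tenantId}:device-writers`;

  // Cheap to repeat here; a real service would do this once on startup.
  await client.createGroupIfMissing(stream, group);

  // Pick up entries abandoned by crashed workers, then read new ones.
  const reclaimed = await client.autoClaim(stream, group, consumer, opts.claimIdleMs);
  const fresh = await client.readGroup(stream, group, consumer, opts.batchSize);
  const entries = reclaimed.concat(fresh);
  if (entries.length === 0) return 0;

  // If handle() throws, nothing is acknowledged and the entries stay pending.
  await handle(entries);
  await client.ack(stream, group, entries.map((entry) => entry.id));
  return entries.length;
}
```

Acknowledging only after the handler succeeds is what makes the XAUTOCLAIM recovery path work: a crash between read and acknowledge leaves the entries pending for another worker.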


Processing Pipeline

1. Decode payload

The Cloud API MQTT handler has already deserialized and decoded the payload before writing it to Redis, so the Ingestion worker receives a plain, already-decoded JavaScript object. Payloads from the MQTT path may have gone through:

  • DEFLATE decompression (outer wrapper)
  • MessagePack decoding (binary format)
  • Key-compaction dictionary expansion (integer indices → field names)

Unrecognized formats are moved to the DLQ rather than crashing the worker.
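
To illustrate the last step in the list, here is a minimal sketch of key-compaction expansion. The wire format is an assumption made for the example: the dictionary is taken to be an array of field names, and a compacted object is taken to use the array index as its key. The actual dictionary format is not described on this page.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Assumed format: dictionary[i] is the field name for the compacted key "i".
function expandKeys(value: Json, dictionary: readonly string[]): Json {
  if (Array.isArray(value)) {
    return value.map((item) => expandKeys(item, dictionary));
  }
  if (value !== null && typeof value === "object") {
    const expanded: { [key: string]: Json } = {};
    for (const [key, child] of Object.entries(value)) {
      const index = /^\d+$/.test(key) ? Number(key) : -1;
      // Keys with no dictionary entry are kept unchanged rather than dropped.
      const name = dictionary[index] ?? key;
      expanded[name] = expandKeys(child, dictionary);
    }
    return expanded;
  }
  return value; // primitives pass through untouched
}
```

For example, with the dictionary ["agentUuid", "readings", "name", "value"], the object { "0": "abc-123", "1": [{ "2": "temperature", "3": 72.4 }] } expands to an object with agentUuid and readings fields.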

2. Normalization

The normalizer expands the device representation into individual readings rows. A single MQTT endpoints message typically carries readings from one sensor device with multiple metrics:

Input (from Redis — decoded MQTT endpoints payload):
{
  agentUuid: "abc-123",
  deviceName: "modbus-plc",
  protocol: "modbus",
  readings: [
    { name: "temperature", value: 72.4, unit: "°C", quality: "good" },
    { name: "pressure", value: 1013, unit: "hPa", quality: "good" }
  ]
}

Output (rows inserted into TimescaleDB):
{ time, agent_uuid, metric_name: "modbus-plc.temperature", value: 72.4, protocol: "modbus", ... }
{ time, agent_uuid, metric_name: "modbus-plc.pressure", value: 1013, protocol: "modbus", ... }
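
The mapping from one message to several rows can be sketched as a single function. The type names are hypothetical, only the columns shown in the example output are produced, and the row timestamp is passed in by the caller because this page does not say where it comes from.

```typescript
interface EndpointReading {
  name: string;
  value: number;
  unit?: string;
  quality?: string;
}

interface EndpointsMessage {
  agentUuid: string;
  deviceName: string;
  protocol: string;
  readings: EndpointReading[];
}

interface ReadingRow {
  time: Date;
  agent_uuid: string;
  metric_name: string;
  value: number;
  protocol: string;
}

// Expands one decoded endpoints message into one row per reading.
// Assumption: the caller supplies the timestamp for every row in the message.
function normalize(message: EndpointsMessage, time: Date): ReadingRow[] {
  return message.readings.map((reading) => ({
    time,
    agent_uuid: message.agentUuid,
    // Metric names are prefixed with the device name, as in the example output.
    metric_name: `${message.deviceName}.${reading.name}`,
    value: reading.value,
    protocol: message.protocol,
  }));
}
```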

3. Deduplication

Within each batch, if the same (agent_uuid, metric_name, time) tuple appears more than once, the last value wins. Rows are then sorted by primary key before the database insert to prevent deadlocks when multiple workers operate concurrently.

At the MQTT layer, the Cloud API already deduplicates by msgId using Redis SETNX (24-hour TTL) to drop duplicate MQTT deliveries. The in-batch deduplication in the Ingestion worker is a second-pass safeguard for overlapping windows during reconnects.
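
The in-batch step can be sketched as follows. It assumes the primary key order is (agent_uuid, metric_name, time), matching the tuple named above; the function names are hypothetical.

```typescript
interface KeyedRow {
  time: Date;
  agent_uuid: string;
  metric_name: string;
  value: number;
}

function compareStrings(a: string, b: string): number {
  return a < b ? -1 : a > b ? 1 : 0;
}

// Keeps the last row for each (agent_uuid, metric_name, time) tuple, then
// sorts by that tuple so concurrent workers touch rows in the same order.
function dedupeAndSort<T extends KeyedRow>(rows: readonly T[]): T[] {
  const latest = new Map<string, T>();
  for (const row of rows) {
    const key = `${row.agent_uuid}\u0000${row.metric_name}\u0000${row.time.getTime()}`;
    latest.set(key, row); // a later duplicate overwrites the earlier one
  }
  return Array.from(latest.values()).sort(
    (a, b) =>
      compareStrings(a.agent_uuid, b.agent_uuid) ||
      compareStrings(a.metric_name, b.metric_name) ||
      a.time.getTime() - b.time.getTime(),
  );
}
```

Sorting matters because two workers that lock the same rows in opposite orders can deadlock; a consistent order removes that possibility.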

4. Database Insert

Rows are inserted using a parameterized INSERT … ON CONFLICT DO NOTHING or PostgreSQL COPY (configurable via DB_INSERT_METHOD). The insert target is the readings hypertable — TimescaleDB routes each row to the correct 1-day chunk automatically.
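
For the "insert" method, a multi-row parameterized statement could be assembled as in the sketch below. The column list is an assumption based on the normalization example (the real table has more columns), and buildInsert is a hypothetical helper. It returns a text and values pair, the shape that clients such as node-postgres accept for parameterized queries.

```typescript
interface InsertRow {
  time: Date;
  agent_uuid: string;
  metric_name: string;
  value: number;
  protocol: string;
}

// Assumed subset of the readings table's columns.
const COLUMNS = ["time", "agent_uuid", "metric_name", "value", "protocol"] as const;

// Builds one INSERT with a numbered placeholder for every value.
function buildInsert(rows: readonly InsertRow[]): { text: string; values: unknown[] } {
  if (rows.length === 0) {
    throw new Error("buildInsert requires at least one row");
  }
  const values: unknown[] = [];
  const tuples = rows.map((row) => {
    const placeholders = COLUMNS.map((column) => {
      values.push(row[column]);
      return `$${values.length}`; // $1, $2, ... in push order
    });
    return `(${placeholders.join(", ")})`;
  });
  const text =
    `INSERT INTO readings (${COLUMNS.join(", ")}) VALUES ${tuples.join(", ")} ` +
    `ON CONFLICT DO NOTHING`;
  return { text, values };
}
```

Because every value travels as a parameter, metric names or protocol strings from devices never become part of the SQL text.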

After a successful batch, a background task updates last_telemetry_at on the relevant endpoint rows (fire-and-forget, non-blocking).


Resilience

Circuit Breaker

The circuit breaker monitors database insert failures. If errors exceed the threshold within a rolling window, it opens — workers stop attempting inserts and immediately route messages to the disk spool instead. The breaker polls the database connection in the background and closes automatically when the database recovers.

CLOSED (normal) ──errors exceed threshold──▶ OPEN (fault)
       ▲                                          │
       └──────── DB healthy, spool drained ───────┘
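
The two-state behaviour can be sketched with a small class. The threshold and window defaults are placeholders chosen for the example, since this page does not give the real values, and the class and method names are hypothetical.

```typescript
type BreakerState = "closed" | "open";

class CircuitBreaker {
  private failures: number[] = []; // timestamps (ms) of recent insert failures
  private current: BreakerState = "closed";
  private readonly threshold: number;
  private readonly windowMs: number;

  // Defaults are assumed values, not the service's actual settings.
  constructor(threshold: number = 5, windowMs: number = 30000) {
    this.threshold = threshold;
    this.windowMs = windowMs;
  }

  get state(): BreakerState {
    return this.current;
  }

  // Records a failed insert and opens the breaker once the number of
  // failures inside the rolling window exceeds the threshold.
  recordFailure(now: number): void {
    this.failures = this.failures.filter((t) => now - t < this.windowMs);
    this.failures.push(now);
    if (this.failures.length > this.threshold) {
      this.current = "open";
    }
  }

  // Called by the background probe once the database is healthy again.
  recordRecovery(): void {
    this.failures = [];
    this.current = "closed";
  }
}
```

While state is "open", a worker would skip the insert and hand the batch to the disk spool instead.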

Disk Spool

When the circuit is open (or Redis itself is unreachable), the spool writes raw message payloads to local disk (/tmp/iotistic-spool by default, configurable). The spool is capped at 500 MB. When the circuit closes, a replayer drains the spool back into the normal insert pipeline in order.

Dead-Letter Queue

Messages that fail parsing or produce unrecoverable insert errors after MAX_RETRIES attempts are written to the DLQ stream (tenant:{id}:agent:devices:dlq). The DLQ can be inspected and replayed manually using iotctl or directly via redis-cli XRANGE.
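
The retry-then-dead-letter rule for insert errors can be sketched as below. The insert and sendToDlq callbacks are hypothetical stand-ins for the real database write and the write to the DLQ stream, and the sketch retries immediately with no backoff, which may differ from the actual service.

```typescript
type Outcome = "inserted" | "dlq";

// Tries the insert up to maxRetries times, then hands the batch to the DLQ.
async function insertOrDeadLetter<T>(
  batch: T,
  insert: (batch: T) => Promise<void>,
  sendToDlq: (batch: T, reason: string) => Promise<void>,
  maxRetries: number = 3, // mirrors MAX_RETRIES=3 from the configuration listing
): Promise<Outcome> {
  let lastError = "unknown error";
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await insert(batch);
      return "inserted";
    } catch (err) {
      // Remember the most recent failure so the DLQ entry explains itself.
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  await sendToDlq(batch, `insert failed after ${maxRetries} attempts: ${lastError}`);
  return "dlq";
}
```

Storing the failure reason alongside the batch makes later inspection with redis-cli XRANGE more useful.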


Autoscaling

The Ingestion service auto-adjusts its worker count (1–20) based on four signals sampled every 5 seconds:

Signal                 | Scale up when                  | Scale down when
-----------------------+--------------------------------+----------------
Consumer lag           | > 5 000 ms behind real-time    | < 500 ms
DB pool saturation     | > 80 % of connections in use   | < 40 %
Redis stream depth     | > 60 % of MAXLEN               | < 20 %
Redis memory pressure  | > 75 % of maxmemory            | < 50 %

Worker count changes are gradual (±1 per cycle) to avoid thundering-herd on the database connection pool.
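
One way to turn the four signals into a scaling decision is sketched below. How the signals are combined is not stated on this page, so the sketch makes an assumption: any single signal above its upper threshold triggers a scale-up, while a scale-down requires all four to be below their lower thresholds. The names are hypothetical.

```typescript
interface ScalingSignals {
  consumerLagMs: number; // ms behind real-time
  dbPoolUsage: number;   // fraction of pool connections in use, 0..1
  streamDepth: number;   // fraction of MAXLEN, 0..1
  redisMemory: number;   // fraction of maxmemory, 0..1
}

const MIN_WORKERS = 1;
const MAX_WORKERS = 20;

// Returns the worker count for the next 5-second cycle.
function nextWorkerCount(current: number, s: ScalingSignals): number {
  // Assumption: one hot signal is enough to add a worker.
  const scaleUp =
    s.consumerLagMs > 5000 || s.dbPoolUsage > 0.8 || s.streamDepth > 0.6 || s.redisMemory > 0.75;
  // Assumption: every signal must be quiet before a worker is removed.
  const scaleDown =
    s.consumerLagMs < 500 && s.dbPoolUsage < 0.4 && s.streamDepth < 0.2 && s.redisMemory < 0.5;

  const step = scaleUp ? 1 : scaleDown ? -1 : 0; // at most one worker per cycle
  return Math.min(MAX_WORKERS, Math.max(MIN_WORKERS, current + step));
}
```

The gap between the upper and lower thresholds acts as hysteresis: readings in between leave the worker count unchanged, which keeps it from oscillating.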


Metrics

The Ingestion service exposes Prometheus metrics at GET /metrics (port 3003):

Metric                               | Description
-------------------------------------+----------------------------------------------
ingestion_messages_processed_total   | Total device batches consumed from Redis
ingestion_readings_inserted_total    | Individual metric rows written to TimescaleDB
ingestion_dlq_length                 | Current dead-letter queue depth
ingestion_stream_length              | Pending messages in the ingest stream
ingestion_worker_lag_ms              | Consumer group lag (ms behind real-time)
ingestion_worker_count               | Current active worker count
ingestion_batch_duration_p95_ms      | 95th-percentile batch processing time
ingestion_db_insert_duration_p95_ms  | 95th-percentile database insert time
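
For orientation, the sketch below renders samples in the Prometheus text exposition format, which is what a scrape of /metrics returns. It is illustrative only: the function is hypothetical, the counter and gauge types are inferred from the metric names, and a real service would more likely use a Prometheus client library than format the text by hand.

```typescript
interface MetricSample {
  name: string;
  help: string;
  type: "counter" | "gauge";
  value: number;
}

// Renders each sample as HELP, TYPE, and value lines.
function renderMetrics(samples: readonly MetricSample[]): string {
  return samples
    .map(
      (m) =>
        `# HELP ${m.name} ${m.help}\n` +
        `# TYPE ${m.name} ${m.type}\n` +
        `${m.name} ${m.value}\n`,
    )
    .join("");
}
```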

Configuration

# Database
DB_HOST=localhost
DB_PORT=5432
DB_NAME=iotistic
DB_USER=postgres
DB_PASSWORD=postgres
DB_SSL=true
DB_POOL_SIZE=50

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379

# Ingestion tuning
WORKER_COUNT=2 # initial worker count (autoscaling adjusts this)
BATCH_SIZE=100 # max messages per worker cycle
MAX_RETRIES=3 # attempts before DLQ
DB_INSERT_METHOD=insert # "insert" or "copy"

# Resilience
DISK_SPOOL_ENABLED=true
DISK_SPOOL_PATH=/tmp/iotistic-spool
DISK_SPOOL_MAX_SIZE_MB=500

# Port (health + metrics only)
PORT=3003
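
A typed loader for the tuning and resilience variables might look like the sketch below. It treats the values in the listing as fallbacks when a variable is unset, which is an assumption: this page only states defaults for the spool path and size. The interface and function names are hypothetical.

```typescript
interface IngestionConfig {
  workerCount: number;
  batchSize: number;
  maxRetries: number;
  dbInsertMethod: "insert" | "copy";
  diskSpoolEnabled: boolean;
  diskSpoolPath: string;
  diskSpoolMaxSizeMb: number;
  port: number;
}

type Env = Record<string, string | undefined>;

// Parses an integer variable, falling back when it is unset or empty.
function intFrom(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const parsed = Number(raw);
  if (!Number.isInteger(parsed)) {
    throw new Error(`${name} must be an integer, got "${raw}"`);
  }
  return parsed;
}

function insertMethodFrom(raw: string | undefined): "insert" | "copy" {
  if (raw === undefined || raw === "insert") return "insert";
  if (raw === "copy") return "copy";
  throw new Error(`DB_INSERT_METHOD must be "insert" or "copy", got "${raw}"`);
}

// Fallback values mirror the configuration listing (assumed to be defaults).
function loadConfig(env: Env): IngestionConfig {
  const spoolEnabled = env["DISK_SPOOL_ENABLED"];
  return {
    workerCount: intFrom(env, "WORKER_COUNT", 2),
    batchSize: intFrom(env, "BATCH_SIZE", 100),
    maxRetries: intFrom(env, "MAX_RETRIES", 3),
    dbInsertMethod: insertMethodFrom(env["DB_INSERT_METHOD"]),
    diskSpoolEnabled: spoolEnabled === undefined ? true : spoolEnabled.toLowerCase() === "true",
    diskSpoolPath: env["DISK_SPOOL_PATH"] ?? "/tmp/iotistic-spool",
    diskSpoolMaxSizeMb: intFrom(env, "DISK_SPOOL_MAX_SIZE_MB", 500),
    port: intFrom(env, "PORT", 3003),
  };
}
```

In a Node.js process this would be called once at startup as loadConfig(process.env), so a malformed value fails fast instead of surfacing later as a NaN batch size.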

Health Check

GET /health

{
  "status": "ok",
  "uptime": 3847,
  "workers": 3,
  "streamLag": 120,
  "circuitBreaker": "closed"
}

A status of degraded indicates the circuit breaker is open or the disk spool is active. A status of error means the service cannot reach either Redis or PostgreSQL.