Overview
The Iotistica cloud platform gives edge agents a home in the cloud: persistent device identity, target-state control, time-series telemetry storage, anomaly detection, fleet management, and a dashboard for operators. It is composed of two services that run independently and communicate through Redis Streams.
| Service | Role | Port |
|---|---|---|
| Cloud API | MQTT subscriber + REST gateway — device sync, auth, queries, MQTT management | 3002 |
| Ingestion | Redis stream consumer — normalizes and persists telemetry to TimescaleDB | 3003 |
Architecture
Data moves through the system in two distinct paths.
Write path (blue) — MQTT
Telemetry and device state flow to the cloud primarily over MQTT, not HTTP. This design decision is what enables low latency, offline buffering, and broker-level QoS guarantees.
- Edge agents connect to the Mosquitto MQTT broker and publish telemetry batches and state reports to compressed topics:

      i/{encodedTenantId}/a/{encodedAgentUuid}/endpoints/{protocol}
      i/{encodedTenantId}/a/{encodedAgentUuid}/state
      i/{encodedTenantId}/a/{encodedAgentUuid}/metrics

  Topic IDs are Base64 URL-safe encoded (tenant 12-char hex → 8 chars, UUID 36-char → 22 chars) to minimize bandwidth on constrained links.
- The Cloud API maintains a persistent MQTT subscription to the broker and receives all device messages. It auto-detects and decodes payloads in order: DEFLATE-compressed → MessagePack binary → JSON. An optional key-compaction dictionary (enabled with USE_KEY_COMPACTION_POC=true) further reduces payload size by replacing field names with short integer indices.
- The API processes each message type:
  - endpoints (sensor readings) → published to a Redis Stream (tenant:{id}:agent:devices:ingestion) for async persistence
  - state (agent full state) → written to PostgreSQL and published to Redis Pub/Sub for real-time dashboard updates
  - metrics (system CPU/memory/storage) → Redis Pub/Sub channel (tenant:{id}:device:{uuid}:metrics)
  - events (anomalies, alerts) → PostgreSQL via the anomaly event handler
  - jobs (status updates, job-start requests) → PostgreSQL agent_job_status table
- The Ingestion service runs consumer workers in a Redis consumer group. Each worker reads from tenant:{id}:agent:devices:ingestion, decompresses and normalizes the payload, deduplicates within the batch, then bulk-inserts into the readings TimescaleDB hypertable.
- Failed batches land in a dead-letter queue (tenant:{id}:agent:devices:dlq). A circuit breaker and local disk spool prevent data loss if the database is temporarily unreachable.
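The per-type routing described above can be pictured as a small topic parser plus a lookup table. This is only an illustrative sketch: `parseTopic`, `routeFor`, and the destination labels are hypothetical names, not the Cloud API's actual code, and only the five message types listed above are covered.

```typescript
interface ParsedTopic {
  tenant: string; // still Base64 URL-safe encoded
  agent: string;  // still Base64 URL-safe encoded
  kind: string;   // endpoints, state, metrics, ...
  rest: string[]; // e.g. ["modbus"] or ["<jobId>", "update"]
}

// Splits i/{T}/a/{A}/{kind}/... into its parts; returns null for any other shape.
function parseTopic(topic: string): ParsedTopic | null {
  const parts = topic.split("/");
  if (parts.length < 5 || parts[0] !== "i" || parts[2] !== "a") return null;
  const [, tenant, , agent, kind, ...rest] = parts;
  if (!tenant || !agent || !kind) return null;
  return { tenant, agent, kind, rest };
}

// Descriptive labels only (hypothetical); they mirror the list above.
const DESTINATIONS = new Map<string, string>([
  ["endpoints", "redis-stream"],
  ["state", "postgres+pubsub"],
  ["metrics", "pubsub"],
  ["events", "postgres"],
  ["jobs", "postgres"],
]);

// Returns the destination label, or null when the topic is malformed
// or its message type is not one of the five listed above.
function routeFor(topic: string): string | null {
  const parsed = parseTopic(topic);
  if (!parsed) return null;
  return DESTINATIONS.get(parsed.kind) ?? null;
}
```

A `Map` is used instead of a plain object so that unexpected topic segments such as `constructor` cannot resolve to inherited properties.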
Query path (dashed)
Dashboards and API clients call read endpoints (GET /api/v1/readings/:uuid/timeseries, etc.) over HTTPS. The Cloud API queries TimescaleDB directly — continuous aggregates serve pre-computed hourly and daily rollups, keeping response times fast even over long time ranges.
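One way a read endpoint could choose between the raw `readings` table and the continuous aggregates (`readings_1m`, `readings_1h`, `readings_daily`) is by the width of the requested time range. The sketch below is an assumption about how such a selection might work; `pickSource` and its thresholds are hypothetical and would need tuning against real query patterns.

```typescript
type ReadingsSource = "readings" | "readings_1m" | "readings_1h" | "readings_daily";

const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;

// Picks a coarser rollup as the requested range widens.
// All thresholds here are illustrative, not taken from the platform.
function pickSource(from: Date, to: Date): ReadingsSource {
  const span = to.getTime() - from.getTime();
  if (!(span > 0)) throw new RangeError("`to` must be later than `from`");
  if (span <= HOUR_MS) return "readings";        // raw rows
  if (span <= DAY_MS) return "readings_1m";      // 1-minute buckets
  if (span <= 30 * DAY_MS) return "readings_1h"; // hourly buckets
  return "readings_daily";                       // daily buckets
}
```

The point of the design is that the number of rows scanned stays roughly bounded no matter how long a range the client asks for.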
Control path
Agents poll GET /api/v1/device/:uuid/state over HTTPS to receive target state (which Docker containers should be running). The API serves ETag-cached responses so polling is cheap. Operators write target state through the dashboard or POST /api/v1/agents/:uuid/target-state.
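The ETag behaviour can be illustrated with a pure function that decides between a full response and `304 Not Modified`. This is a minimal sketch under stated assumptions: `computeEtag` and `respondToPoll` are hypothetical helpers, the hash choice (SHA-256, truncated) is arbitrary, and the comparison ignores weak validators and comma-separated `If-None-Match` lists.

```typescript
import { createHash } from "node:crypto";

interface PollResult {
  status: 200 | 304;
  etag: string;
  body?: string; // omitted on 304
}

// Derives a quoted ETag from the serialized target state.
function computeEtag(serialized: string): string {
  const digest = createHash("sha256").update(serialized).digest("hex");
  return `"${digest.slice(0, 32)}"`;
}

// Returns 304 with no body when the agent already holds the current state.
function respondToPoll(
  targetState: Record<string, unknown>,
  ifNoneMatch?: string,
): PollResult {
  // JSON.stringify is not canonical: key order affects the hash.
  const body = JSON.stringify(targetState);
  const etag = computeEtag(body);
  if (ifNoneMatch === etag) return { status: 304, etag };
  return { status: 200, etag, body };
}
```

An agent would store the `etag` from its last `200` response and send it back as `If-None-Match` on the next poll, so an unchanged target state costs only headers.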
MQTT Topic Structure
All MQTT topics are tenant-scoped and agent-scoped to prevent cross-tenant data leakage:
| Topic | Direction | Content |
|---|---|---|
| i/{T}/a/{A}/endpoints/{protocol} | Agent → Broker → API | Sensor readings (Modbus, OPC-UA, BACnet, etc.) |
| i/{T}/a/{A}/state | Agent → Broker → API | Full device state: apps, config, resource usage |
| i/{T}/a/{A}/metrics | Agent → Broker → API | System metrics: CPU, memory, storage, temperature |
| i/{T}/a/{A}/events/{type} | Agent → Broker → API | Anomaly events, alerts |
| i/{T}/a/{A}/logs/{container} | Agent → Broker → API | Container log streams |
| i/{T}/a/{A}/agent/{type} | Agent → Broker → API | Agent lifecycle: version, startup, shutdown |
| i/{T}/a/{A}/jobs/{jobId}/update | Agent → Broker → API | Job execution status |
| i/{T}/a/{A}/jobs/start-next | Agent → Broker → API | Agent requests next queued job |
{T} = Base64 URL-safe encoded tenant ID, {A} = Base64 URL-safe encoded agent UUID.
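The size figures quoted for encoded IDs (12-char hex → 8 chars, 36-char UUID → 22 chars) are consistent with decoding the hex digits to raw bytes and then applying unpadded Base64 URL-safe encoding. The sketch below assumes exactly that scheme; the function names are hypothetical and the platform's real encoder may differ in detail.

```typescript
// Hex string (e.g. a 12-char tenant ID) -> unpadded Base64 URL-safe text.
function encodeHexId(hex: string): string {
  if (!/^[0-9a-fA-F]+$/.test(hex) || hex.length % 2 !== 0) {
    throw new Error(`not an even-length hex string: ${hex}`);
  }
  return Buffer.from(hex, "hex").toString("base64url");
}

// 36-char UUID -> 22 chars: drop the dashes, then encode the 16 raw bytes.
function encodeUuid(uuid: string): string {
  const hex = uuid.replace(/-/g, "");
  if (hex.length !== 32) throw new Error(`not a UUID: ${uuid}`);
  return encodeHexId(hex);
}

// Reverses encodeUuid, restoring the 8-4-4-4-12 dashed form (lowercase).
function decodeUuid(encoded: string): string {
  const hex = Buffer.from(encoded, "base64url").toString("hex");
  if (hex.length !== 32) throw new Error(`not an encoded UUID: ${encoded}`);
  return [
    hex.slice(0, 8), hex.slice(8, 12), hex.slice(12, 16),
    hex.slice(16, 20), hex.slice(20),
  ].join("-");
}

// Builds the endpoints topic from the table above.
function endpointsTopic(tenantId: string, agentUuid: string, protocol: string): string {
  return `i/${encodeHexId(tenantId)}/a/${encodeUuid(agentUuid)}/endpoints/${protocol}`;
}
```

Because the Base64 URL-safe alphabet contains no `/`, `+`, or `#`, the encoded IDs cannot collide with the MQTT topic separator or wildcard characters.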
Message Payload Formats
The API auto-detects the payload format in priority order:
| Format | Detection | Notes |
|---|---|---|
| DEFLATE compressed | First byte 0x78, second byte 0x9C (or 0x01, 0x5E, 0xDA), i.e. a zlib header | Outer wrapper; the payload inside may be MessagePack or JSON |
| MessagePack binary | First byte in 0x80–0x8F (fixmap), 0x90–0x9F (fixarray), or 0xDC–0xDD (array 16/32) | Compact binary representation of a JSON-like structure |
| JSON | Default fallback | UTF-8 string |
An optional key compaction mode (USE_KEY_COMPACTION_POC=true) allows agents to publish field names as integer indices instead of strings, with a shared dictionary stored in Redis (dict:{agentUuid}). This is most beneficial on very low-bandwidth links.
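The detection order in the table can be sketched with the header bytes it lists. This is an illustration only: `detectFormat`, `unwrap`, and `wrap` are hypothetical names, the sketch relies solely on Node's built-in `zlib`, and actual MessagePack decoding (which needs a third-party library) is left out.

```typescript
import { deflateSync, inflateSync } from "node:zlib";

type WireFormat = "deflate" | "msgpack" | "json";

// Valid second bytes after 0x78 in a zlib header, as listed in the table.
const ZLIB_SECOND_BYTES = new Set<number>([0x01, 0x5e, 0x9c, 0xda]);

// Classifies a payload by its leading bytes, in the table's priority order.
function detectFormat(buf: Uint8Array): WireFormat {
  const b0 = buf[0];
  const b1 = buf[1];
  if (b0 === 0x78 && b1 !== undefined && ZLIB_SECOND_BYTES.has(b1)) return "deflate";
  if (
    b0 !== undefined &&
    ((b0 >= 0x80 && b0 <= 0x9f) || b0 === 0xdc || b0 === 0xdd)
  ) {
    return "msgpack";
  }
  return "json"; // default fallback, including empty payloads
}

// Peels off the compression wrapper (if any), then reports the inner format.
// inflateSync throws on corrupt input; a real handler would catch that.
function unwrap(buf: Buffer): { format: "msgpack" | "json"; bytes: Buffer } {
  const bytes = detectFormat(buf) === "deflate" ? inflateSync(buf) : buf;
  const inner = detectFormat(bytes);
  return { format: inner === "msgpack" ? "msgpack" : "json", bytes };
}

// Agent-side counterpart: zlib-wraps an already serialized payload.
function wrap(serialized: Buffer): Buffer {
  return deflateSync(serialized);
}
```

Header sniffing is a heuristic. A JSON document always starts with a character such as `{`, `[`, `"`, or a digit, none of which fall in the byte ranges above, which is what makes the fallback order safe.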
Stack
| Layer | Technology |
|---|---|
| API framework | Fastify 5 + TypeScript |
| Transport (agents) | MQTT v5 (Mosquitto broker, QoS 1) |
| Time-series DB | PostgreSQL + TimescaleDB (hypertables, continuous aggregates, compression) |
| Message queue | Redis Streams (ioredis, consumer groups) |
| Real-time events | Redis Pub/Sub |
| Auth | JWT (HS256 / RS256) + API key per device |
| Metrics | Prometheus-compatible /metrics endpoint |
| Logging | Pino structured JSON |
Multi-Tenancy
All MQTT topics, Redis keys, and database queries are scoped to a tenant ID:
# MQTT topics
i/{encodedTenantId}/a/{encodedAgentUuid}/endpoints/modbus
# Redis streams
tenant:{tenantId}:agent:devices:ingestion ← telemetry write stream
tenant:{tenantId}:agent:devices:dlq ← dead-letter queue
tenant:{tenantId}:agent:logs ← log stream
# Redis pub/sub
tenant:{tenantId}:device:{uuid}:state ← real-time state updates
tenant:{tenantId}:device:{uuid}:metrics ← real-time metrics
Redis hash tags ({tenantId}) pin related keys to the same cluster slot for atomic operations. MQTT ACLs in the broker are per-tenant-user so subscribers cannot cross tenant boundaries at the broker level.
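A small key-builder keeps the naming scheme in one place and makes the hash-tag behaviour testable. The sketch assumes the braces around the tenant ID are written literally into each key, which is how Redis Cluster hash tags work; `tenantKeys` and `hashTagOf` are hypothetical helpers, not part of the platform's published API.

```typescript
// Builds the tenant-scoped Redis names listed above.
// Assumption: the tenant ID is wrapped in literal braces to form a hash tag.
function tenantKeys(tenantId: string) {
  const prefix = `tenant:{${tenantId}}`;
  return {
    ingestionStream: `${prefix}:agent:devices:ingestion`,
    deadLetterStream: `${prefix}:agent:devices:dlq`,
    logStream: `${prefix}:agent:logs`,
    stateChannel: (uuid: string) => `${prefix}:device:${uuid}:state`,
    metricsChannel: (uuid: string) => `${prefix}:device:${uuid}:metrics`,
  };
}

// Returns the part of the key that Redis Cluster hashes to pick a slot:
// the text between the first "{" and the next "}", if that text is
// non-empty; otherwise the whole key.
function hashTagOf(key: string): string {
  const open = key.indexOf("{");
  if (open === -1) return key;
  const close = key.indexOf("}", open + 1);
  if (close === -1 || close === open + 1) return key;
  return key.slice(open + 1, close);
}
```

Since every key built for one tenant yields the same hash tag, the ingestion stream and its dead-letter queue land in the same slot and can be touched in a single multi-key operation.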
TimescaleDB Schema Highlights
The readings table is the core of the time-series store:
CREATE TABLE readings (
  time              timestamptz NOT NULL,
  agent_uuid        uuid NOT NULL,
  metric_name       text NOT NULL,
  value             double precision,
  quality           text DEFAULT 'good',
  unit              text,
  protocol          text NOT NULL,       -- modbus, opcua, mqtt, …
  extra             jsonb DEFAULT '{}',  -- endpoint_uuid, device_name, …
  anomaly_score     double precision,
  anomaly_threshold double precision,
  PRIMARY KEY (agent_uuid, metric_name, time)
);
TimescaleDB policies applied automatically:
- Hypertable: 1-day chunks, automatic partition management
- Compression: chunks older than 7 days are compressed automatically (~90% storage saving)
- Retention: chunks older than 730 days are dropped automatically
- Continuous aggregates: readings_1m, readings_1h, and readings_daily are pre-computed
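The composite primary key (agent_uuid, metric_name, time) allows at most one row per metric per timestamp, so a batch has to be deduplicated before a bulk insert. A minimal sketch of that step follows, assuming the dedup key matches the primary key and that the last reading in the batch wins; the `Reading` shape and `dedupeBatch` are hypothetical, and the Ingestion service's actual policy is not specified here.

```typescript
interface Reading {
  time: string; // ISO 8601 timestamp
  agent_uuid: string;
  metric_name: string;
  value: number | null;
}

// Keeps one reading per (agent_uuid, metric_name, time).
// Assumption: when duplicates occur, the later entry in the batch wins.
function dedupeBatch(batch: Reading[]): Reading[] {
  const byKey = new Map<string, Reading>();
  for (const r of batch) {
    // Normalize the timestamp so equivalent instants compare equal.
    // new Date(...).toISOString() throws a RangeError on invalid input.
    const instant = new Date(r.time).toISOString();
    const key = `${r.agent_uuid}\u0000${r.metric_name}\u0000${instant}`;
    byKey.set(key, r);
  }
  return Array.from(byKey.values());
}
```

Normalizing through `toISOString()` means `2024-01-01T00:00:00Z` and `2024-01-01T01:00:00+01:00` are treated as the same instant, matching how a `timestamptz` column would compare them.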
Authentication
See Authentication for details on JWT tokens, device API keys, and MQTT credentials.
Endpoints
See REST Endpoints for the full request/response reference.
Ingestion Pipeline
See Ingestion Pipeline for stream consumer internals, resilience, and autoscaling.