
Overview

The Iotistica cloud platform gives edge agents a home in the cloud: persistent device identity, target-state control, time-series telemetry storage, anomaly detection, fleet management, and a dashboard for operators. It is composed of two services that run independently and communicate through Redis Streams.

| Service | Role | Port |
| --- | --- | --- |
| Cloud API | MQTT subscriber + REST gateway: device sync, auth, queries, MQTT management | 3002 |
| Ingestion | Redis stream consumer: normalizes and persists telemetry to TimescaleDB | 3003 |

Architecture

[Architecture diagram. Sources (Agent 1, Agent 2, … Agent N: IoT devices; Dashboard: web / API client) → MQTT publish → MQTT Broker (Mosquitto, file auth) → subscribe → Cloud API (MQTT subscriber, REST endpoints, Auth / JWT, Redis publisher, query layer) → write → Redis Streams (ingest stream, logs stream, Pub/Sub) → Ingestion (consumer workers, normalizer, circuit breaker, disk spool) → TimescaleDB (readings hypertable, continuous aggregates, state & config, audit logs). The query path runs from the Dashboard through the Cloud API to TimescaleDB.]

Data moves through the system along three distinct paths: a write path, a query path, and a control path.

Write path (blue) — MQTT

Telemetry and device state flow to the cloud primarily over MQTT, not HTTP. This is the critical design decision: it enables low latency, offline buffering, and broker-level QoS guarantees.

  1. Edge agents connect to the Mosquitto MQTT broker and publish telemetry batches, state reports, and system metrics to compact topics:

    i/{encodedTenantId}/a/{encodedAgentUuid}/endpoints/{protocol}
    i/{encodedTenantId}/a/{encodedAgentUuid}/state
    i/{encodedTenantId}/a/{encodedAgentUuid}/metrics

    Topic IDs are Base64 URL-safe encoded (tenant 12-char hex → 8 chars, UUID 36-char → 22 chars) to minimize bandwidth on constrained links.

  2. The Cloud API maintains a persistent MQTT subscription to the broker and receives all device messages. It auto-detects and decodes payloads in order: DEFLATE-compressed → MessagePack binary → JSON. An optional key-compaction dictionary (enabled with USE_KEY_COMPACTION_POC=true) further reduces payload size by replacing field names with short integer indices.

  3. The API processes each message type:

    • endpoints (sensor readings) → published to a Redis Stream (tenant:{id}:agent:devices:ingestion) for async persistence
    • state (agent full state) → written to PostgreSQL and published to Redis Pub/Sub for real-time dashboard updates
    • metrics (system CPU/memory/storage) → published to a Redis Pub/Sub channel (tenant:{id}:device:{uuid}:metrics)
    • events (anomalies, alerts) → PostgreSQL via the anomaly event handler
    • jobs (status updates, job-start requests) → PostgreSQL agent_job_status table

  4. The Ingestion service runs consumer workers in a Redis consumer group. Each worker reads from tenant:{id}:agent:devices:ingestion, decompresses and normalizes the payload, deduplicates within the batch, then bulk-inserts into the readings TimescaleDB hypertable.

  5. Failed batches land in a dead-letter queue (tenant:{id}:agent:devices:dlq). A circuit breaker and local disk spool prevent data loss if the database is temporarily unreachable.
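The in-batch deduplication in step 4 can be sketched as below. This is only an illustration under stated assumptions: the `Reading` shape and the `dedupeBatch` name are hypothetical, the dedup key mirrors the readings primary key (agent UUID, metric name, time), and "last value wins" is an assumed policy rather than documented behavior.

```typescript
// Hypothetical normalized reading; field names are illustrative only.
interface Reading {
  timeMs: number; // timestamp as epoch milliseconds
  agentUuid: string;
  metricName: string;
  value: number | null;
}

// Collapse rows that share (agentUuid, metricName, timeMs) so a bulk insert
// does not hit the same primary key twice. Assumed policy: the last row wins.
function dedupeBatch(batch: Reading[]): Reading[] {
  const byKey = new Map<string, Reading>();
  for (const reading of batch) {
    const key = `${reading.agentUuid}|${reading.metricName}|${reading.timeMs}`;
    byKey.set(key, reading); // overwrites an earlier duplicate, keeps its position
  }
  return Array.from(byKey.values());
}
```

A Map keeps first-insertion order, so the output preserves the order in which each distinct key first appeared in the batch.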

Query path (dashed)

Dashboards and API clients call read endpoints (GET /api/v1/readings/:uuid/timeseries, etc.) over HTTPS. The Cloud API queries TimescaleDB directly; continuous aggregates serve pre-computed one-minute, hourly, and daily rollups, which keeps response times fast even over long time ranges.

Control path

Agents poll GET /api/v1/device/:uuid/state over HTTPS to receive target state (which Docker containers should be running). The API serves ETag-cached responses so polling is cheap. Operators write target state through the dashboard or POST /api/v1/agents/:uuid/target-state.
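To illustrate why ETag-cached polling is cheap, here is a minimal sketch of the server-side decision. Everything in it is an assumption for illustration: the `handleStatePoll` and `etagFor` names, the response shape, and the small FNV-1a-style hash, which stands in for whatever digest the API actually uses.

```typescript
interface PollResult {
  status: 200 | 304;
  etag: string;
  body?: string; // present only when the state changed
}

// Stand-in hash (FNV-1a style over UTF-16 code units); a real service would
// more likely use a cryptographic digest or a stored version number.
function etagFor(body: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < body.length; i++) {
    hash ^= body.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return `"${("00000000" + hash.toString(16)).slice(-8)}"`;
}

// Return 304 with no body when the agent already holds the current state.
function handleStatePoll(targetState: unknown, ifNoneMatch?: string): PollResult {
  const body = JSON.stringify(targetState);
  const etag = etagFor(body);
  if (ifNoneMatch === etag) {
    return { status: 304, etag };
  }
  return { status: 200, etag, body };
}
```

An agent would store the `etag` from its last 200 response and send it back in the `If-None-Match` header on the next poll, so an unchanged target state costs only headers.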


MQTT Topic Structure

All MQTT topics are tenant-scoped and agent-scoped to prevent cross-tenant data leakage:

| Topic | Direction | Content |
| --- | --- | --- |
| i/{T}/a/{A}/endpoints/{protocol} | Agent → Broker → API | Sensor readings (Modbus, OPC-UA, BACnet, etc.) |
| i/{T}/a/{A}/state | Agent → Broker → API | Full device state: apps, config, resource usage |
| i/{T}/a/{A}/metrics | Agent → Broker → API | System metrics: CPU, memory, storage, temperature |
| i/{T}/a/{A}/events/{type} | Agent → Broker → API | Anomaly events, alerts |
| i/{T}/a/{A}/logs/{container} | Agent → Broker → API | Container log streams |
| i/{T}/a/{A}/agent/{type} | Agent → Broker → API | Agent lifecycle: version, startup, shutdown |
| i/{T}/a/{A}/jobs/{jobId}/update | Agent → Broker → API | Job execution status |
| i/{T}/a/{A}/jobs/start-next | Agent → Broker → API | Agent requests next queued job |

{T} = Base64 URL-safe encoded tenant ID, {A} = Base64 URL-safe encoded agent UUID.
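The ID encoding can be sketched as follows. This assumes the hex digits of each ID are treated as raw bytes (dashes dropped for UUIDs) and encoded with the unpadded URL-safe Base64 alphabet, which matches the documented lengths (6 bytes → 8 characters, 16 bytes → 22 characters). The function names are hypothetical, not the platform's API.

```typescript
const B64URL = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";

// Parse a hex string into bytes, ignoring the dashes found in UUIDs.
function hexToBytes(hex: string): Uint8Array {
  const clean = hex.replace(/-/g, "");
  if (clean.length % 2 !== 0 || /[^0-9a-fA-F]/.test(clean)) {
    throw new Error(`not a hex string: ${hex}`);
  }
  const bytes = new Uint8Array(clean.length / 2);
  for (let i = 0; i < bytes.length; i++) {
    bytes[i] = parseInt(clean.slice(i * 2, i * 2 + 2), 16);
  }
  return bytes;
}

// Unpadded Base64 with the URL-safe alphabet ('-' and '_' instead of '+' and '/').
function toBase64Url(bytes: Uint8Array): string {
  let out = "";
  for (let i = 0; i < bytes.length; i += 3) {
    const remaining = bytes.length - i;
    const b1 = remaining > 1 ? bytes[i + 1] : 0;
    const b2 = remaining > 2 ? bytes[i + 2] : 0;
    const chunk = (bytes[i] << 16) | (b1 << 8) | b2;
    out += B64URL[(chunk >> 18) & 63] + B64URL[(chunk >> 12) & 63];
    if (remaining > 1) out += B64URL[(chunk >> 6) & 63];
    if (remaining > 2) out += B64URL[chunk & 63];
  }
  return out;
}

// Build a topic of the form i/{T}/a/{A}/{suffix}, e.g. suffix "state".
function buildTopic(tenantIdHex: string, agentUuid: string, suffix: string): string {
  const t = toBase64Url(hexToBytes(tenantIdHex));
  const a = toBase64Url(hexToBytes(agentUuid));
  return `i/${t}/a/${a}/${suffix}`;
}
```

For example, under these assumptions the 12-character tenant ID `0123456789ab` encodes to the 8-character segment `ASNFZ4mr`.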


Message Payload Formats

The API auto-detects the payload format in priority order:

| Format | Detection | Notes |
| --- | --- | --- |
| DEFLATE compressed | First byte 0x78, second byte 0x9C (or 0x01, 0x5E, 0xDA) | Outer wrapper; the payload inside may be MessagePack or JSON |
| MessagePack binary | First byte in 0x80–0x8F (fixmap), 0x90–0x9F (fixarray), or 0xDC–0xDD (array 16/32) | Compact binary representation of a JSON-like structure |
| JSON | Default fallback | UTF-8 string |
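The detection order in the table can be sketched as a small classifier over the leading bytes. The `detectFormat` name and the `PayloadFormat` type are assumptions for illustration. The sketch only classifies the payload; actual inflating and parsing are left out.

```typescript
type PayloadFormat = "deflate" | "msgpack" | "json";

// Second bytes that may follow 0x78 in the compressed-payload header.
const ZLIB_SECOND_BYTES: number[] = [0x01, 0x5e, 0x9c, 0xda];

// Classify a raw MQTT payload by its leading bytes, in priority order.
function detectFormat(payload: Uint8Array): PayloadFormat {
  if (payload.length === 0) return "json";
  const first = payload[0];

  // 1. Compressed wrapper: 0x78 followed by a known second header byte.
  if (first === 0x78 && payload.length >= 2 && ZLIB_SECOND_BYTES.indexOf(payload[1]) !== -1) {
    return "deflate";
  }

  // 2. MessagePack: top-level map or array markers.
  const isFixMap = first >= 0x80 && first <= 0x8f;
  const isFixArray = first >= 0x90 && first <= 0x9f;
  const isLongArray = first === 0xdc || first === 0xdd;
  if (isFixMap || isFixArray || isLongArray) return "msgpack";

  // 3. Everything else is treated as UTF-8 JSON.
  return "json";
}
```

Because the compressed form is an outer wrapper, a decoder would presumably run the same check again on the inflated bytes to decide between MessagePack and JSON.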

An optional key compaction mode (USE_KEY_COMPACTION_POC=true) allows agents to publish field names as integer indices instead of strings, with a shared dictionary stored in Redis (dict:{agentUuid}). This is most beneficial on very low-bandwidth links.
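As a rough illustration of key compaction, the sketch below expands integer-indexed keys back into field names. The dictionary layout is not specified in this document, so the sketch assumes the simplest possible form: an array of field names in which position equals the integer index. The `expandKeys` name and the `Json` type are likewise hypothetical.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Replace integer-like keys with names from the dictionary, recursively.
// Keys that are not valid indices are passed through unchanged.
function expandKeys(value: Json, dictionary: string[]): Json {
  if (Array.isArray(value)) {
    return value.map((item) => expandKeys(item, dictionary));
  }
  if (value !== null && typeof value === "object") {
    const out: { [key: string]: Json } = {};
    for (const [key, child] of Object.entries(value)) {
      const index = /^\d+$/.test(key) ? Number(key) : -1;
      const name = index >= 0 && index < dictionary.length ? dictionary[index] : key;
      out[name] = expandKeys(child, dictionary);
    }
    return out;
  }
  return value;
}
```

With the assumed dictionary `["value", "unit"]`, a compacted payload `{"0": 21.5, "1": "degC"}` expands to `{"value": 21.5, "unit": "degC"}`.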


Stack

| Layer | Technology |
| --- | --- |
| API framework | Fastify 5 + TypeScript |
| Transport (agents) | MQTT v5 (Mosquitto broker, QoS 1) |
| Time-series DB | PostgreSQL + TimescaleDB (hypertables, continuous aggregates, compression) |
| Message queue | Redis Streams (ioredis, consumer groups) |
| Real-time events | Redis Pub/Sub |
| Auth | JWT (HS256 / RS256) + API key per device |
| Metrics | Prometheus-compatible /metrics endpoint |
| Logging | Pino structured JSON |

Multi-Tenancy

All MQTT topics, Redis keys, and database queries are scoped to a tenant ID:

# MQTT topics
i/{encodedTenantId}/a/{encodedAgentUuid}/endpoints/modbus

# Redis streams
tenant:{tenantId}:agent:devices:ingestion ← telemetry write stream
tenant:{tenantId}:agent:devices:dlq ← dead-letter queue
tenant:{tenantId}:agent:logs ← log stream

# Redis pub/sub
tenant:{tenantId}:device:{uuid}:state ← real-time state updates
tenant:{tenantId}:device:{uuid}:metrics ← real-time metrics

Redis hash tags ({tenantId}) pin related keys to the same cluster slot for atomic operations. MQTT ACLs in the broker are per-tenant-user so subscribers cannot cross tenant boundaries at the broker level.
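The hash-tag behavior can be illustrated with a small sketch. It assumes the braces around the tenant ID are literal characters in the stream keys, which is how the hash-tag statement above reads. The key-builder names are hypothetical. The `hashTag` function follows the Redis Cluster rule: when a key contains `{` with a later `}` and at least one character between them, only that substring is hashed to pick the slot.

```typescript
// Hypothetical key builders for the tenant-scoped streams listed above.
function ingestionStream(tenantId: string): string {
  return `tenant:{${tenantId}}:agent:devices:ingestion`;
}
function dlqStream(tenantId: string): string {
  return `tenant:{${tenantId}}:agent:devices:dlq`;
}
function logStream(tenantId: string): string {
  return `tenant:{${tenantId}}:agent:logs`;
}

// Return the part of a key that Redis Cluster hashes to choose a slot.
function hashTag(key: string): string {
  const open = key.indexOf("{");
  if (open === -1) return key; // no tag: the whole key is hashed
  const close = key.indexOf("}", open + 1);
  if (close === -1 || close === open + 1) return key; // unterminated or empty tag
  return key.slice(open + 1, close);
}
```

Under this assumption, all three stream keys for tenant `acme` hash on the substring `acme`, so they map to the same slot and can take part in a single multi-key operation.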


TimescaleDB Schema Highlights

The readings table is the core of the time-series store:

CREATE TABLE readings (
    time              timestamptz NOT NULL,
    agent_uuid        uuid NOT NULL,
    metric_name       text NOT NULL,
    value             double precision,
    quality           text DEFAULT 'good',
    unit              text,
    protocol          text NOT NULL,        -- modbus, opcua, mqtt, …
    extra             jsonb DEFAULT '{}',   -- endpoint_uuid, device_name, …
    anomaly_score     double precision,
    anomaly_threshold double precision,
    PRIMARY KEY (agent_uuid, metric_name, time)
);

TimescaleDB policies applied automatically:

  • Hypertable: 1-day chunks, automatic partition management
  • Compression: auto-compress chunks older than 7 days (~90% storage saving)
  • Retention: auto-drop chunks older than 730 days
  • Continuous aggregates: readings_1m, readings_1h, and readings_daily are pre-computed

Authentication

See Authentication for details on JWT tokens, device API keys, and MQTT credentials.

Endpoints

See REST Endpoints for the full request/response reference.

Ingestion Pipeline

See Ingestion Pipeline for stream consumer internals, resilience, and autoscaling.