Data Publishing
Subscriptions define what data goes where — the data publishing layer handles the actual delivery. This page explains how sensor readings travel from a connected endpoint all the way to a cloud destination, what happens when connectivity is lost, and how to tune the pipeline for constrained devices and high-reliability deployments.
The Publishing Pipeline
Every message follows the same path from the moment a sensor delivers bytes to the agent:
Endpoint (Modbus, OPC-UA, etc.)
    │  raw bytes over TCP/RTU/UDP socket
    ▼
MessageBatcher            ← frame reassembly, batch accumulation
    │  complete batch
    ▼
PayloadCompressor         ← JSON / msgpack / deflate per subscription
    │
    ▼
Subscription router       ← matches endpoint to one or more destinations
    │
    ├─► Iotistica MQTT ──┐
    ├─► Azure IoT Hub    │  online:  publish directly
    ├─► AWS IoT Core     │  offline: write to SQLite buffer
    ├─► GCP IoT Core     │
    ├─► Generic MQTT     │
    └─► InfluxDB ────────┘
Each step runs inside the agent process. No external services are required on the device except the destination broker itself.
MQTT Topic Structure
Every publish lands on a topic with the following pattern:
i/{tenantId}/a/{agentId}/endpoints/{mqttTopic}
The tenant and agent UUIDs are encoded as 22-character Base64 URL-safe strings (instead of the standard 36-character hyphenated form) to reduce topic length on constrained links. The IDs in the example below are shortened placeholders, not full 22-character values:
i/ABC123xyz789uvw0/a/DEF456uvwxyz1234/endpoints/energy-meter
The mqttTopic segment is the value you set in the MQTT Topic field when configuring an endpoint in the admin UI. It can use slashes to create a hierarchy, for example plant-a/line-3/energy-meter.
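As a rough illustration of the topic layout, the sketch below builds a topic from two UUIDs. It assumes the short IDs are the UUID's 16 raw bytes in unpadded URL-safe Base64, which yields 22 characters; the agent's actual byte order and padding rules are not documented here, and the function names are hypothetical.

```python
import base64
import uuid


def short_id(uuid_text: str) -> str:
    """Encode a UUID's 16 raw bytes as unpadded URL-safe Base64 (22 chars).

    Assumption: this is one plausible encoding, not the agent's confirmed one.
    """
    raw = uuid.UUID(uuid_text).bytes
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def endpoint_topic(tenant_uuid: str, agent_uuid: str, mqtt_topic: str) -> str:
    """Build i/{tenantId}/a/{agentId}/endpoints/{mqttTopic}."""
    return (
        f"i/{short_id(tenant_uuid)}/a/{short_id(agent_uuid)}"
        f"/endpoints/{mqtt_topic}"
    )


topic = endpoint_topic(
    "123e4567-e89b-12d3-a456-426614174000",  # example tenant UUID
    "00112233-4455-6677-8899-aabbccddeeff",  # example agent UUID
    "plant-a/line-3/energy-meter",
)
print(topic)
```

URL-safe Base64 never emits a slash, so the encoded IDs cannot introduce extra topic levels.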
Heartbeat messages use a separate topic configured per endpoint:
i/{tenantId}/a/{agentId}/endpoints/{heartbeatTopic}
Payload Formats
Each subscription uses one of three payload formats. The format is set per subscription and controls the shape of the JSON sent to the destination. You can route the same endpoint to two different destinations with different formats simultaneously.
custom — Default
The full batch of readings is published as a single envelope. This is the richest format and preserves all protocol metadata.
{
  "timestamp": "2024-01-15T10:30:00.123Z",
  "protocol": "modbus",
  "msgId": "v2_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4",
  "messages": [
    {
      "readings": [
        {
          "deviceName": "energy-meter-1",
          "deviceId": "meter-stable-id-001",
          "metric": "voltage_l1",
          "value": 231.4,
          "unit": "V",
          "timestamp": "2024-01-15T10:29:59.950Z",
          "quality": "GOOD"
        },
        {
          "deviceName": "energy-meter-1",
          "deviceId": "meter-stable-id-001",
          "metric": "current_l1",
          "value": 12.3,
          "unit": "A",
          "timestamp": "2024-01-15T10:29:59.950Z",
          "quality": "GOOD"
        },
        {
          "deviceName": "energy-meter-1",
          "deviceId": "meter-stable-id-001",
          "metric": "power_factor",
          "value": 0.97,
          "unit": "",
          "timestamp": "2024-01-15T10:29:59.950Z",
          "quality": "GOOD"
        }
      ]
    }
  ]
}
When a register or node read fails, the affected data point has quality: "BAD", a null value, and a qualityCode that identifies the reason:
{
  "deviceName": "energy-meter-1",
  "deviceId": "meter-stable-id-001",
  "metric": "voltage_l1",
  "value": null,
  "unit": "V",
  "timestamp": "2024-01-15T10:30:00.123Z",
  "quality": "BAD",
  "qualityCode": "TIMEOUT"
}
Quality codes
| Code | Meaning |
|---|---|
| TIMEOUT | Device did not respond within the configured timeout |
| CONNECTION_REFUSED | TCP connection rejected |
| HOST_NOT_FOUND | Device hostname could not be resolved |
| DEVICE_OFFLINE | Serial port not open or device not reachable |
| DEVICE_BUSY | Modbus exception 6: device is processing a previous request |
| BAD_DATA | Response received but could not be decoded |
| READ_ERROR | Generic read failure |
OPC-UA data points use the same structure but include additional protocol fields:
{
  "deviceName": "plc-line-3",
  "deviceId": "opcua-stable-id-003",
  "metric": "Motor1.Speed",
  "value": 1450.5,
  "unit": "rpm",
  "timestamp": "2024-01-15T10:30:00.050Z",
  "quality": "GOOD",
  "protocol": "opcua",
  "nodeType": "metric"
}
tags — Flattened Tag List
A compact format that flattens all readings into a single tags array. Use this when the destination expects a tag-based data model (SCADA historians, tag databases).
{
  "timestamp": 1705311000123,
  "node": "energy-meter-1",
  "group": "energy-meter",
  "tags": [
    { "name": "voltage_l1", "value": 231.4 },
    { "name": "current_l1", "value": 12.3 },
    { "name": "power_factor", "value": 0.97 }
  ]
}
- group is derived from the endpoint name.
- node is the device name reported by the protocol (e.g., the Modbus device name or OPC-UA device node). Falls back to the endpoint name if the device does not report one.
- timestamp is a Unix millisecond epoch, not an ISO string.
When a read fails, the tag carries an error field instead of a value:
{ "name": "voltage_l1", "error": "TIMEOUT" }
ecp — Edge Computing Protocol
A compact typed format designed for edge platforms that need explicit type information to correctly parse values. Extends the tags shape with a numeric type discriminator.
{
  "timestamp": 1705311000123,
  "node": "energy-meter-1",
  "group": "energy-meter",
  "tags": [
    { "name": "voltage_l1", "value": 231.4, "type": 3 },
    { "name": "current_l1", "value": 12.3, "type": 3 },
    { "name": "phase_ok", "value": true, "type": 1 },
    { "name": "fault_code", "value": 0, "type": 2 },
    { "name": "serial_no", "value": "SN0042", "type": 4 }
  ]
}
Type codes
| Code | Type | Example value |
|---|---|---|
| 1 | Boolean | true, false |
| 2 | Integer | 0, 42, -7 |
| 3 | Float | 231.4, 0.97 |
| 4 | String | "SN0042", "RUNNING" |
Unlike the tags format, ECP silently drops any tag that has a BAD quality code or a null value. The payload only contains data points with valid readings.
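To make the difference between the two tag-based formats concrete, here is a minimal sketch that derives tags and ecp tag lists from custom-format readings. The function names are hypothetical, the fallback to READ_ERROR when qualityCode is missing is an assumption, and the type mapping simply follows the type-code table above.

```python
from typing import Any

# Type codes from the ECP table above.
BOOLEAN, INTEGER, FLOAT, STRING = 1, 2, 3, 4


def ecp_type(value: Any) -> int:
    """Map a Python value to an ECP type code."""
    # bool must be tested before int: in Python, bool is a subclass of int.
    if isinstance(value, bool):
        return BOOLEAN
    if isinstance(value, int):
        return INTEGER
    if isinstance(value, float):
        return FLOAT
    return STRING


def to_tags(readings: list) -> list:
    """tags format: a failed read keeps its entry and carries an error field."""
    tags = []
    for r in readings:
        if r.get("quality") == "BAD":
            # Assumption: fall back to READ_ERROR if no qualityCode is present.
            tags.append({"name": r["metric"], "error": r.get("qualityCode", "READ_ERROR")})
        else:
            tags.append({"name": r["metric"], "value": r["value"]})
    return tags


def to_ecp(readings: list) -> list:
    """ecp format: BAD or null readings are omitted; the rest get a type code."""
    return [
        {"name": r["metric"], "value": r["value"], "type": ecp_type(r["value"])}
        for r in readings
        if r.get("quality") != "BAD" and r.get("value") is not None
    ]


readings = [
    {"metric": "voltage_l1", "value": 231.4, "quality": "GOOD"},
    {"metric": "current_l1", "value": None, "quality": "BAD", "qualityCode": "TIMEOUT"},
    {"metric": "phase_ok", "value": True, "quality": "GOOD"},
    {"metric": "fault_code", "value": 0, "quality": "GOOD"},
]
print(to_tags(readings))
print(to_ecp(readings))
```

With this input, the failed current_l1 read appears in the tags output with an error field and is absent from the ecp output.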
Format Comparison
| Feature | custom | tags | ecp |
|---|---|---|---|
| Protocol metadata preserved | Yes | No | No |
| Timestamp format | ISO 8601 | Unix ms | Unix ms |
| BAD quality representation | quality:"BAD" + qualityCode | error field | Tag omitted |
| Type information | Inferred from JSON | None | Explicit type field |
| Null values included | Yes | Yes | No |
| Best for | Cloud analytics, debugging | SCADA historians | Typed edge platforms |
Anomaly Enrichment
When anomaly detection is enabled, the agent enriches each data point in the custom format with its local ML scores before publishing. These fields are added inline to each reading:
{
  "deviceName": "energy-meter-1",
  "metric": "voltage_l1",
  "value": 228.1,
  "unit": "V",
  "quality": "GOOD",
  "anomaly_score": 0.73,
  "anomaly_threshold": 0.65,
  "baseline_samples": 1440,
  "detection_methods": ["zscore", "mad"],
  "predicted_next": 229.5,
  "trend": "rising",
  "trend_strength": 0.42,
  "forecast_confidence": 0.87,
  "time_to_threshold": {
    "threshold": 240.0,
    "estimated_seconds": 3600,
    "confidence": 0.72
  }
}
| Field | Description |
|---|---|
| anomaly_score | 0.0–1.0 score from the edge detection model. Values above anomaly_threshold are flagged. |
| anomaly_threshold | The learned threshold for this metric. |
| baseline_samples | Number of readings used to build the current baseline. |
| detection_methods | Which algorithms contributed to the score (zscore, mad, rate, ml). |
| predicted_next | Model's predicted value for the next reading. |
| trend | Direction: rising, falling, or stable. |
| trend_strength | 0.0–1.0 strength of the trend. |
| forecast_confidence | Confidence in the prediction (0.0–1.0). |
| time_to_threshold | How long until the value is predicted to breach a threshold, if applicable. |
Anomaly fields are only added once the detection model has enough baseline samples (default warmup period: 20 batches). During warmup the fields are absent.
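A consumer therefore has to treat the anomaly fields as optional. The sketch below shows one way to do that on the receiving side; the function name is hypothetical, and the comparison follows the field table above (a score above the threshold is flagged).

```python
from typing import Optional


def is_anomalous(reading: dict) -> Optional[bool]:
    """Return True/False once scores are present, or None during warmup."""
    score = reading.get("anomaly_score")
    threshold = reading.get("anomaly_threshold")
    if score is None or threshold is None:
        return None  # fields absent: the model is still warming up
    return score > threshold


enriched = {"metric": "voltage_l1", "value": 228.1,
            "anomaly_score": 0.73, "anomaly_threshold": 0.65}
warming_up = {"metric": "voltage_l1", "value": 228.1}

print(is_anomalous(enriched))
print(is_anomalous(warming_up))
```

Returning None rather than False keeps "not yet scored" distinct from "scored and normal".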
Heartbeat Message Format
When a heartbeat topic is configured on an endpoint, the agent publishes a health signal at the configured interval (default: every 5 minutes). Heartbeat messages are JSON and use the custom encoding (not affected by subscription compression settings):
{
  "endpoint": "energy-meter",
  "timestamp": "2024-01-15T10:30:00.123Z",
  "state": "CONNECTED",
  "stats": {
    "messagesReceived": 1440,
    "messagesPublished": 1438,
    "bytesReceived": 98304,
    "bytesPublished": 245760,
    "reconnectAttempts": 1,
    "lastPublishTime": "2024-01-15T10:29:55.000Z",
    "lastHeartbeatTime": "2024-01-15T10:25:00.000Z",
    "lastConnectedTime": "2024-01-15T08:00:00.000Z"
  }
}
| Field | Description |
|---|---|
| state | Endpoint connection state: CONNECTED, DISCONNECTED, CONNECTING, ERROR |
| messagesReceived | Total frames received from the device since agent start |
| messagesPublished | Total batches published since agent start |
| reconnectAttempts | Total reconnect attempts since agent start |
Message Deduplication (msgId)
Every payload in custom format carries a msgId field. This ID is stable — if a message is written to the offline buffer and replayed after a reconnect, the same msgId is preserved in the replayed payload. The cloud can use this ID to detect and deduplicate messages that were published more than once due to a network failure mid-flush.
The msgId format is derived from the device API key and a monotonically increasing sequence:
v2_{kid}_{sequence}
Where kid is the 8-character key identifier from the device's v2 API key, and sequence is an agent-local counter that increments with each message.
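On the receiving side, deduplication can be as simple as remembering which IDs have been seen. The following is a minimal sketch, not the cloud's actual implementation: the class and function names are hypothetical, the seen-set is unbounded for brevity, and the sequence part is kept as an opaque string because its exact encoding is not specified here.

```python
from typing import NamedTuple


class MsgId(NamedTuple):
    version: str
    kid: str
    sequence: str  # kept opaque: the exact encoding is not specified here


def parse_msg_id(msg_id: str) -> MsgId:
    """Split v2_{kid}_{sequence} into its three parts."""
    version, kid, sequence = msg_id.split("_", 2)
    if version != "v2" or len(kid) != 8:
        raise ValueError(f"unexpected msgId layout: {msg_id!r}")
    return MsgId(version, kid, sequence)


class Deduplicator:
    """Remembers msgIds already seen (unbounded, for illustration only)."""

    def __init__(self) -> None:
        self._seen = set()

    def accept(self, payload: dict) -> bool:
        """Return True the first time a msgId is seen, False on a replay."""
        msg_id = payload["msgId"]
        if msg_id in self._seen:
            return False
        self._seen.add(msg_id)
        return True


dedup = Deduplicator()
payload = {"msgId": "v2_a1b2c3d4_0000000042", "messages": []}
print(dedup.accept(payload))  # first delivery
print(dedup.accept(payload))  # replay after an interrupted flush
print(parse_msg_id(payload["msgId"]).kid)
```

A production consumer would bound the seen-set, for example by expiring IDs after a retention window.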
Frame Reassembly and Batching
Industrial protocols deliver bytes as a stream, not as discrete messages. The MessageBatcher handles frame reconstruction:
- Bytes arrive in chunks from the socket (TCP segments, serial reads, etc.).
- Chunks are accumulated in a raw byte buffer until an end-of-message delimiter is detected.
- Once a complete frame is found, it is parsed as JSON and added to the message batch.
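The three steps above can be sketched as a small accumulator. This is an illustration under stated assumptions, not the MessageBatcher itself: the newline delimiter, the class name, and the choice to discard an oversized partial frame are all hypothetical; the 1 MB capacity mirrors the Buffer capacity default in the configuration reference.

```python
import json


class FrameAssembler:
    """Accumulates socket chunks and returns complete JSON frames."""

    def __init__(self, delimiter: bytes = b"\n", capacity: int = 1024 * 1024) -> None:
        self.delimiter = delimiter  # assumption: newline-delimited frames
        self.capacity = capacity    # mirrors the 1 MB buffer capacity default
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> list:
        """Append a chunk and return every frame it completes."""
        self._buffer.extend(chunk)
        frames = []
        while True:
            end = self._buffer.find(self.delimiter)
            if end == -1:
                break  # no complete frame yet; wait for more bytes
            raw = bytes(self._buffer[:end])
            del self._buffer[: end + len(self.delimiter)]
            if raw.strip():
                frames.append(json.loads(raw))
        if len(self._buffer) > self.capacity:
            # Assumption: drop a partial frame that outgrows the buffer.
            self._buffer.clear()
        return frames


assembler = FrameAssembler()
print(assembler.feed(b'{"metric": "voltage_l1", "value": 231.4}\n{"metric"'))
print(assembler.feed(b': "current_l1", "value": 12.3}\n'))
```

The second call completes the frame that was split across two chunks.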
Batch Flush Triggers
A batch is dispatched to the publish step when any of these conditions is true:
| Trigger | Configuration field | Default |
|---|---|---|
| No buffering configured | — | Flush each message immediately |
| Batch size reached | bufferSize | 0 (disabled) |
| Buffer time elapsed | bufferTimeMs | 0 ms (disabled) |
| Safety limit: message count | hard-coded | 10,000 messages |
| Safety limit: memory | hard-coded | 5% of heap, max 10 MB |
| Endpoint disconnected | — | Flush partial batch on disconnect |
The safety limits fire before the process runs out of memory on a malfunctioning device that sends data faster than it can be published.
Configuring Batching
In the admin UI, open an Endpoint and set the batching fields:
| Field | Description |
|---|---|
| Buffer size | Accumulate this many messages before publishing. Use this on endpoints that produce high-frequency small readings (e.g., 10 Hz vibration sensors) to amortize MQTT round-trips. |
| Buffer time (ms) | Publish every N milliseconds regardless of batch size. Use alongside buffer size so slow endpoints still flush periodically. |
Setting both to zero publishes every message individually as it arrives — lowest latency, highest overhead.
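The interaction between the two fields is easier to see in code. Below is a simplified sketch of the flush decision, with hypothetical names; it checks the time trigger only when a message arrives (a real implementation would presumably also use a timer) and leaves out the memory safety limit.

```python
import time


class Batcher:
    """Decides when an accumulated batch should be handed to the publisher."""

    def __init__(self, buffer_size=0, buffer_time_ms=0,
                 max_messages=10_000, clock=time.monotonic):
        self.buffer_size = buffer_size
        self.buffer_time_ms = buffer_time_ms
        self.max_messages = max_messages  # safety limit from the table above
        self._clock = clock
        self._batch = []
        self._started = 0.0

    def add(self, message):
        """Add a message; return a batch to publish, or None to keep waiting."""
        if not self._batch:
            self._started = self._clock()
        self._batch.append(message)
        return self._flush() if self._should_flush() else None

    def _should_flush(self):
        if self.buffer_size == 0 and self.buffer_time_ms == 0:
            return True  # no buffering configured: publish immediately
        if self.buffer_size and len(self._batch) >= self.buffer_size:
            return True
        age_ms = (self._clock() - self._started) * 1000
        if self.buffer_time_ms and age_ms >= self.buffer_time_ms:
            return True
        return len(self._batch) >= self.max_messages

    def _flush(self):
        batch, self._batch = self._batch, []
        return batch

    def on_disconnect(self):
        """Flush whatever is pending when the endpoint disconnects."""
        return self._flush() if self._batch else None


batcher = Batcher(buffer_size=3)
for reading in (231.4, 231.6, 231.5):
    print(batcher.add(reading))
```

The first two calls return None and the third returns the full three-message batch.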
Offline Buffering
When the agent cannot reach a destination (the MQTT broker is down, the cellular link has dropped, or the cloud is unreachable), undelivered data is not discarded. Every message that cannot be delivered is written to a local SQLite database on the device and retried once the connection returns, subject to the retry and capacity limits described below.
This is the same store-and-forward pattern used by AWS IoT Greengrass and Azure IoT Edge, implemented natively inside the agent.
How It Works
Agent attempts to publish
        │
        ▼
Is the connection up and in direct mode?
    YES ──► Publish over network immediately
    NO  ──► Write to SQLite offline queue
                │
                ▼
        [connection restores]
                │
                ▼
        Buffer sync wakes up
                │
                ▼
        Read batch from SQLite (100 records)
                │
                ├─ Published OK ───► Delete record from queue
                ├─ Publish failed ─► Mark for retry with backoff
                └─ Retry exceeded ─► Drop record, increment counter
The offline queue is completely transparent to subscriptions and plugins — they call publish() and the buffer layer intercepts the call if needed.
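A minimal store-and-forward sketch of this flow, using Python's built-in sqlite3 module, is shown below. The table schema, class name, and the send callback are hypothetical, backoff delays are left out, and "drop after the third failed flush attempt" is one reading of the retry limit rather than confirmed agent behaviour.

```python
import json
import sqlite3


class OfflineQueue:
    """Minimal SQLite-backed store-and-forward queue (illustrative schema)."""

    def __init__(self, path: str = ":memory:") -> None:
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS queue ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "topic TEXT NOT NULL, payload TEXT NOT NULL, "
            "failures INTEGER NOT NULL DEFAULT 0)"
        )

    def enqueue(self, topic: str, payload: dict) -> None:
        self.db.execute(
            "INSERT INTO queue (topic, payload) VALUES (?, ?)",
            (topic, json.dumps(payload)),
        )
        self.db.commit()

    def depth(self) -> int:
        return self.db.execute("SELECT COUNT(*) FROM queue").fetchone()[0]

    def drain(self, send, batch_size: int = 100, max_retries: int = 3):
        """Replay one batch in insertion order; return (published, dropped)."""
        rows = self.db.execute(
            "SELECT id, topic, payload, failures FROM queue ORDER BY id LIMIT ?",
            (batch_size,),
        ).fetchall()
        published = dropped = 0
        for row_id, topic, payload, failures in rows:
            if send(topic, json.loads(payload)):
                self.db.execute("DELETE FROM queue WHERE id = ?", (row_id,))
                published += 1
            elif failures + 1 >= max_retries:
                # Retry limit reached: drop the record and count it.
                self.db.execute("DELETE FROM queue WHERE id = ?", (row_id,))
                dropped += 1
            else:
                self.db.execute(
                    "UPDATE queue SET failures = failures + 1 WHERE id = ?",
                    (row_id,),
                )
        self.db.commit()
        return published, dropped


def publish(queue: OfflineQueue, connected: bool, send, topic: str, payload: dict) -> str:
    """Send directly when possible; otherwise fall back to the queue."""
    if connected and send(topic, payload):
        return "sent"
    queue.enqueue(topic, payload)
    return "buffered"
```

Callers only ever invoke publish(); whether the message went over the network or into SQLite is visible only through the return value.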
Flush Triggers
The buffer sync service wakes up and drains the queue in three situations:
| Event | Action |
|---|---|
| MQTT connects or reconnects | Immediate flush — starts draining as soon as the link is up |
| Periodic timer | Attempts a flush every 30 seconds if connected |
| Queue depth threshold | Triggers an immediate flush when the queue exceeds 1,000 messages |
Retry Backoff
When a message fails to publish (network blip during flush), it is retried with increasing delays:
| Retry attempt | Wait before retry |
|---|---|
| 1st retry | 5 seconds |
| 2nd retry | 30 seconds |
| 3rd retry | 5 minutes |
| After 3 failures | Message dropped |
This prevents a single failing destination from blocking the queue indefinitely.
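The schedule in the table can be expressed as a simple lookup. This is a sketch with a hypothetical function name; it encodes only the delays listed above and returns None where the table says the message is dropped.

```python
from typing import Optional

# Delays from the retry table above, in seconds: 5 s, 30 s, 5 min.
RETRY_DELAYS_S = (5, 30, 300)


def retry_delay(attempt: int) -> Optional[int]:
    """Seconds to wait before retry number `attempt` (1-based).

    Returns None once the retries are exhausted and the message is dropped.
    """
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    if attempt > len(RETRY_DELAYS_S):
        return None
    return RETRY_DELAYS_S[attempt - 1]


for attempt in range(1, 5):
    print(attempt, retry_delay(attempt))
```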
Queue Capacity and Backpressure
The offline queue has a hard cap to protect device storage:
| Limit | Default | Description |
|---|---|---|
| Max records | 10,000 | Hard limit on queued message count |
| Max retries | 3 | Messages exceeding this are dropped |
| Cleanup interval | 1 hour | Expired records are removed periodically |
When the queue is full, the agent applies a drop policy:
| Policy | Behaviour |
|---|---|
| oldest (default) | The oldest messages are evicted to make room for new ones. You always have the latest data even during a long outage. |
| newest | New messages are rejected until the queue drains. Older data is preserved intact. |
| error | An error is thrown; callers must handle the full queue condition. |
Critical messages (alerts and events) are exempt from ordinary pressure handling. When a critical message arrives and the queue is full under the oldest policy, the oldest non-critical records are evicted first to make room. This ensures alarm data is never silently dropped.
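The three policies and the critical-message exemption can be sketched with an in-memory list standing in for the SQLite queue. The critical flag, the function name, and the exception type are hypothetical. Two behaviours here are assumptions rather than documented facts: non-critical arrivals also evict the oldest non-critical record, and when only critical records remain the oldest of those is evicted.

```python
class QueueFullError(Exception):
    """Raised under the error policy when the queue is at capacity."""


def admit(queue: list, message: dict, max_records: int, policy: str = "oldest") -> bool:
    """Apply the drop policy; return True if the message was queued.

    Assumes max_records >= 1. The list is ordered oldest first.
    """
    if len(queue) < max_records:
        queue.append(message)
        return True
    if policy == "newest":
        return False  # reject the new message, keep older data intact
    if policy == "error":
        raise QueueFullError("offline queue is full")
    if policy != "oldest":
        raise ValueError(f"unknown drop policy: {policy}")
    # oldest: evict the oldest non-critical record first.
    for index, queued in enumerate(queue):
        if not queued.get("critical", False):
            del queue[index]
            break
    else:
        # Assumption: only critical records remain, so evict the oldest one.
        del queue[0]
    queue.append(message)
    return True


queue = [{"id": 1, "critical": True}, {"id": 2}, {"id": 3}]
admit(queue, {"id": 4, "critical": True}, max_records=3)
print([m["id"] for m in queue])
```

In this run record 2, the oldest non-critical entry, is evicted while the older critical record 1 survives.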
Message Deduplication
Each message buffered offline retains its original message ID. When the buffer drains, the original ID is restored in the replay payload. The cloud can use this to detect and discard any duplicates that might arrive if a partial flush was interrupted — for example, if the device reboots mid-drain.
Inspecting the Buffer
You can check the current state of the offline queue at any time:
GET /v1/buffer/status
The response includes the current record count, total bytes, oldest record timestamp, and the number of messages dropped since startup. Use this endpoint to monitor outage depth and drain progress.
Multi-Destination Buffering
Each configured destination gets its own isolated offline buffer. If your Iotistica MQTT destination is offline but your local InfluxDB destination is healthy, messages continue to reach InfluxDB in real time while only the Iotistica copies queue up. The two destinations do not share a buffer or interfere with each other.
A single endpoint can feed multiple destinations simultaneously through subscriptions. Each subscription path drains independently when its destination comes back online.
Payload Compression
On constrained cellular or satellite links, payload size matters. Each subscription can specify an independent compression strategy:
| Strategy | When to use |
|---|---|
| json (default) | Full JSON. Readable in any tool. No overhead. |
| msgpack | Binary serialisation, 15–40% smaller than JSON with minimal CPU. Good for mid-range links. |
| json+deflate | JSON then compressed with zlib deflate; best ratio for large payloads. Only fires when payload > 4 KB and CPU < 70%. |
| msgpack+deflate | Binary then compressed; smallest payloads, highest CPU. Use only when bandwidth is the binding constraint. |
The agent also supports dictionary compression — a shared key-shortening map built from frequently observed field names. This is most effective on endpoints that send the same set of fields in every message (e.g., a Modbus device with fixed register layout).
Adaptive Compression
Deflate compression is skipped automatically when:
- The payload is smaller than 4 KB (compression overhead would exceed savings).
- CPU load is above 70% (protecting latency-sensitive workloads on embedded devices).
Every 1,000th publish is sent as raw JSON regardless of the configured strategy. This baseline measurement lets the agent track actual bandwidth savings so you can verify the compression setting is working as intended.
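The skip rules can be illustrated with Python's standard zlib module. This sketch covers only the json and json+deflate strategies (msgpack needs a third-party library), the function name is hypothetical, the periodic raw-JSON baseline sample is omitted, and the CPU load is passed in as a number rather than measured.

```python
import json
import zlib

MIN_DEFLATE_BYTES = 4 * 1024  # payloads at or below this size are sent uncompressed
MAX_CPU_LOAD = 0.70           # deflate is skipped at or above this load


def encode(payload: dict, strategy: str = "json", cpu_load: float = 0.0):
    """Return (encoding actually used, encoded bytes)."""
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    wants_deflate = strategy == "json+deflate"
    if wants_deflate and len(body) > MIN_DEFLATE_BYTES and cpu_load < MAX_CPU_LOAD:
        return "json+deflate", zlib.compress(body)
    return "json", body


small = {"metric": "voltage_l1", "value": 231.4}
large = {"readings": [{"metric": f"m{i}", "value": i} for i in range(500)]}

print(encode(small, "json+deflate")[0])                # too small to compress
print(encode(large, "json+deflate", cpu_load=0.2)[0])  # compressed
print(encode(large, "json+deflate", cpu_load=0.9)[0])  # CPU too busy
```

Returning the encoding that was actually used lets the receiver know whether to inflate the body before parsing it.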
Endpoint Heartbeats
Each endpoint can be configured with a heartbeat — a periodic health signal published to a separate MQTT topic. The heartbeat fires at a fixed interval (default: 300 seconds) and includes the endpoint's current connection state and cumulative statistics.
Heartbeats are published only when the endpoint is connected and MQTT is available. They are automatically silenced during outages and resume as soon as both conditions are met.
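Use heartbeats to tell silent failures apart: an endpoint that stops producing data is different from one that has lost its connection. Because heartbeats are only published while the endpoint is connected, a heartbeat that keeps arriving with no data points to a connected but quiet device, while a missing heartbeat points to a connection problem at the endpoint or on the MQTT link.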
To configure it, set the Heartbeat topic field on the endpoint. Leave it blank to disable heartbeats for that endpoint.
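On the monitoring side, heartbeat and data timestamps can be combined to distinguish the two failure cases. The sketch below is one possible consumer-side check; the function name, the two-interval grace period, and the 60-second data timeout are assumptions, not agent settings.

```python
from datetime import datetime, timedelta, timezone


def classify_endpoint(now, last_heartbeat, last_data,
                      heartbeat_interval=timedelta(seconds=300),
                      data_timeout=timedelta(seconds=60)):
    """Classify an endpoint from the times of its last heartbeat and data."""
    # Assumption: allow two missed intervals before declaring a disconnect.
    if now - last_heartbeat > 2 * heartbeat_interval:
        return "disconnected"
    if now - last_data > data_timeout:
        return "connected-but-silent"
    return "healthy"


now = datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc)
print(classify_endpoint(now, now - timedelta(minutes=2), now - timedelta(seconds=10)))
print(classify_endpoint(now, now - timedelta(minutes=2), now - timedelta(minutes=20)))
print(classify_endpoint(now, now - timedelta(minutes=30), now - timedelta(minutes=30)))
```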
Schema Drift Detection
For endpoints where data consistency is critical, the agent continuously monitors the schema of arriving payloads and alerts when it changes unexpectedly.
What It Detects
| Drift type | Example | Severity |
|---|---|---|
| Missing field | A Modbus device stopped sending temperature | Warning → Critical |
| Field renamed | temp became temperature | Warning |
| Type change | voltage changed from number to string | Warning |
How It Works
The drift detector observes the first 20 batches as a warmup period to build the expected schema baseline — which fields are present, how often (presence ratio), and what types they carry. After warmup, every incoming batch is compared to the baseline.
A field must appear in at least 50% of batches before it is considered "expected". A field that disappears from 10 consecutive batches triggers an alert. Alerts for the same field are suppressed for 30 minutes after the first fire to prevent alert storms.
The schema adapts over time:
- A field seen in 60% of the last 50 batches is promoted to expected.
- A field missing from the last 250 consecutive batches is retired from the baseline.
Drift events are stored in SQLite and can be reviewed in the admin UI.
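The missing-field path can be sketched as follows. This is a simplified illustration with hypothetical names: it covers warmup, the 50% presence ratio, the 10-batch missing streak, and the 30-minute suppression window, and leaves out rename and type-change detection as well as baseline adaptation.

```python
from collections import Counter


class DriftDetector:
    WARMUP_BATCHES = 20
    EXPECTED_RATIO = 0.5
    MISSING_LIMIT = 10
    SUPPRESS_S = 30 * 60

    def __init__(self) -> None:
        self.batches = 0
        self.seen = Counter()            # field -> batches it appeared in
        self.expected = set()
        self.missing_streak = Counter()  # field -> consecutive misses
        self.last_alert = {}             # field -> time of last alert

    def observe(self, fields: set, now: float) -> list:
        """Feed one batch's field names; return fields that raise an alert."""
        self.batches += 1
        if self.batches <= self.WARMUP_BATCHES:
            self.seen.update(fields)
            if self.batches == self.WARMUP_BATCHES:
                self.expected = {
                    f for f, n in self.seen.items()
                    if n / self.batches >= self.EXPECTED_RATIO
                }
            return []
        alerts = []
        for field in self.expected:
            if field in fields:
                self.missing_streak[field] = 0
                continue
            self.missing_streak[field] += 1
            suppressed = (
                field in self.last_alert
                and now - self.last_alert[field] < self.SUPPRESS_S
            )
            if self.missing_streak[field] >= self.MISSING_LIMIT and not suppressed:
                self.last_alert[field] = now
                alerts.append(field)
        return sorted(alerts)


detector = DriftDetector()
for second in range(20):  # warmup: both fields present
    detector.observe({"voltage", "temperature"}, now=float(second))
for second in range(20, 30):  # temperature disappears
    alerts = detector.observe({"voltage"}, now=float(second))
print(alerts)
```

The alert fires on the tenth consecutive batch without temperature, and further alerts for that field stay quiet for the next 30 minutes.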
Publish Modes
The entire publish pipeline operates in one of three modes at any given time:
| Mode | Meaning |
|---|---|
| direct | Connected and healthy. Messages are published over the network immediately. |
| buffer-only | Disconnected. All publishes are written to the SQLite offline queue. |
| recovering | Just reconnected. New messages are sent directly while the offline queue drains in the background. |
The mode switches automatically based on connection health. Transitions are logged and visible in the dashboard. You never need to manually switch modes or restart the agent after a network outage.
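The mode selection can be modelled as a small pure function. This is a sketch with hypothetical names; it assumes the recovering mode lasts exactly as long as the offline queue still holds records, which the text above implies but does not state.

```python
from enum import Enum


class PublishMode(Enum):
    DIRECT = "direct"
    BUFFER_ONLY = "buffer-only"
    RECOVERING = "recovering"


def select_mode(connected: bool, queued_records: int) -> PublishMode:
    """Pick the publish mode from connection health and queue depth."""
    if not connected:
        return PublishMode.BUFFER_ONLY
    if queued_records > 0:
        # Assumption: recovery ends once the offline queue is empty.
        return PublishMode.RECOVERING
    return PublishMode.DIRECT


def route(mode: PublishMode) -> str:
    """Where a newly produced message goes in each mode."""
    return "sqlite" if mode is PublishMode.BUFFER_ONLY else "network"


for connected, depth in [(True, 0), (False, 120), (True, 120), (True, 0)]:
    mode = select_mode(connected, depth)
    print(mode.value, "->", route(mode))
```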
Per-Endpoint vs Per-Destination Buffering
The agent has two separate buffering layers:
| Layer | What it buffers | Purpose |
|---|---|---|
| Device publish buffer | Raw messages from protocol endpoints | Handles the device-to-agent edge (slow/lossy device connections) |
| Destination offline buffer | MQTT payloads bound for cloud destinations | Handles agent-to-cloud outages |
Most users only interact with the destination offline buffer. The device publish buffer is relevant for endpoints that disconnect frequently (e.g., a serial device that resets periodically) — the in-memory batch for that endpoint is flushed before the connection closes so no in-flight readings are lost.
Configuration Reference
Endpoint-level settings (admin UI → Endpoints)
| Field | Default | Description |
|---|---|---|
| Poll interval (s) | 10 | How often the agent reads from the device |
| Buffer size | 0 | Number of messages to accumulate before publishing |
| Buffer time (ms) | 0 | Maximum age of a batch before it is published |
| Buffer capacity (bytes) | 1 MB | Maximum raw byte buffer for frame reassembly |
| Heartbeat topic | — | MQTT topic for periodic health signal. Leave blank to disable. |
| Heartbeat interval (s) | 300 | How often to publish the heartbeat |
Subscription-level settings (admin UI → Subscriptions)
| Field | Default | Description |
|---|---|---|
| Compression | json | Payload encoding: json, msgpack, json+deflate, msgpack+deflate |
Offline buffer settings (code defaults)
These are currently set at deployment time and are not exposed in the admin UI:
| Parameter | Default | Description |
|---|---|---|
| flushBatchSize | 100 records | Records processed per flush batch |
| flushIntervalMs | 30,000 ms | How often the periodic flush runs |
| maxRetries | 3 | Retries before a message is dropped |
| maxBufferRecords | 10,000 | Hard queue size limit |
| dropPolicy | oldest | What to drop when the queue is full |
| flushTriggerThreshold | 1,000 | Queue depth that triggers an immediate flush |
| maxFlushPerCycle | 1,000 | Records drained in a single flush run |
| cleanupIntervalMs | 3,600,000 ms (1 h) | How often expired records are removed |
Operating Fully Offline
The agent is designed to run without any cloud connection indefinitely. When no destination is reachable:
- Protocol endpoints continue polling and collecting data normally.
- Anomaly detection continues processing every batch locally.
- Heartbeats stop (MQTT is unavailable) but the detection intervals continue.
- All data accumulates in the SQLite offline queue, up to the configured capacity.
- When the queue fills, the oldest records are evicted so the device always holds the most recent data.
- The moment any destination reconnects, the queue drains automatically with no manual intervention.
There is no restart required. The agent handles reconnection at the transport layer and the queued data flows as soon as the link is established.
Related Docs
- Endpoints — connecting to industrial devices (polling, addressing)
- Destinations — configuring cloud publish targets
- Subscriptions — routing endpoint data to destinations
- Alerts — anomaly detection and alerting
- Cloud Sync — agent-to-cloud state sync (separate from sensor data publishing)