Data Publishing

Subscriptions define what data goes where — the data publishing layer handles the actual delivery. This page explains how sensor readings travel from a connected endpoint all the way to a cloud destination, what happens when connectivity is lost, and how to tune the pipeline for constrained devices and high-reliability deployments.


The Publishing Pipeline

Every message follows the same path from the moment a sensor delivers bytes to the agent:

Endpoint (Modbus, OPC-UA, etc.)
    │  raw bytes over TCP/RTU/UDP socket
    ▼
MessageBatcher        ← frame reassembly, batch accumulation
    │  complete batch
    ▼
PayloadCompressor     ← JSON / msgpack / deflate per subscription
    │
    ▼
Subscription router   ← matches endpoint to one or more destinations
    │
    ├─► Iotistica MQTT ─────────────┐
    ├─► Azure IoT Hub               │  online:  publish directly
    ├─► AWS IoT Core                │  offline: write to SQLite buffer
    ├─► GCP IoT Core                │
    ├─► Generic MQTT                │
    └─► InfluxDB ───────────────────┘

Each step runs inside the agent process. No external services are required on the device except the destination broker itself.


MQTT Topic Structure

Every publish lands on a topic with the following pattern:

i/{tenantId}/a/{agentId}/endpoints/{mqttTopic}

The tenant and agent UUIDs are encoded as 22-character Base64 URL-safe strings (instead of the standard 36-character hyphenated form) to reduce topic length on constrained links. The IDs in the example below are shortened placeholders, not full 22-character values:

i/ABC123xyz789uvw0/a/DEF456uvwxyz1234/endpoints/energy-meter

The mqttTopic segment is the value you set in the MQTT Topic field when configuring an endpoint in the admin UI. It can use slashes to create a hierarchy, for example plant-a/line-3/energy-meter.
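
As a rough illustration of the topic layout, the sketch below builds a data topic from two UUIDs. It assumes the 22-character form is the UUID's 16 raw bytes encoded as URL-safe Base64 with the trailing padding removed; the helper names short_id and data_topic are hypothetical and are not part of the agent.

```python
import base64
import uuid


def short_id(value: uuid.UUID) -> str:
    """Encode a UUID's 16 bytes as unpadded URL-safe Base64 (22 characters)."""
    # Assumption: the two '=' padding characters are stripped.
    return base64.urlsafe_b64encode(value.bytes).rstrip(b"=").decode("ascii")


def data_topic(tenant_id: uuid.UUID, agent_id: uuid.UUID, mqtt_topic: str) -> str:
    """Build i/{tenantId}/a/{agentId}/endpoints/{mqttTopic}."""
    return f"i/{short_id(tenant_id)}/a/{short_id(agent_id)}/endpoints/{mqtt_topic}"


# Example UUIDs are arbitrary illustration values.
tenant = uuid.UUID("123e4567-e89b-12d3-a456-426614174000")
agent = uuid.UUID("00112233-4455-6677-8899-aabbccddeeff")
topic = data_topic(tenant, agent, "plant-a/line-3/energy-meter")
```

Sixteen bytes always encode to 24 Base64 characters including two padding characters, which is where the 22-character length comes from.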

Heartbeat messages use a separate topic configured per endpoint:

i/{tenantId}/a/{agentId}/endpoints/{heartbeatTopic}

Payload Formats

Each subscription can use one of three payload formats. The format is set per subscription and determines the shape of the JSON sent to the destination. You can route the same endpoint to two different destinations with different formats at the same time.

custom — Default

The full batch of readings is published as a single envelope. This is the richest format and preserves all protocol metadata.

{
  "timestamp": "2024-01-15T10:30:00.123Z",
  "protocol": "modbus",
  "msgId": "v2_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4",
  "messages": [
    {
      "readings": [
        {
          "deviceName": "energy-meter-1",
          "deviceId": "meter-stable-id-001",
          "metric": "voltage_l1",
          "value": 231.4,
          "unit": "V",
          "timestamp": "2024-01-15T10:29:59.950Z",
          "quality": "GOOD"
        },
        {
          "deviceName": "energy-meter-1",
          "deviceId": "meter-stable-id-001",
          "metric": "current_l1",
          "value": 12.3,
          "unit": "A",
          "timestamp": "2024-01-15T10:29:59.950Z",
          "quality": "GOOD"
        },
        {
          "deviceName": "energy-meter-1",
          "deviceId": "meter-stable-id-001",
          "metric": "power_factor",
          "value": 0.97,
          "unit": "",
          "timestamp": "2024-01-15T10:29:59.950Z",
          "quality": "GOOD"
        }
      ]
    }
  ]
}

When a register or node read fails, the affected data point has quality: "BAD", a null value, and a qualityCode that identifies the reason:

{
  "deviceName": "energy-meter-1",
  "deviceId": "meter-stable-id-001",
  "metric": "voltage_l1",
  "value": null,
  "unit": "V",
  "timestamp": "2024-01-15T10:30:00.123Z",
  "quality": "BAD",
  "qualityCode": "TIMEOUT"
}

Quality codes

| Code | Meaning |
|---|---|
| TIMEOUT | Device did not respond within the configured timeout |
| CONNECTION_REFUSED | TCP connection rejected |
| HOST_NOT_FOUND | Device hostname could not be resolved |
| DEVICE_OFFLINE | Serial port not open or device not reachable |
| DEVICE_BUSY | Modbus exception 6: device is processing a previous request |
| BAD_DATA | Response received but could not be decoded |
| READ_ERROR | Generic read failure |
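
On the receiving side, a consumer of the custom format would typically walk the envelope and separate usable readings from failed ones. The following is a minimal sketch of that, using only the field names shown in the examples above; the function name split_by_quality is hypothetical.

```python
import json


def split_by_quality(envelope: dict):
    """Return (good, bad) lists from a custom-format envelope.

    good holds (metric, value) pairs; bad holds (metric, qualityCode) pairs.
    """
    good, bad = [], []
    for message in envelope.get("messages", []):
        for reading in message.get("readings", []):
            if reading.get("quality") == "GOOD" and reading.get("value") is not None:
                good.append((reading["metric"], reading["value"]))
            else:
                # qualityCode may be absent; None is returned in that case.
                bad.append((reading["metric"], reading.get("qualityCode")))
    return good, bad


sample = json.loads("""
{
  "protocol": "modbus",
  "messages": [
    {"readings": [
      {"metric": "voltage_l1", "value": 231.4, "quality": "GOOD"},
      {"metric": "current_l1", "value": null, "quality": "BAD",
       "qualityCode": "TIMEOUT"}
    ]}
  ]
}
""")
good, bad = split_by_quality(sample)
```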

OPC-UA data points use the same structure but include additional protocol fields:

{
  "deviceName": "plc-line-3",
  "deviceId": "opcua-stable-id-003",
  "metric": "Motor1.Speed",
  "value": 1450.5,
  "unit": "rpm",
  "timestamp": "2024-01-15T10:30:00.050Z",
  "quality": "GOOD",
  "protocol": "opcua",
  "nodeType": "metric"
}

tags — Flattened Tag List

A compact format that flattens all readings into a single tags array. Use this when the destination expects a tag-based data model (SCADA historians, tag databases).

{
  "timestamp": 1705311000123,
  "node": "energy-meter-1",
  "group": "energy-meter",
  "tags": [
    { "name": "voltage_l1", "value": 231.4 },
    { "name": "current_l1", "value": 12.3 },
    { "name": "power_factor", "value": 0.97 }
  ]
}

  • group is derived from the endpoint name.
  • node is the device name reported by the protocol (e.g., the Modbus device name or OPC-UA device node). Falls back to the endpoint name if the device does not report one.
  • timestamp is a Unix millisecond epoch, not an ISO string.

When a read fails, the tag carries an error field instead of a value:

{ "name": "voltage_l1", "error": "TIMEOUT" }
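
To make the mapping concrete, here is a hedged sketch of how custom-style readings could be flattened into the tags shape described above. It is not the agent's code: to_tags_payload is a hypothetical helper, the timestamp is passed in because the source of the batch timestamp is not specified here, and the READ_ERROR fallback for a missing qualityCode is an assumption.

```python
def to_tags_payload(readings: list, endpoint_name: str, timestamp_ms: int) -> dict:
    """Flatten readings into the tags payload shape."""
    # node: device name reported by the protocol, else the endpoint name.
    node = next((r["deviceName"] for r in readings if r.get("deviceName")),
                endpoint_name)
    tags = []
    for r in readings:
        if r.get("quality") == "BAD":
            # Failed reads carry an error field instead of a value.
            # Assumption: fall back to READ_ERROR when no code is present.
            tags.append({"name": r["metric"],
                         "error": r.get("qualityCode", "READ_ERROR")})
        else:
            tags.append({"name": r["metric"], "value": r["value"]})
    return {
        "timestamp": timestamp_ms,   # Unix epoch in milliseconds
        "node": node,
        "group": endpoint_name,      # group is derived from the endpoint name
        "tags": tags,
    }


payload = to_tags_payload(
    [
        {"deviceName": "energy-meter-1", "metric": "voltage_l1",
         "value": None, "quality": "BAD", "qualityCode": "TIMEOUT"},
        {"deviceName": "energy-meter-1", "metric": "current_l1",
         "value": 12.3, "quality": "GOOD"},
    ],
    endpoint_name="energy-meter",
    timestamp_ms=1705311000123,
)
```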

ecp — Edge Computing Protocol

A compact typed format designed for edge platforms that need explicit type information to correctly parse values. Extends the tags shape with a numeric type discriminator.

{
  "timestamp": 1705311000123,
  "node": "energy-meter-1",
  "group": "energy-meter",
  "tags": [
    { "name": "voltage_l1", "value": 231.4, "type": 3 },
    { "name": "current_l1", "value": 12.3, "type": 3 },
    { "name": "phase_ok", "value": true, "type": 1 },
    { "name": "fault_code", "value": 0, "type": 2 },
    { "name": "serial_no", "value": "SN0042", "type": 4 }
  ]
}

Type codes

| Code | Type | Example value |
|---|---|---|
| 1 | Boolean | true, false |
| 2 | Integer | 0, 42, -7 |
| 3 | Float | 231.4, 0.97 |
| 4 | String | "SN0042", "RUNNING" |

Unlike the tags format, ECP silently drops any tag that has BAD quality or a null value. The payload only contains data points with valid readings.
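
The type codes and the drop rule can be sketched as follows. This is an illustration under assumptions, not the agent's implementation: ecp_type and to_ecp_tags are hypothetical names, and how the agent decides between Integer and Float for a value such as 230.0 is not documented here, so the sketch simply follows the Python value type.

```python
from typing import Optional


def ecp_type(value) -> Optional[int]:
    """Map a value to an ECP type code, or None if it has no valid type."""
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return 1
    if isinstance(value, int):
        return 2
    if isinstance(value, float):
        return 3
    if isinstance(value, str):
        return 4
    return None  # covers null values


def to_ecp_tags(readings: list) -> list:
    """Build the ECP tags array, omitting BAD-quality and null readings."""
    tags = []
    for r in readings:
        code = ecp_type(r.get("value"))
        if r.get("quality") == "BAD" or code is None:
            continue  # ECP omits the tag entirely
        tags.append({"name": r["metric"], "value": r["value"], "type": code})
    return tags


ecp_tags = to_ecp_tags([
    {"metric": "voltage_l1", "value": 231.4, "quality": "GOOD"},
    {"metric": "phase_ok", "value": True, "quality": "GOOD"},
    {"metric": "fault_code", "value": 0, "quality": "GOOD"},
    {"metric": "serial_no", "value": "SN0042", "quality": "GOOD"},
    {"metric": "current_l1", "value": None, "quality": "BAD"},
])
```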


Format Comparison

| Feature | custom | tags | ecp |
|---|---|---|---|
| Protocol metadata preserved | Yes | No | No |
| Timestamp format | ISO 8601 | Unix ms | Unix ms |
| BAD quality representation | quality:"BAD" + qualityCode | error field | Tag omitted |
| Type information | Inferred from JSON | None | Explicit type field |
| Null values included | Yes | Yes | No |
| Best for | Cloud analytics, debugging | SCADA historians | Typed edge platforms |

Anomaly Enrichment

When anomaly detection is enabled, the agent enriches each data point in the custom format with its local ML scores before publishing. These fields are added inline to each reading:

{
  "deviceName": "energy-meter-1",
  "metric": "voltage_l1",
  "value": 228.1,
  "unit": "V",
  "quality": "GOOD",
  "anomaly_score": 0.73,
  "anomaly_threshold": 0.65,
  "baseline_samples": 1440,
  "detection_methods": ["zscore", "mad"],
  "predicted_next": 229.5,
  "trend": "rising",
  "trend_strength": 0.42,
  "forecast_confidence": 0.87,
  "time_to_threshold": {
    "threshold": 240.0,
    "estimated_seconds": 3600,
    "confidence": 0.72
  }
}

| Field | Description |
|---|---|
| anomaly_score | 0.0–1.0 score from the edge detection model. Values above anomaly_threshold are flagged. |
| anomaly_threshold | The learned threshold for this metric. |
| baseline_samples | Number of readings used to build the current baseline. |
| detection_methods | Which algorithms contributed to the score (zscore, mad, rate, ml). |
| predicted_next | Model's predicted value for the next reading. |
| trend | Direction: rising, falling, or stable. |
| trend_strength | 0.0–1.0 strength of the trend. |
| forecast_confidence | Confidence in the prediction (0.0–1.0). |
| time_to_threshold | How long until the value is predicted to breach a threshold, if applicable. |

Anomaly fields are added only once the detection model has enough baseline samples (by default, after a warmup period of 20 batches). During warmup the fields are absent.
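
Because the anomaly fields can be absent, a consumer should treat them as optional. A minimal sketch of that check, with the hypothetical helper name is_anomalous, might look like this:

```python
from typing import Optional


def is_anomalous(reading: dict) -> Optional[bool]:
    """Return True/False once scores are present, or None during warmup."""
    score = reading.get("anomaly_score")
    threshold = reading.get("anomaly_threshold")
    if score is None or threshold is None:
        return None  # fields are absent until the baseline is built
    # Values above the learned threshold are flagged.
    return score > threshold


flagged = is_anomalous({"metric": "voltage_l1", "value": 228.1,
                        "anomaly_score": 0.73, "anomaly_threshold": 0.65})
warming_up = is_anomalous({"metric": "voltage_l1", "value": 228.1})
```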


Heartbeat Message Format

When a heartbeat topic is configured on an endpoint, the agent publishes a health signal at the configured interval (default: every 5 minutes). Heartbeat messages are JSON and use the custom encoding (not affected by subscription compression settings):

{
  "endpoint": "energy-meter",
  "timestamp": "2024-01-15T10:30:00.123Z",
  "state": "CONNECTED",
  "stats": {
    "messagesReceived": 1440,
    "messagesPublished": 1438,
    "bytesReceived": 98304,
    "bytesPublished": 245760,
    "reconnectAttempts": 1,
    "lastPublishTime": "2024-01-15T10:29:55.000Z",
    "lastHeartbeatTime": "2024-01-15T10:25:00.000Z",
    "lastConnectedTime": "2024-01-15T08:00:00.000Z"
  }
}

| Field | Description |
|---|---|
| state | Endpoint connection state: CONNECTED, DISCONNECTED, CONNECTING, ERROR |
| messagesReceived | Total frames received from the device since agent start |
| messagesPublished | Total batches published since agent start |
| reconnectAttempts | Total reconnect attempts since agent start |

Message Deduplication (msgId)

Every payload in custom format carries a msgId field. This ID is stable — if a message is written to the offline buffer and replayed after a reconnect, the same msgId is preserved in the replayed payload. The cloud can use this ID to detect and deduplicate messages that were published more than once due to a network failure mid-flush.

The msgId format is derived from the device API key and a monotonically increasing sequence:

v2_{kid}_{sequence}

Where kid is the 8-character key identifier from the device's v2 API key, and sequence is an agent-local counter that increments with each message.
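
A cloud-side consumer could use the msgId roughly as sketched below. The parsing follows the v2_{kid}_{sequence} layout given above; the names parse_msg_id and Deduplicator are hypothetical, and the unbounded in-memory set is a simplification (a real service would likely bound it or expire entries).

```python
from typing import NamedTuple


class MsgId(NamedTuple):
    version: str
    kid: str
    sequence: str


def parse_msg_id(msg_id: str) -> MsgId:
    """Split a msgId of the form v2_{kid}_{sequence}."""
    parts = msg_id.split("_", 2)
    if len(parts) != 3 or parts[0] != "v2" or len(parts[1]) != 8:
        raise ValueError(f"unexpected msgId format: {msg_id!r}")
    return MsgId(*parts)


class Deduplicator:
    """Remember msgIds that were already processed."""

    def __init__(self) -> None:
        self._seen = set()

    def accept(self, msg_id: str) -> bool:
        """Return True the first time an ID is seen, False on a replay."""
        if msg_id in self._seen:
            return False
        self._seen.add(msg_id)
        return True


dedup = Deduplicator()
first = dedup.accept("v2_a1b2c3d4_42")
replay = dedup.accept("v2_a1b2c3d4_42")
parsed = parse_msg_id("v2_a1b2c3d4_42")
```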


Frame Reassembly and Batching

Industrial protocols deliver bytes as a stream, not as discrete messages. The MessageBatcher handles frame reconstruction:

  1. Bytes arrive in chunks from the socket (TCP segments, serial reads, etc.).
  2. Chunks are accumulated in a raw byte buffer until an end-of-message delimiter is detected.
  3. Once a complete frame is found, it is parsed as JSON and added to the message batch.
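
The three steps above can be sketched as a small reassembly class. This is an illustration only: the delimiter is assumed to be a newline (the actual end-of-message delimiter is not stated here), FrameAssembler is a hypothetical name, and discarding the buffer when it exceeds capacity is an assumed overflow policy.

```python
import json


class FrameAssembler:
    """Accumulate byte chunks and emit parsed JSON frames."""

    def __init__(self, delimiter: bytes = b"\n", capacity: int = 1024 * 1024):
        self._delimiter = delimiter   # assumption: newline-delimited frames
        self._capacity = capacity     # raw buffer limit in bytes
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> list:
        """Add a chunk; return every complete frame it finished."""
        self._buffer.extend(chunk)
        frames = []
        while True:
            end = self._buffer.find(self._delimiter)
            if end < 0:
                break  # no complete frame yet, wait for more bytes
            raw = bytes(self._buffer[:end])
            del self._buffer[:end + len(self._delimiter)]
            if not raw.strip():
                continue  # skip empty frames
            try:
                frames.append(json.loads(raw))
            except json.JSONDecodeError:
                continue  # undecodable frame is skipped in this sketch
        if len(self._buffer) > self._capacity:
            self._buffer.clear()  # assumed policy: drop an oversized partial frame
        return frames


assembler = FrameAssembler()
first_frames = assembler.feed(b'{"a":')
second_frames = assembler.feed(b' 1}\n{"b": 2}\n{"c"')
```

The second call completes the frame started by the first call and also yields the frame that arrived whole; the trailing partial frame stays buffered.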

Batch Flush Triggers

A batch is dispatched to the publish step when any of these conditions is true:

| Trigger | Configuration field | Default |
|---|---|---|
| No buffering configured | | Flush each message immediately |
| Batch size reached | bufferSize | 0 (disabled) |
| Buffer time elapsed | bufferTimeMs | 0 ms (disabled) |
| Safety limit: message count | hard-coded | 10,000 messages |
| Safety limit: memory | hard-coded | 5% of heap, max 10 MB |
| Endpoint disconnected | | Flush partial batch on disconnect |

The safety limits force a flush before the process can run out of memory, for example when a malfunctioning device sends data faster than it can be published.
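
The flush triggers can be sketched as a small batcher. This is a simplified illustration, not the agent's MessageBatcher: the class name Batcher is hypothetical, the time trigger is only evaluated when a message arrives (a real implementation would presumably use a timer), and the memory-based safety limit is left out.

```python
class Batcher:
    """Accumulate messages and decide when a batch should be flushed."""

    MAX_MESSAGES = 10_000  # hard-coded message-count safety limit

    def __init__(self, buffer_size: int = 0, buffer_time_ms: int = 0):
        self.buffer_size = buffer_size
        self.buffer_time_ms = buffer_time_ms
        self._batch = []
        self._started_ms = 0

    def add(self, message, now_ms: int):
        """Add a message; return the flushed batch, or None if still buffering."""
        if not self._batch:
            self._started_ms = now_ms  # age is measured from the first message
        self._batch.append(message)
        return self.flush() if self._should_flush(now_ms) else None

    def _should_flush(self, now_ms: int) -> bool:
        if self.buffer_size == 0 and self.buffer_time_ms == 0:
            return True  # no buffering configured: flush immediately
        if self.buffer_size and len(self._batch) >= self.buffer_size:
            return True  # batch size reached
        if self.buffer_time_ms and now_ms - self._started_ms >= self.buffer_time_ms:
            return True  # buffer time elapsed
        return len(self._batch) >= self.MAX_MESSAGES

    def flush(self) -> list:
        """Return the current batch and start a new one (also used on disconnect)."""
        batch, self._batch = self._batch, []
        return batch


by_size = Batcher(buffer_size=3)
size_results = [by_size.add(n, now_ms=0) for n in (1, 2, 3)]
```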

Configuring Batching

In the admin UI, open an Endpoint and set the batching fields:

| Field | Description |
|---|---|
| Buffer size | Accumulate this many messages before publishing. Use this on endpoints that produce high-frequency small readings (e.g., 10 Hz vibration sensors) to amortize MQTT round-trips. |
| Buffer time (ms) | Publish every N milliseconds regardless of batch size. Use alongside buffer size so slow endpoints still flush periodically. |

Setting both to zero publishes every message individually as it arrives — lowest latency, highest overhead.


Offline Buffering

When the agent cannot reach a destination (the MQTT broker is down, the cellular link has dropped, or the cloud is unreachable), undelivered messages are not discarded. Every message that cannot be delivered is written to a local SQLite database on the device and retried once the connection returns, subject to the queue capacity and retry limits described below.

This is the same store-and-forward pattern used by AWS IoT Greengrass and Azure IoT Edge, implemented natively inside the agent.

How It Works

Agent attempts to publish
    │
    ▼
Is the connection up and in direct mode?
    YES ──► Publish over network immediately
    NO  ──► Write to SQLite offline queue
                │
                ▼
        [connection restores]
                │
                ▼
        Buffer sync wakes up
                │
                ▼
        Read batch from SQLite (100 records)
                │
                ├─ Published OK ──► Delete record from queue
                ├─ Publish failed ─► Mark for retry with backoff
                └─ Retry exceeded ─► Drop record, increment counter

The offline queue is completely transparent to subscriptions and plugins — they call publish() and the buffer layer intercepts the call if needed.
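
The store-and-forward flow can be sketched with Python's built-in sqlite3 module. Everything here is illustrative: the table layout, the OfflineQueue class, and publish_or_buffer are hypothetical, the publish callback stands in for the real transport, and backoff delays are omitted so that each failed drain attempt simply uses up one retry.

```python
import sqlite3


class OfflineQueue:
    """A minimal SQLite-backed store-and-forward queue."""

    def __init__(self, path: str = ":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS queue ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, "
            "payload BLOB NOT NULL, retries INTEGER NOT NULL DEFAULT 0)"
        )

    def enqueue(self, topic: str, payload: bytes) -> None:
        with self._db:
            self._db.execute(
                "INSERT INTO queue (topic, payload) VALUES (?, ?)", (topic, payload))

    def depth(self) -> int:
        return self._db.execute("SELECT COUNT(*) FROM queue").fetchone()[0]

    def drain(self, publish, batch_size: int = 100, max_retries: int = 3) -> int:
        """Try to publish one batch, oldest first; return the delivered count."""
        rows = self._db.execute(
            "SELECT id, topic, payload, retries FROM queue ORDER BY id LIMIT ?",
            (batch_size,)).fetchall()
        delivered = 0
        for row_id, topic, payload, retries in rows:
            with self._db:
                if publish(topic, payload):
                    self._db.execute("DELETE FROM queue WHERE id = ?", (row_id,))
                    delivered += 1
                elif retries + 1 >= max_retries:
                    # Retry budget exhausted: drop the record.
                    self._db.execute("DELETE FROM queue WHERE id = ?", (row_id,))
                else:
                    self._db.execute(
                        "UPDATE queue SET retries = retries + 1 WHERE id = ?",
                        (row_id,))
        return delivered


def publish_or_buffer(queue, connected: bool, publish, topic: str, payload: bytes) -> str:
    """Publish directly when possible, otherwise write to the offline queue."""
    if connected and publish(topic, payload):
        return "published"
    queue.enqueue(topic, payload)
    return "buffered"


queue = OfflineQueue()
outcome = publish_or_buffer(queue, False, lambda t, p: True, "endpoints/meter", b"{}")
```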

Flush Triggers

The buffer sync service wakes up and drains the queue in three situations:

| Event | Action |
|---|---|
| MQTT connects or reconnects | Immediate flush; starts draining as soon as the link is up |
| Periodic timer | Attempts a flush every 30 seconds if connected |
| Queue depth threshold | Triggers an immediate flush when the queue exceeds 1,000 messages |

Retry Backoff

When a message fails to publish (network blip during flush), it is retried with increasing delays:

| Retry attempt | Wait before retry |
|---|---|
| 1st retry | 5 seconds |
| 2nd retry | 30 seconds |
| 3rd retry | 5 minutes |
| After 3 failures | Message dropped |

This prevents a single failing destination from blocking the queue indefinitely.
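
The backoff schedule can be expressed as a simple lookup. The sketch below reads the table as "retry n waits the n-th delay, and a fourth retry is never scheduled"; that reading, and the name retry_delay, are assumptions.

```python
from typing import Optional

# Delays before the 1st, 2nd, and 3rd retry, in seconds.
BACKOFF_SECONDS = (5, 30, 300)


def retry_delay(retry_number: int) -> Optional[int]:
    """Return the wait before the given retry, or None if the message is dropped."""
    if retry_number < 1:
        raise ValueError("retry_number starts at 1")
    if retry_number > len(BACKOFF_SECONDS):
        return None  # retry budget exhausted: drop the message
    return BACKOFF_SECONDS[retry_number - 1]


schedule = [retry_delay(n) for n in range(1, 5)]
```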

Queue Capacity and Backpressure

The offline queue has a hard cap to protect device storage:

| Limit | Default | Description |
|---|---|---|
| Max records | 10,000 | Hard limit on queued message count |
| Max retries | 3 | Messages exceeding this are dropped |
| Cleanup interval | 1 hour | Expired records are removed periodically |

When the queue is full, the agent applies a drop policy:

| Policy | Behaviour |
|---|---|
| oldest (default) | The oldest messages are evicted to make room for new ones. You always have the latest data even during a long outage. |
| newest | New messages are rejected until the queue drains. Older data is preserved intact. |
| error | An error is thrown; callers must handle the full queue condition. |

Critical messages (alerts and events) are exempt from ordinary pressure handling. When a critical message arrives and the queue is full under the oldest policy, the oldest non-critical records are evicted first to make room. This keeps alarm data from being displaced by routine telemetry when the queue is full.
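
The three drop policies and the critical-message exemption can be sketched over a plain list. This is an illustration under stated assumptions: enqueue_with_policy and QueueFull are hypothetical names, the sketch always prefers evicting the oldest non-critical record under the oldest policy, and the behaviour when every queued record is critical (reject a non-critical newcomer, evict the oldest record for a critical one) is a guess rather than documented behaviour.

```python
class QueueFull(Exception):
    """Raised under the 'error' policy when the queue is at capacity."""


def enqueue_with_policy(queue: list, record: dict, max_records: int,
                        policy: str = "oldest") -> bool:
    """Append record, applying the drop policy if full. Return True if stored."""
    if len(queue) < max_records:
        queue.append(record)
        return True
    if policy == "newest":
        return False  # reject the new message, keep older data intact
    if policy == "error":
        raise QueueFull("offline queue is full")
    # policy == "oldest": evict the oldest non-critical record first.
    for index, existing in enumerate(queue):
        if not existing.get("critical"):
            del queue[index]
            queue.append(record)
            return True
    # Assumption: the queue holds only critical records at this point.
    if record.get("critical"):
        del queue[0]
        queue.append(record)
        return True
    return False


pending = [{"id": 1, "critical": True}, {"id": 2}, {"id": 3}]
stored = enqueue_with_policy(pending, {"id": 4, "critical": True}, max_records=3)
```

In the example, record 1 is the oldest but is critical, so record 2 is evicted instead.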

Message Deduplication

Each message buffered offline retains its original message ID. When the buffer drains, the original ID is restored in the replay payload. The cloud can use this to detect and discard any duplicates that might arrive if a partial flush was interrupted — for example, if the device reboots mid-drain.

Inspecting the Buffer

You can check the current state of the offline queue at any time:

GET /v1/buffer/status

The response includes the current record count, total bytes, oldest record timestamp, and the number of messages dropped since startup. Use this endpoint to monitor outage depth and drain progress.


Multi-Destination Buffering

Each configured destination gets its own isolated offline buffer. If your Iotistica MQTT destination is offline but your local InfluxDB destination is healthy, messages continue to reach InfluxDB in real time while only the Iotistica copies queue up. The two destinations do not share a buffer or interfere with each other.

A single endpoint can feed multiple destinations simultaneously through subscriptions. Each subscription path drains independently when its destination comes back online.


Payload Compression

On constrained cellular or satellite links, payload size matters. Each subscription can specify an independent compression strategy:

| Strategy | When to use |
|---|---|
| json (default) | Full JSON. Readable in any tool. No overhead. |
| msgpack | Binary serialisation, 15–40% smaller than JSON with minimal CPU. Good for mid-range links. |
| json+deflate | JSON then compressed with zlib deflate; best ratio for large payloads. Only fires when payload > 4 KB and CPU < 70%. |
| msgpack+deflate | Binary then compressed; smallest payloads, highest CPU. Use only when bandwidth is the binding constraint. |

The agent also supports dictionary compression — a shared key-shortening map built from frequently observed field names. This is most effective on endpoints that send the same set of fields in every message (e.g., a Modbus device with fixed register layout).

Adaptive Compression

Deflate compression is skipped automatically when:

  • The payload is smaller than 4 KB (compression overhead would exceed savings).
  • CPU load is above 70% (protecting latency-sensitive workloads on embedded devices).

Every 1,000th publish is sent as raw JSON regardless of the configured strategy. This baseline measurement lets the agent track actual bandwidth savings so you can verify the compression setting is working as intended.
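
The adaptive rules can be sketched for the json and json+deflate strategies using only the standard library. The function encode_payload is hypothetical, the CPU figure is passed in rather than measured, publish_count is assumed to be a 1-based counter, and msgpack is left out because it needs a third-party package.

```python
import json
import zlib

DEFLATE_MIN_BYTES = 4 * 1024   # payloads at or below 4 KB are not deflated
CPU_LIMIT_PERCENT = 70.0       # deflate is skipped at or above this CPU load
BASELINE_EVERY = 1000          # every 1,000th publish goes out as raw JSON


def encode_payload(payload: dict, strategy: str, cpu_percent: float,
                   publish_count: int):
    """Return (encoding_used, body_bytes) for one publish."""
    raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    if publish_count % BASELINE_EVERY == 0:
        return "json", raw  # baseline measurement publish
    if (strategy == "json+deflate"
            and len(raw) > DEFLATE_MIN_BYTES
            and cpu_percent < CPU_LIMIT_PERCENT):
        return "json+deflate", zlib.compress(raw)
    return "json", raw


large = {"readings": [{"metric": f"m{i}", "value": i} for i in range(500)]}
encoding, body = encode_payload(large, "json+deflate", cpu_percent=20.0,
                                publish_count=1)
```

Returning the encoding that was actually used lets the caller label the message so the receiver knows whether to inflate it.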


Endpoint Heartbeats

Each endpoint can be configured with a heartbeat — a periodic health signal published to a separate MQTT topic. The heartbeat fires at a fixed interval (default: 300 seconds) and includes the endpoint's current connection state and cumulative statistics.

Heartbeats are published only when the endpoint is connected and MQTT is available. They are automatically silenced during outages and resume as soon as both conditions are met.

Use heartbeats to detect silent failures: an endpoint that has stopped producing data is different from an endpoint that has stopped connecting. Because heartbeats are published only while the endpoint is connected, a missing heartbeat points to the latter, even when no real data is flowing.

To configure it, set the Heartbeat topic field on the endpoint. Leave it blank to disable heartbeats for that endpoint.
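
On the monitoring side, a missed heartbeat can be detected by comparing the last heartbeat timestamp with the configured interval. The sketch below is one way to do that; heartbeat_overdue is a hypothetical helper, and the grace factor of two intervals is an arbitrary choice rather than a documented setting.

```python
from datetime import datetime, timedelta, timezone


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp with a trailing Z into an aware datetime."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def heartbeat_overdue(last_heartbeat: str, now: datetime,
                      interval_s: int = 300, grace: float = 2.0) -> bool:
    """Return True when no heartbeat arrived within grace * interval."""
    age = now - parse_timestamp(last_heartbeat)
    return age > timedelta(seconds=interval_s * grace)


last_seen = "2024-01-15T10:30:00.123Z"
overdue = heartbeat_overdue(
    last_seen, now=datetime(2024, 1, 15, 10, 45, tzinfo=timezone.utc))
on_time = heartbeat_overdue(
    last_seen, now=datetime(2024, 1, 15, 10, 35, tzinfo=timezone.utc))
```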


Schema Drift Detection

For endpoints where data consistency is critical, the agent continuously monitors the schema of arriving payloads and alerts when it changes unexpectedly.

What It Detects

| Drift type | Example | Severity |
|---|---|---|
| Missing field | A Modbus device stopped sending temperature | Warning → Critical |
| Field renamed | temp became temperature | Warning |
| Type change | voltage changed from number to string | Warning |

How It Works

The drift detector observes the first 20 batches as a warmup period to build the expected schema baseline — which fields are present, how often (presence ratio), and what types they carry. After warmup, every incoming batch is compared to the baseline.

A field must appear in at least 50% of batches before it is considered "expected". A field that disappears from 10 consecutive batches triggers an alert. Alerts for the same field are suppressed for 30 minutes after the first fire to prevent alert storms.

The schema adapts over time:

  • A field seen in 60% of the last 50 batches is promoted to expected.
  • A field missing from the last 250 consecutive batches is retired from the baseline.

Drift events are stored in SQLite and can be reviewed in the admin UI.
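
The warmup, missing-field, type-change, and suppression rules can be sketched as follows. This is a simplified model rather than the agent's detector: DriftDetector and json_type are hypothetical names, each batch is represented as a flat field-to-value dictionary, and rename detection and the adaptive promotion and retirement rules are left out.

```python
from collections import Counter


def json_type(value) -> str:
    """Name a value by its JSON type so that int and float both count as number."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    return "other"


class DriftDetector:
    WARMUP_BATCHES = 20
    EXPECTED_RATIO = 0.5        # presence ratio needed to become "expected"
    MISSING_STREAK = 10         # consecutive misses before alerting
    SUPPRESS_SECONDS = 30 * 60  # per-field alert suppression window

    def __init__(self) -> None:
        self._batches = 0
        self._presence = Counter()
        self._types = {}
        self._expected = None   # set once warmup completes
        self._missing = Counter()
        self._last_alert = {}

    def observe(self, batch: dict, now: float) -> list:
        """Feed one batch; return a list of (field, reason) alerts."""
        if self._expected is None:
            self._batches += 1
            for name, value in batch.items():
                self._presence[name] += 1
                self._types[name] = json_type(value)
            if self._batches >= self.WARMUP_BATCHES:
                self._expected = {
                    name for name, count in self._presence.items()
                    if count / self._batches >= self.EXPECTED_RATIO}
            return []
        candidates = []
        for name in sorted(self._expected):
            if name in batch:
                self._missing[name] = 0
                actual = json_type(batch[name])
                if actual != self._types[name]:
                    candidates.append(
                        (name, f"type changed from {self._types[name]} to {actual}"))
            else:
                self._missing[name] += 1
                if self._missing[name] >= self.MISSING_STREAK:
                    candidates.append((name, "missing"))
        return [alert for alert in candidates if self._allow(alert[0], now)]

    def _allow(self, name: str, now: float) -> bool:
        """Suppress repeat alerts for the same field inside the window."""
        last = self._last_alert.get(name)
        if last is not None and now - last < self.SUPPRESS_SECONDS:
            return False
        self._last_alert[name] = now
        return True


detector = DriftDetector()
warmup_alerts = [detector.observe({"voltage": 230.0, "temperature": 21.5}, now=i)
                 for i in range(20)]
```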


Publish Modes

The entire publish pipeline operates in one of three modes at any given time:

| Mode | Meaning |
|---|---|
| direct | Connected and healthy. Messages are published over the network immediately. |
| buffer-only | Disconnected. All publishes are written to the SQLite offline queue. |
| recovering | Just reconnected. New messages are sent directly while the offline queue drains in the background. |

The mode switches automatically based on connection health. Transitions are logged and visible in the dashboard. You never need to manually switch modes or restart the agent after a network outage.
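
The mode selection can be pictured as a function of connection state and queue depth. This sketch assumes that recovering ends as soon as the offline queue is empty, which is an inference from the table above rather than a documented rule; next_mode is a hypothetical name.

```python
def next_mode(connected: bool, queue_depth: int) -> str:
    """Pick the publish mode from connection health and offline queue depth."""
    if not connected:
        return "buffer-only"   # all publishes go to the offline queue
    if queue_depth > 0:
        return "recovering"    # publish directly while the backlog drains
    return "direct"


modes = [next_mode(False, 0), next_mode(True, 250), next_mode(True, 0)]
```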


Per-Endpoint vs Per-Destination Buffering

The agent has two separate buffering layers:

| Layer | What it buffers | Purpose |
|---|---|---|
| Device publish buffer | Raw messages from protocol endpoints | Handles the device-to-agent edge (slow/lossy device connections) |
| Destination offline buffer | MQTT payloads bound for cloud destinations | Handles agent-to-cloud outages |

Most users only interact with the destination offline buffer. The device publish buffer is relevant for endpoints that disconnect frequently (e.g., a serial device that resets periodically) — the in-memory batch for that endpoint is flushed before the connection closes so no in-flight readings are lost.


Configuration Reference

Endpoint-level settings (admin UI → Endpoints)

| Field | Default | Description |
|---|---|---|
| Poll interval (s) | 10 | How often the agent reads from the device |
| Buffer size | 0 | Number of messages to accumulate before publishing |
| Buffer time (ms) | 0 | Maximum age of a batch before it is published |
| Buffer capacity (bytes) | 1 MB | Maximum raw byte buffer for frame reassembly |
| Heartbeat topic | | MQTT topic for periodic health signal. Leave blank to disable. |
| Heartbeat interval (s) | 300 | How often to publish the heartbeat |

Subscription-level settings (admin UI → Subscriptions)

| Field | Default | Description |
|---|---|---|
| Compression | json | Payload encoding: json, msgpack, json+deflate, msgpack+deflate |

Offline buffer settings (code defaults)

These are currently set at deployment time and are not exposed in the admin UI:

| Parameter | Default | Description |
|---|---|---|
| flushBatchSize | 100 records | Records processed per flush batch |
| flushIntervalMs | 30,000 ms | How often the periodic flush runs |
| maxRetries | 3 | Retries before a message is dropped |
| maxBufferRecords | 10,000 | Hard queue size limit |
| dropPolicy | oldest | What to drop when the queue is full |
| flushTriggerThreshold | 1,000 | Queue depth that triggers an immediate flush |
| maxFlushPerCycle | 1,000 | Records drained in a single flush run |
| cleanupIntervalMs | 3,600,000 ms (1 h) | How often expired records are removed |


Operating Fully Offline

The agent is designed to run without any cloud connection indefinitely. When no destination is reachable:

  • Protocol endpoints continue polling and collecting data normally.
  • Anomaly detection continues processing every batch locally.
  • Heartbeats stop (MQTT is unavailable) but the detection intervals continue.
  • All data accumulates in the SQLite offline queue, up to the configured capacity.
  • When the queue fills, the oldest records are evicted so the device always holds the most recent data.
  • The moment any destination reconnects, the queue drains automatically with no manual intervention.

There is no restart required. The agent handles reconnection at the transport layer and the queued data flows as soon as the link is established.


  • Endpoints — connecting to industrial devices (polling, addressing)
  • Destinations — configuring cloud publish targets
  • Subscriptions — routing endpoint data to destinations
  • Alerts — anomaly detection and alerting
  • Cloud Sync — agent-to-cloud state sync (separate from sensor data publishing)