pulsar-iotbridge
High-performance MQTT → Apache Pulsar IoT bridge, written in Go.
It subscribes to all MQTT topics on a broker and mirrors every incoming message to a Pulsar topic, with:
- stable, battle-tested client libraries
- a fixed-size worker pool for publishing
- a safe, panic-free payload pooling implementation to reduce GC pressure
- built-in Prometheus metrics on `/metrics`
1. What this bridge does
At a high level:
- Connects to an MQTT broker and subscribes to `#` (all topics).
- For every incoming MQTT message:
  - Copies the payload into a reusable buffer (if ≤ 4 KB) or into a fresh slice.
  - Pushes a small `InMessage` struct onto an internal buffered channel.
- A configurable pool of worker goroutines:
  - Reads from that channel.
  - Maps the MQTT topic to a Pulsar topic.
  - Publishes to Pulsar using a per-topic producer (created lazily and cached).
- Exposes internal metrics for Prometheus.
If the queue is full, messages are intentionally dropped and counted.
2. Topic mapping
MQTT topics map 1:1 to Pulsar topics:
persistent://<tenant>/<namespace>/<mqtt-topic>
Example:
- MQTT: `dt/device123/temperature`
- Pulsar: `persistent://public/mqtt_ingest/dt/device123/temperature`
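The mapping rule above is a plain string concatenation. A minimal sketch follows; the function name `pulsarTopic` is hypothetical and not taken from the bridge's source, and the sketch does not handle edge cases such as MQTT topics with a leading slash.

```go
package main

import "fmt"

// pulsarTopic maps an MQTT topic to a persistent Pulsar topic name using the
// persistent://<tenant>/<namespace>/<mqtt-topic> scheme described above.
// The function name is an assumption made for this illustration.
func pulsarTopic(tenant, namespace, mqttTopic string) string {
	return "persistent://" + tenant + "/" + namespace + "/" + mqttTopic
}

func main() {
	// Prints: persistent://public/mqtt_ingest/dt/device123/temperature
	fmt.Println(pulsarTopic("public", "mqtt_ingest", "dt/device123/temperature"))
}
```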
Regex consumers can subscribe to entire trees:
./pulsar-cli consumer \
--regex \
-t "persistent://public/mqtt_ingest/dt/.*/.*" \
-s test
3. Features at a glance
Streaming
- Subscribes to MQTT `#`
- QoS 0 or 1 supported
- 1:1 MQTT → Pulsar topic mirroring
Resource-safe design
- 100k-message bounded internal queue
- Worker-pool based Pulsar publishing
- Overload = drop messages, don’t crash
Efficient Pulsar producers
- Cached producers per topic
- Batching (up to 1000 msgs, ~5ms delay)
- LZ4 compression
- Large pending buffer (100k)
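Caching one producer per topic can be sketched as a mutex-guarded map that creates entries on first use. The `Producer` interface, `fakeProducer`, and `ProducerCache` below are hypothetical stand-ins, not the bridge's actual types; a real `create` function would call into whichever Pulsar client the bridge uses and apply the batching and compression settings listed above.

```go
package main

import (
	"fmt"
	"sync"
)

// Producer is a stand-in for a Pulsar producer (hypothetical interface).
type Producer interface {
	Topic() string
}

// fakeProducer lets the sketch run without a Pulsar cluster.
type fakeProducer struct{ topic string }

func (p *fakeProducer) Topic() string { return p.topic }

// ProducerCache creates producers lazily and reuses them per topic.
type ProducerCache struct {
	mu        sync.Mutex
	producers map[string]Producer
	create    func(topic string) (Producer, error)
}

func NewProducerCache(create func(string) (Producer, error)) *ProducerCache {
	return &ProducerCache{producers: make(map[string]Producer), create: create}
}

// Get returns the cached producer for topic, creating it on first use.
func (c *ProducerCache) Get(topic string) (Producer, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if p, ok := c.producers[topic]; ok {
		return p, nil
	}
	p, err := c.create(topic)
	if err != nil {
		return nil, err
	}
	c.producers[topic] = p
	return p, nil
}

func main() {
	created := 0
	cache := NewProducerCache(func(topic string) (Producer, error) {
		created++
		return &fakeProducer{topic: topic}, nil
	})
	for i := 0; i < 3; i++ {
		if _, err := cache.Get("persistent://public/mqtt_ingest/dt/a/temp"); err != nil {
			panic(err)
		}
	}
	fmt.Println("producers created:", created) // prints "producers created: 1"
}
```

Holding the lock while creating a producer keeps the sketch simple, at the cost of serializing first-time lookups across topics.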
Memory pooling
- Messages ≤ 4 KiB use pooled buffers
- Larger messages allocate new slices
- Strict, panic-free pooling logic
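One way to implement this kind of size-gated pooling is with `sync.Pool` from the standard library. The sketch below is an illustration under assumptions: the names `maxPooledSize`, `copyPayload`, and `release` are hypothetical, the 4096-byte threshold is inferred from the "≤ 4 KiB" rule, and the bridge's actual pooling code may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// maxPooledSize is the assumed pooling threshold (4 KiB).
const maxPooledSize = 4096

// bufPool hands out fixed-capacity byte slices. Pointers to slices are
// stored so that Put does not allocate.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, maxPooledSize)
		return &b
	},
}

// copyPayload copies src into a pooled buffer when it fits, otherwise into a
// freshly allocated slice. The returned handle is non-nil only for pooled
// buffers and should be passed to release once the data is no longer needed.
func copyPayload(src []byte) (data []byte, pooled *[]byte) {
	if len(src) > maxPooledSize {
		return append([]byte(nil), src...), nil
	}
	bp := bufPool.Get().(*[]byte)
	n := copy(*bp, src)
	return (*bp)[:n], bp
}

// release returns a pooled buffer to the pool. A nil handle is a no-op, so
// callers can release unconditionally without risking a panic.
func release(pooled *[]byte) {
	if pooled != nil {
		bufPool.Put(pooled)
	}
}

func main() {
	small, h := copyPayload([]byte("21.5"))
	fmt.Println(string(small), h != nil) // prints "21.5 true"
	release(h)

	big, h2 := copyPayload(make([]byte, 8192))
	fmt.Println(len(big), h2 != nil) // prints "8192 false"
	release(h2)
}
```

The data slice must not be used after `release`, because the pool may hand the same backing array to another message.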
Metrics
Exposes per-topic:
- MQTT messages received
- Pulsar messages sent
- In-flight messages
- Pulsar send errors
- Dropped messages
Graceful shutdown
- Closes MQTT
- Closes Pulsar client and all producers
- Drains internal channel
4. Build and Run
Prerequisites
- Go 1.22+
- MQTT broker
- Pulsar cluster or standalone
Build
make build
Quick start
./pulsar-iotbridge run \
--mqtt-url tcp://localhost:1883 \
--mqtt-client-id pulsar-iotbridge \
--mqtt-qos 0 \
--pulsar-url pulsar://localhost:6650 \
--pulsar-tenant public \
--pulsar-namespace mqtt_ingest \
--workers 8 \
--metrics-addr :9090 \
--log-level info
5. Configuration Reference
Global flags
| Flag | Default | Description |
|---|---|---|
| `--log-level` | `info` | Log level |
MQTT flags
| Flag | Default | Description |
|---|---|---|
| `--mqtt-url` | `tcp://localhost:1883` | Broker address |
| `--mqtt-client-id` | `pulsar-iotbridge` | Client ID |
| `--mqtt-qos` | `0` | QoS 0 or 1 |
| `--mqtt-username` | `""` | Optional |
| `--mqtt-password` | `""` | Optional |
Pulsar flags
| Flag | Default | Description |
|---|---|---|
| `--pulsar-url` | `pulsar://localhost:6650` | Pulsar service URL |
| `--pulsar-token` | `""` | Auth token |
| `--pulsar-tenant` | `public` | Tenant |
| `--pulsar-namespace` | `mqtt_ingest` | Namespace |
Runtime flags
| Flag | Default | Description |
|---|---|---|
| `--workers` | `8` | Worker goroutines |
| `--metrics-addr` | `:9090` | Metrics endpoint |
6. Backpressure and Dropping
The internal channel:
inCh := make(chan *InMessage, 100_000)
Incoming MQTT messages:
- Are copied into pooled/static buffers
- Attempt to enqueue via non-blocking `select`
- On full queue → drop + metric increment
Worker goroutines:
- Pull from channel
- Resolve/create Pulsar producer
- Send via `SendAsync`
- Track metrics
- Return buffers to pools
Throughput stays high, memory stays bounded.
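A fixed-size worker pool that drains the channel might look like the sketch below. It is a simplification under assumptions: `runWorkers` and the synchronous `publish` callback are hypothetical, whereas the bridge sends asynchronously via `SendAsync` and records per-topic metrics.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// InMessage mirrors the struct named in this README; its fields are assumed.
type InMessage struct {
	Topic   string
	Payload []byte
}

// runWorkers starts n goroutines that read from inCh and call publish for
// each message. It returns once inCh has been closed and fully drained.
func runWorkers(n int, inCh <-chan *InMessage, publish func(*InMessage) error) (sent, failed int64) {
	var wg sync.WaitGroup
	var ok, bad atomic.Int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range inCh {
				if err := publish(m); err != nil {
					bad.Add(1)
					continue
				}
				ok.Add(1)
			}
		}()
	}
	wg.Wait()
	return ok.Load(), bad.Load()
}

func main() {
	inCh := make(chan *InMessage, 16)
	for i := 0; i < 10; i++ {
		inCh <- &InMessage{Topic: "dt/device123/temperature", Payload: []byte("21.5")}
	}
	close(inCh) // closing the channel lets the workers finish after draining it

	sent, failed := runWorkers(8, inCh, func(*InMessage) error { return nil })
	fmt.Println("sent:", sent, "failed:", failed) // prints "sent: 10 failed: 0"
}
```

Because the number of workers and the queue capacity are both fixed, memory use stays bounded regardless of the incoming message rate.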
7. Metrics
Metrics available on /metrics (Prometheus):
- `mqtt_messages_total{topic=...}`
- `pulsar_messages_total{topic=...}`
- `pulsar_errors_total{topic=...}`
- `inflight_messages{topic=...}`
- `dropped_messages_total{topic=...}`
Prometheus example:
scrape_configs:
  - job_name: 'pulsar-iotbridge'
    static_configs:
      - targets: ['localhost:9090']
8. Operational tips
- Start MQTT and Pulsar before the bridge
- Increase `--workers` if Pulsar throughput is the bottleneck
- Watch the in-flight and dropped-message metrics (`inflight_messages`, `dropped_messages_total`)
- SIGINT/SIGTERM triggers full graceful shutdown
9. License
MIT License © 2025 Matthias Petermann