Field Notes · IoT → Big Data

From a $10 chip
to a live dashboard.

Notes on building a real-time IoT pipeline with the same patterns you ship at work — only smaller, cheaper, and visible end-to-end.
esp32 · pico w · dht22
mqtt · emqx · mosquitto
timescaledb · fastapi · websocket
§ 01 Overview what · why · what you leave with

What's this all about?

A weekend lab. A ~$10 sensor node streaming temperature and humidity to a browser in real time, with the same architectural shape you'd see on a Kafka-Spark stack at work.

The build

  • A $10 sensor node publishing live temp/humidity over MQTT.
  • Same patterns as production data stacks — broker, time-series store, push-to-UI — only miniature.
  • End-to-end visible: sensor → wire → DB → screen.

What you leave with

  • A mental model for IoT ingest: MQTT and the broker pattern.
  • One trick: Postgres LISTEN/NOTIFY as a pub/sub bus with zero extra infrastructure.
  • The seams — when to swap parts for Kafka or Debezium.
§ 02 Speaker credentials · context

Quick intro — who's talking.

Background

  • Python & data engineering day-to-day.
  • Co-lead of a Big Data community in Madrid.
  • Hobbyist: microcontrollers, self-hosting, home IoT.
  • This deck's repo started as a weekend lab on a Raspberry Pi.

Why this demo?

  • The cheapest way to feel a real streaming pipeline end-to-end.
  • Touches every layer: edge → broker → store → API → UI.
  • Runs on one Pi. No cloud bill. Reproducible in an afternoon.
  • And most production stacks hide these layers. Here, nothing hides.
§ 03 Thesis why a data engineer should care

Same patterns. Different physics.

Everything we'll show is something you already know from data engineering. It's just glued together at a different scale.
5 parts
moving pieces · edge → ui
~150 LOC
python · the whole stack
0 polls
no setInterval anywhere
§ 04 Architecture topology · five hops

The whole pipeline, on one page.

┌─ edge ─────────┐     ┌─ bus ──┐     ┌─ store ────────┐     ┌─ serve ────────┐     ┌─ ui ───────────┐
│ Pico W + DHT22 │ ──▶ │ EMQX   │ ──▶ │ TimescaleDB    │ ──▶ │ FastAPI / WS   │ ──▶ │ Browser / JS   │
│ MicroPython    │     │ pub/   │     │ hypertable     │     │ asyncpg        │     │ Chart.js       │
│ publish 5 s    │     │ sub    │     │ pg_notify trg  │     │ LISTEN tail    │     │ ws.onmessage   │
└────────────────┘     └────────┘     └────────────────┘     └────────────────┘     └────────────────┘

─ producer ─                          ─ source-of-truth ─    ─ consumer ─
fire-and-forget                       durable + replayable   live tail
MQTT QoS 0                            hypertable chunks      push, not poll

Producer and consumer fully decoupled. The database is both the source of truth and the fan-out point — the same pattern you use in Debezium-style CDC, just with Postgres's built-in NOTIFY instead of a Kafka topic.
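To make "source of truth + fan-out point" concrete, here is a toy, in-memory model (all names hypothetical, not from the repo): a list stands in for the readings table, and callbacks registered with `listen()` stand in for LISTEN sessions. A consumer that attaches late first replays recent history, then tails new rows, which is the replay + tail shape the FastAPI endpoint uses.

```python
class ToyStore:
    """Hypothetical stand-in for the readings table plus an AFTER INSERT notify."""

    def __init__(self):
        self.rows = []        # source of truth: append-only
        self.listeners = []   # fan-out targets, like LISTEN sessions

    def insert(self, topic, value):
        row = {"seq": len(self.rows), "topic": topic, "value": value}
        self.rows.append(row)              # 1. write to the table
        for notify in list(self.listeners):
            notify(row)                    # 2. fan out after the write
        return row

    def listen(self, callback):
        self.listeners.append(callback)

    def last(self, n):
        return self.rows[-n:] if n > 0 else []


def attach_consumer(store, history=20):
    """Replay the last `history` rows, then receive every new row."""
    received = list(store.last(history))   # replay
    store.listen(received.append)          # tail
    return received


store = ToyStore()
store.insert("pico/temperature/dht22", 22.4)   # the producer knows nothing about consumers
store.insert("pico/humidity/dht22", 39.7)
late = attach_consumer(store, history=1)       # sees only the latest row...
store.insert("pico/temperature/dht22", 22.5)   # ...then everything new
print([r["seq"] for r in late])                # → [1, 2]
```

In the real stack the write is durable and the notification is fire-and-forget; the toy only captures the ordering (write first, then fan out) and the decoupling.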

§ 05 Stack six components · one host

Six pieces. Nothing exotic.

01
Edge
Pico W + DHT22

RP2040 dual-core, MicroPython firmware. Publishes temp & humidity every 5 s.

02
Transport
EMQX broker

Pub/sub on TCP. Topic tree pico/#. QoS 0 for telemetry — fire & forget.

03
Ingest
mqtt_to_db.py

One Python process: paho subscriber → psycopg2 INSERT. The whole "connector" tier.

04
Store
TimescaleDB

Postgres 16 with the TimescaleDB extension. Hypertables partition by time automatically.

05
Serve
FastAPI + asyncpg

WebSocket endpoint, listens to Postgres NOTIFY. Pushes JSON to the browser.

06
UI
Vanilla JS

Three cards + Chart.js line chart. No React, no Vue, no build pipeline.

§ 06 Edge hardware · sensor · wiring

The edge — Raspberry Pi Pico W.

Specs

  • RP2040 · dual-core ARM Cortex-M0+ @ 133 MHz
  • 264 KB SRAM · 2 MB flash
  • WiFi 802.11n via Infineon CYW43439
  • ~$6 USD. Runs MicroPython out of the box.
  • Idle ~20 mA: a 10,000 mAh power bank lasts ≈ 500 h, about three weeks.

Sensor — DHT22

  • Single-wire digital · ±0.5 °C, ±2 % RH.
  • Max one reading every 2 s.

Wiring

DHT22          Pico W
─────          ──────
VCC    ──▶     3.3 V
DATA   ──▶     GPIO 15 (PIO driver)
GND    ──▶     GND

Three signals on the bus

  • pico/temperature/dht22 — air temp °C
  • pico/humidity/dht22 — RH %
  • pico/temperature/internal — chip temp (ADC4)
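The chip-temperature signal comes from the RP2040's on-die sensor on ADC channel 4. A minimal conversion sketch, using the typical constants from the RP2040 datasheet (0.706 V at 27 °C, falling 1.721 mV per °C) and assuming the ADC reference is the 3.3 V rail. The reading is sensitive to the exact reference voltage, so treat the result as approximate. The function is plain Python, so it also runs under MicroPython; `adc4_to_celsius` is a hypothetical name.

```python
VREF = 3.3           # assumed ADC reference voltage (the Pico's 3.3 V rail)
FULL_SCALE = 65535   # read_u16() scales samples to 0..65535


def adc4_to_celsius(raw_u16, vref=VREF):
    """Convert a raw read_u16() sample from ADC channel 4 to degrees Celsius."""
    voltage = raw_u16 * vref / FULL_SCALE
    # RP2040 datasheet: 0.706 V at 27 °C, slope -1.721 mV per °C
    return 27 - (voltage - 0.706) / 0.001721


# on the Pico: adc4_to_celsius(machine.ADC(4).read_u16())
```

Note the inverse slope: a higher ADC reading means a cooler chip.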
§ 07 Edge / firmware micropython · self-healing

The publisher loop, in full.

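# MicroPython on the Pico W: publishes every 5 s (WiFi join via network.WLAN not shown)
import machine
import ubinascii
import utime
from machine import Pin
from umqtt.robust import MQTTClient
from DHT22 import DHT22          # PIO-based driver file copied to flash

MQTT_BROKER = "192.168.1.11"
CLIENT_ID   = ubinascii.hexlify(machine.unique_id())
# constructor arguments depend on the DHT22 driver you flashed; DATA is on GPIO 15
dht_sensor  = DHT22(Pin(15, Pin.IN, Pin.PULL_UP))

try:
    mqtt = MQTTClient(CLIENT_ID, MQTT_BROKER, keepalive=60)
    mqtt.connect()
    while True:
        t, h = dht_sensor.read()
        mqtt.publish(b"pico/temperature/dht22", f"{t:.1f}".encode())
        mqtt.publish(b"pico/humidity/dht22",    f"{h:.1f}".encode())
        utime.sleep(5)
except OSError:
    machine.reset()              # hard reset: reboot, rejoin WiFi, reconnect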

What's interesting

  • umqtt.robust auto-reconnects on broker drop.
  • OS error → machine.reset(). Self-healing edge node.
  • Payload = the float as ASCII text ("22.4" is 4 bytes). Tiny, and readable in any MQTT client.
  • No TLS on LAN. For internet, swap in umqtt.simple2 with cert on flash.
The MQTT topic is your schema: device · metric · sensor, all in one string.
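Because the topic carries the schema, the consumer side can recover structured fields from it. A minimal sketch, assuming the three-level device/metric/sensor convention used by the three signals above; `TopicKey` and `parse_topic` are hypothetical names, not from the repo.

```python
from typing import NamedTuple


class TopicKey(NamedTuple):
    device: str
    metric: str
    sensor: str


def parse_topic(topic: str) -> TopicKey:
    """Split 'device/metric/sensor' into named fields; reject anything else."""
    parts = topic.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected device/metric/sensor, got {topic!r}")
    return TopicKey(*parts)


print(parse_topic("pico/temperature/internal"))
# → TopicKey(device='pico', metric='temperature', sensor='internal')
```

Renaming a level later means touching every parser like this one, which is why topic naming deserves the same care as a table schema.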
§ 08 Bus mqtt · kafka · scale physics

What does a Kafka producer look like on 264 KB of RAM?

Apache Kafka

  • Min RAM ≈ 512 MB
  • Binary protocol over long-lived TCP connections
  • Built-in replay + log retention
  • Best for: scale, durability, the ecosystem

MQTT (EMQX / Mosquitto)

  • Min RAM ≈ 64 KB on the client
  • 2-byte header pub/sub — built for flaky links
  • QoS 0/1/2 · retained messages · last will
  • Best for: constrained devices, low latency, fan-out
Neither is better. The right protocol lives at the right layer of your pipeline: protocol choice = constraints choice.
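To see where the "2-byte header" comes from, here is a sketch that hand-encodes an MQTT 3.1.1 PUBLISH packet at QoS 0: one byte for packet type and flags, a variable-length "remaining length", a length-prefixed topic, then the raw payload (no packet identifier at QoS 0). `publish_qos0` and `encode_remaining_length` are illustrative helpers, not a client library API.

```python
def encode_remaining_length(n: int) -> bytes:
    """MQTT variable-length integer: 7 bits per byte, high bit = 'more follows'."""
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80
        out.append(byte)
        if n == 0:
            return bytes(out)


def publish_qos0(topic: str, payload: bytes) -> bytes:
    """Build a PUBLISH packet with QoS 0, DUP 0, RETAIN 0."""
    t = topic.encode("utf-8")
    body = len(t).to_bytes(2, "big") + t + payload   # topic name, then payload
    return bytes([0x30]) + encode_remaining_length(len(body)) + body  # 0x30 = PUBLISH


pkt = publish_qos0("pico/temperature/dht22", b"22.4")
print(len(pkt), pkt[:2].hex())   # → 30 301c
```

The whole message is 30 bytes and the topic string is most of it, so short topic names also save bandwidth on constrained links.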
§ 09 Bus / broker emqx · topic tree · ops

EMQX — more than a hobby broker.

Why this broker

  • Built on Erlang/OTP. Millions of concurrent clients per node.
  • Clusters horizontally. ACL + auth plugins built in.
  • Dashboard at :18083 (admin/public default — change it).
  • Rule engine forwards MQTT → Kafka, Postgres, HTTP — without code.

When MQTT bites

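  • QoS 0 = no replay. For at-least-once, use QoS 1 plus a persistent session (clean session off); retained messages only keep the last value per topic.
  • Subscribers filter with wildcards (pico/#, pico/+/dht22), so topic names are the schema. Choose them carefully.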

Topic tree

pico/
├── temperature/
│   ├── dht22     → air temp °C
│   └── internal  → chip temp °C
└── humidity/
    └── dht22     → RH %

Topic naming is exactly like naming a Kafka topic or a partitioning column. Get it wrong in firmware and you'll be fixing it in production.
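Wildcard filters are why topic names behave like a schema: a subscriber to pico/+/dht22 silently depends on the middle level meaning "metric". A minimal sketch of the MQTT matching rules (`+` matches exactly one level, `#` matches the rest, including the parent level itself); `topic_matches` is a hypothetical helper, and it skips the spec's special case for topics starting with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` is delivered to a subscription on `topic_filter`."""
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True                 # multi-level: this level and everything below
        if i >= len(t_parts):
            return False                # filter is deeper than the topic
        if f != "+" and f != t_parts[i]:
            return False                # literal level mismatch
    return len(f_parts) == len(t_parts)


print(topic_matches("pico/+/dht22", "pico/humidity/dht22"))        # → True
print(topic_matches("pico/+/dht22", "pico/temperature/internal"))  # → False
```

Moving `dht22` to a different level in firmware would quietly break every `+` subscription downstream, with no error anywhere.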

§ 10 Ops docker compose · single host

The whole stack — two containers.

EMQX broker

# docker-compose.yml
services:
  emqx:
    image: emqx/emqx
    container_name: emqx
    ports:
      - "1883:1883"     # MQTT plain
      - "8883:8883"     # MQTT over TLS
      - "8083:8083"     # MQTT / WebSocket
      - "18083:18083"   # admin dashboard
    restart: always

TimescaleDB

docker run -d --name timescaledb \
  -e POSTGRES_USER=pico \
  -e POSTGRES_PASSWORD=pico \
  -e POSTGRES_DB=sensors \
  -p 5432:5432 \
  timescale/timescaledb:latest-pg16

  • Two containers, plus two host Python processes: mqtt_to_db.py and the FastAPI app (under systemd).
  • Whole stack runs on one Raspberry Pi.
  • ~150 MB RAM total at idle.
§ 11 Ingest mqtt_to_db.py · the connector tier

The bridge — 10 lines of Python.

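# mqtt_to_db.py: paho subscriber → psycopg2 INSERT (paho-mqtt >= 2.0 API)
import time

import paho.mqtt.client as mqtt
import psycopg2

DB_DSN = "dbname=sensors user=pico password=pico host=localhost"

def on_connect(client, userdata, flags, reason_code, properties):
    client.subscribe("pico/#")       # re-subscribe on every (re)connect

def on_message(client, userdata, msg):
    value = float(msg.payload.decode())
    with userdata["conn"].cursor() as cur:
        cur.execute(
            "INSERT INTO readings (topic, value) VALUES (%s, %s)",
            (msg.topic, value),
        )

def main():
    conn = psycopg2.connect(DB_DSN)
    conn.autocommit = True           # each INSERT commits immediately
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, userdata={"conn": conn})
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("localhost", 1883, keepalive=60)   # EMQX on the same host
    client.loop_forever()            # blocks; unhandled callback errors end up below

# resilience without a service manager
while True:
    try:
        main()
    except Exception as e:
        print(f"restarting in 5s: {e}")
        time.sleep(5)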

The whole connector tier

  • One process, one job. Subscribe → parse → insert.
  • This is conceptually the same shape as a Kafka Connect JDBC Sink — just simpler.
  • Schema: (ts, topic, value), a narrow (long) table with one row per reading. Normalize later.
  • autocommit=True: each INSERT commits immediately, with no open transaction to manage for append-only telemetry.
  • Crash-restart loop wraps main() at the top level — cheaper than systemd for a weekend.
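In the bridge, `float()` raises on a non-numeric payload (a firmware bug, a stray test message), which costs a full reconnect. A minimal sketch of a parse step that drops such messages instead; `to_row` is a hypothetical helper, and whether to drop, log, or dead-letter bad messages is a design choice left open here.

```python
import math


def to_row(topic: str, payload: bytes):
    """Return (topic, value) ready for INSERT, or None if the payload isn't a finite number."""
    try:
        value = float(payload.decode("ascii").strip())
    except (UnicodeDecodeError, ValueError):
        return None
    if not math.isfinite(value):   # float() happily accepts "nan" and "inf"
        return None
    return topic, value
```

Inside `on_message`, this becomes `row = to_row(msg.topic, msg.payload)`, an early `return` when `row is None`, and `cur.execute(..., row)` otherwise.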
§ 12 Store timescaledb · hypertable

TimescaleDB — Postgres that thinks in time.

CREATE TABLE readings (
    ts    TIMESTAMPTZ      NOT NULL DEFAULT NOW(),
    topic TEXT             NOT NULL,
    value DOUBLE PRECISION NOT NULL
);

SELECT create_hypertable('readings', 'ts');

-- query: hourly buckets for last 24h
SELECT time_bucket('1 hour', ts) AS hour,
       topic,
       ROUND(AVG(value)::numeric, 2) AS avg
FROM readings
WHERE ts > NOW() - INTERVAL '24 hours'
GROUP BY hour, topic
ORDER BY topic, hour DESC;

Why it earns its keep

  • Hypertable = one logical table, chunked under the hood.
  • Inserts go to the current chunk → fast. Queries with a time filter skip chunks outside the range → fast.
  • Plain SQL. No new query language to learn.
  • time_bucket() = date_trunc() for arbitrary intervals (vanilla Postgres 14+ offers date_bin() for the same job).

If your data has a timestamp and you're storing it in a flat table with no partitioning strategy, you're paying a hidden tax on every query.
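What the hourly query computes can be spelled out in plain Python: floor each timestamp to its bucket, then average per (bucket, topic). A sketch for intervals that divide a day evenly (5 min, 1 hour), where aligning buckets to the Unix epoch gives the same boundaries as time_bucket() on UTC timestamps; `bucket` and `hourly_avg` are illustrative names.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def bucket(ts: datetime, width: timedelta) -> datetime:
    """Floor a timezone-aware timestamp to the start of its bucket."""
    return ts - (ts - EPOCH) % width


def hourly_avg(rows):
    """rows: iterable of (ts, topic, value) → {(hour, topic): average, 2 decimals}."""
    acc = defaultdict(lambda: [0.0, 0])          # running [sum, count] per group
    for ts, topic, value in rows:
        slot = acc[(bucket(ts, timedelta(hours=1)), topic)]
        slot[0] += value
        slot[1] += 1
    return {key: round(total / n, 2) for key, (total, n) in acc.items()}
```

One difference to keep in mind: Python's `round()` rounds halves to even, while Postgres `ROUND()` on numeric rounds them away from zero, so the two can disagree in the last digit.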

§ 13 Store / CDC trigger · pg_notify · the bus

The DB is the bus.

CREATE OR REPLACE FUNCTION notify_new_reading()
RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify(
    'new_reading',
    row_to_json(NEW)::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER readings_notify
AFTER INSERT ON readings
FOR EACH ROW EXECUTE FUNCTION notify_new_reading();

What we just got, for free

  • Postgres pub/sub, built in. pg_notify ships the row as JSON to any listener.
  • No Kafka. No Redis. No queue. The DB is the bus.
  • Same idea as Debezium / CDC — just without the JVM.

Trade-offs

  • Payload must be under 8000 bytes (default limit).
  • No durability — if a listener is offline, it misses the event.
  • Great for fan-out. Bad as a durable queue.
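The 8000-byte limit is a non-issue for a three-column row, but wider rows can hit it. A common workaround is to notify with just the row's key and let listeners read the full row from the table. A minimal app-side sketch, assuming you call pg_notify yourself rather than from the trigger; `notify_payload` and the choice of (ts, topic) as the key are hypothetical.

```python
import json

NOTIFY_MAX_BYTES = 8000   # Postgres default: the payload must be shorter than this


def notify_payload(row: dict, key_fields=("ts", "topic")) -> str:
    """Full row as JSON if it fits, otherwise only the key fields."""
    full = json.dumps(row, default=str)
    if len(full.encode("utf-8")) < NOTIFY_MAX_BYTES:
        return full
    # too big: listeners must re-read the row by key
    return json.dumps({k: row[k] for k in key_fields}, default=str)
```

With psycopg2 this would be sent as `cur.execute("SELECT pg_notify(%s, %s)", ("new_reading", notify_payload(row)))`; listeners then branch on whether `value` is present.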
§ 14 Serve fastapi · asyncpg · websocket

FastAPI bridges NOTIFY → WebSocket.

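import asyncio
import json

import asyncpg
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
DB_DSN = "postgresql://pico:pico@localhost:5432/sensors"

def as_dict(r):
    # asyncpg Records hold datetime objects; json.dumps needs plain types
    return {"ts": r["ts"].isoformat(), "topic": r["topic"], "value": r["value"]}

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    conn = await asyncpg.connect(DB_DSN)
    try:
        # 1. history backfill: last 20 rows, sent oldest-first for the chart
        rows = await conn.fetch(
            "SELECT ts, topic, value FROM readings"
            " ORDER BY ts DESC LIMIT 20"
        )
        await websocket.send_text(json.dumps(
            {"type": "history", "data": [as_dict(r) for r in reversed(rows)]}
        ))

        # 2. live tail: Postgres NOTIFY → queue → ws
        queue = asyncio.Queue()
        def handle_notify(conn, pid, channel, payload):
            queue.put_nowait(payload)   # called on the event loop; must not block

        await conn.add_listener("new_reading", handle_notify)
        while True:
            row = json.loads(await queue.get())   # row_to_json(NEW) from the trigger
            await websocket.send_text(json.dumps({"type": "reading", "data": row}))
    except WebSocketDisconnect:
        pass                            # browser tab closed
    finally:
        await conn.close()              # closing the connection also drops the LISTEN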

Replay + tail in one socket

  • Browser gets the last 20 rows, then live appends.
  • asyncpg speaks NOTIFY natively. Callback drops the payload into an asyncio queue, the loop drains it to the socket.
  • This is the same shape as a Kafka consumer with a from-beginning seek — replay + tail — only it's 30 lines.
Push, not poll. The browser never asks "are we there yet?" (latency · sensor → screen ≈ tens of ms on LAN)
§ 15 UI vanilla js · chart.js · no build

The dashboard — built without a framework.

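const proto = location.protocol === "https:" ? "wss" : "ws";
const ws = new WebSocket(`${proto}://${location.host}/ws`);

// `chart` is the Chart.js line chart; `chartByTopic` maps each topic to its dataset
ws.onmessage = ({ data }) => {
  const msg = JSON.parse(data);
  if (msg.type === "history") msg.data.forEach(push);
  if (msg.type === "reading") push(msg.data);
};

function push(row) {
  const series = chartByTopic[row.topic];
  if (!series) return;                               // no dataset for this topic
  series.data.push({ x: row.ts, y: row.value });
  if (series.data.length > 40) series.data.shift();  // rolling window of 40 points
  chart.update("none");                              // redraw without animation
}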
Pico W — DHT22 Live
▸ connected · ws://pi.local:8000/ws
DHT22 Temp  22.4 °C
Humidity    39.7 %
Chip Temp   30.6 °C
§ 16 Takeaways three things for monday

Three things to take to work on Monday.

01

Decouple the producer.

Whether it's a $6 chip or a mobile app, the producer shapes everything downstream: schema, latency tolerance, retry logic. A broker turns the N × M point-to-point links between N producers and M consumers into N + M connections.
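The N + M claim is plain counting: without a broker every producer talks to every consumer; with one, each side keeps a single connection to the broker. A toy in-memory broker shows the decoupling as well; `links` and `MiniBroker` are hypothetical illustrations, not EMQX's API.

```python
from collections import defaultdict


def links(n_producers: int, n_consumers: int, brokered: bool) -> int:
    """Connections needed: N × M point-to-point, N + M through a broker."""
    return n_producers + n_consumers if brokered else n_producers * n_consumers


class MiniBroker:
    """Producers and consumers share only topic names, never references to each other."""

    def __init__(self):
        self._subs = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subs[topic].append(handler)

    def publish(self, topic, payload):
        handlers = self._subs.get(topic, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)   # number of deliveries; 0 if nobody listens


print(links(10, 5, brokered=False), links(10, 5, brokered=True))   # → 50 15
```

Adding a sixth consumer costs one new connection to the broker and zero changes to any producer.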

02

Protocol choice is architecture.

MQTT vs Kafka vs HTTP is not a vendor pick; it's a constraints pick. The right protocol fits the producer's memory budget and the consumer's reliability requirement. They can coexist.

03

Push, not poll. Time is a citizen.

Use LISTEN/NOTIFY or CDC instead of polling, and partition by time at the storage layer. If your data has a timestamp and you store it flat, every query pays a hidden tax.

§ End of deck · Live demo

Let me plug it in.

Terminal · MQTT raw stream · browser dashboard · live SQL query on TimescaleDB. The whole stack, in two minutes.
http://192.168.1.2:8077
github.com/JAlcocerT/RPi/tree/main/Z_MicroControllers/RPiPicoW
Thanks — Jesús Alcocer Tagua · questions welcome