JAlcocerTech E-books

Chapter 15 — AIoT: Asking Questions of Your Sensor Data with LangChain

Temperature and humidity readings in a database are data. A question like “was the grow room humidity in the optimal range for the past 48 hours?” is intelligence.

The gap between the two used to require writing queries, building dashboards, and reading charts. Now it requires a LangChain pipeline and a DHT22.

The AIoT Architecture

The full pipeline:

DHT22 → Raspberry Pi Pico W → MQTT → InfluxDB → LangChain SQL Agent → Answer
graph LR
    DHT22[🌡️ DHT22] -->|GPIO| PicoW[Pico W]
    PicoW -->|MQTT publish| Broker[MQTT Broker]
    Broker -->|subscribe| Influx[(InfluxDB\ntime-series)]
    Influx -->|SQL| LangChain[LangChain\nSQL Agent]
    LangChain <-->|query + answer| LLM[LLM\ngpt-4o-mini]
    LangChain --> Answer[💬 Plain English\nAnswer]
    Broker -->|stream| ekuiper[ekuiper\nEdge Rules Engine]
    ekuiper -->|alert: avg > 28°C| HA[Home Assistant]
    HA --> Notify[📱 Notification]

    style LangChain fill:#e8f5e9,stroke:#388e3c
    style ekuiper fill:#fff3e0,stroke:#e65100
    style HA fill:#f3e5f5,stroke:#6a1b9a

Each component does one job:

  • The sensor reads physical conditions
  • The microcontroller digitizes and transmits them
  • MQTT routes the data to storage
  • InfluxDB stores time-series data with efficient querying
  • LangChain translates natural language into database queries and returns answers

ekuiper: Edge Stream Processing

ekuiper (formerly EMQ X Kuiper) is an edge streaming processing engine for IoT. It runs on a Raspberry Pi and processes MQTT message streams with SQL-like rules.

A rule that calculates a rolling average and triggers an alert:

SELECT AVG(temperature) as avg_temp 
FROM mqtt_stream 
WHERE topic = 'home/sensors/temperature'
GROUP BY TUMBLINGWINDOW(mi, 5)
HAVING avg_temp > 28

When the 5-minute rolling average exceeds 28°C, ekuiper publishes an alert to another MQTT topic, which Home Assistant picks up and sends as a notification. No Python. No custom code.

DB2Rest: Sensor Data as an API

DB2Rest wraps a database (PostgreSQL, MySQL, SQLite) in a REST API automatically — no backend code required. If your sensor readings land in a PostgreSQL table, DB2Rest exposes them as:

GET /sensor_readings?filter=location:eq:living_room&sort=-timestamp&limit=100

This makes sensor data accessible to any frontend or LangChain tool without writing a custom API endpoint.

The LangChain Sensor Query

With sensor data in a database and DB2Rest exposing it as an API, the LangChain pipeline:

from langchain import SQLDatabase, SQLDatabaseChain
from langchain.chat_models import ChatOpenAI

db = SQLDatabase.from_uri("postgresql://user:pass@localhost/sensors")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = SQLDatabaseChain.from_llm(llm, db, verbose=True)

result = chain.run(
    "What was the average VPD in the grow room for the last 48 hours, "
    "and was it within the optimal range of 0.8-1.2 kPa?"
)
print(result)

The output: a plain English answer, with the SQL query that produced it logged for verification. “The average VPD over the last 48 hours was 0.94 kPa, which is within the optimal range. The highest reading was 1.31 kPa at 14:30 yesterday.”

This is the bridge between physical sensing and actionable intelligence.

Ambient temperature data from an ASRock X300 — the kind of reading a LangChain agent can now query in plain English


Takeaway: ekuiper for edge stream processing with SQL rules. DB2Rest to expose sensor databases as APIs. LangChain + SQLDatabaseChain to query sensor data in natural language.