Chapter 15 — AIoT: Asking Questions of Your Sensor Data with LangChain
Temperature and humidity readings in a database are data. A question like “was the grow room humidity in the optimal range for the past 48 hours?” is intelligence.
The gap between the two used to require writing queries, building dashboards, and reading charts. Now it requires a LangChain pipeline and a DHT22.
The AIoT Architecture
The full pipeline:
DHT22 → Raspberry Pi Pico W → MQTT → InfluxDB → LangChain SQL Agent → Answer
graph LR
DHT22[🌡️ DHT22] -->|GPIO| PicoW[Pico W]
PicoW -->|MQTT publish| Broker[MQTT Broker]
Broker -->|subscribe| Influx[(InfluxDB\ntime-series)]
Influx -->|SQL| LangChain[LangChain\nSQL Agent]
LangChain <-->|query + answer| LLM[LLM\ngpt-4o-mini]
LangChain --> Answer[💬 Plain English\nAnswer]
Broker -->|stream| ekuiper[ekuiper\nEdge Rules Engine]
ekuiper -->|alert: avg > 28°C| HA[Home Assistant]
HA --> Notify[📱 Notification]
style LangChain fill:#e8f5e9,stroke:#388e3c
style ekuiper fill:#fff3e0,stroke:#e65100
style HA fill:#f3e5f5,stroke:#6a1b9a
Each component does one job:
- The sensor reads physical conditions
- The microcontroller digitizes and transmits them
- MQTT routes the data to storage
- InfluxDB stores time-series data with efficient querying
- LangChain translates natural language into database queries and returns answers
ekuiper: Edge Stream Processing
ekuiper (formerly EMQ X Kuiper) is an edge streaming processing engine for IoT. It runs on a Raspberry Pi and processes MQTT message streams with SQL-like rules.
A rule that calculates a rolling average and triggers an alert:
SELECT AVG(temperature) as avg_temp
FROM mqtt_stream
WHERE topic = 'home/sensors/temperature'
GROUP BY TUMBLINGWINDOW(mi, 5)
HAVING avg_temp > 28
When the 5-minute rolling average exceeds 28°C, ekuiper publishes an alert to another MQTT topic, which Home Assistant picks up and sends as a notification. No Python. No custom code.
DB2Rest: Sensor Data as an API
DB2Rest wraps a database (PostgreSQL, MySQL, SQLite) in a REST API automatically — no backend code required. If your sensor readings land in a PostgreSQL table, DB2Rest exposes them as:
GET /sensor_readings?filter=location:eq:living_room&sort=-timestamp&limit=100
This makes sensor data accessible to any frontend or LangChain tool without writing a custom API endpoint.
The LangChain Sensor Query
With sensor data in a database and DB2Rest exposing it as an API, the LangChain pipeline:
from langchain import SQLDatabase, SQLDatabaseChain
from langchain.chat_models import ChatOpenAI
db = SQLDatabase.from_uri("postgresql://user:pass@localhost/sensors")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = SQLDatabaseChain.from_llm(llm, db, verbose=True)
result = chain.run(
"What was the average VPD in the grow room for the last 48 hours, "
"and was it within the optimal range of 0.8-1.2 kPa?"
)
print(result)
The output: a plain English answer, with the SQL query that produced it logged for verification. “The average VPD over the last 48 hours was 0.94 kPa, which is within the optimal range. The highest reading was 1.31 kPa at 14:30 yesterday.”
This is the bridge between physical sensing and actionable intelligence.

Takeaway: ekuiper for edge stream processing with SQL rules. DB2Rest to expose sensor databases as APIs. LangChain + SQLDatabaseChain to query sensor data in natural language.