This post explains how to create an n8n flow that receives parsed P1 reader data from Smartmeter2mqtt and saves it to QuestDB. Ultimately, QuestDB can be used as a data source for tools like Grafana or Budibase.
The flow does two things:
- process data from Smartmeter2mqtt sent to the n8n flow via a webhook
- schedule data retention for each QuestDB table using a cron node
Create the tables
- smarthome_power_consumption: power is a measure of electricity at a single point in time. Data retention: 30 days.
- smarthome_energy_consumption: electrical energy is a measurement over time (historical), usually expressed in kilowatt hours (kWh). Data retention: 5 years.
- smarthome_gas_consumption_all: all gas consumption readings (approx. every 5 min.). Data retention: 30 days.
- smarthome_gas_consumption_historical: historical gas consumption (every hour). Data retention: 5 years.
The tables were created with SQL queries along the following lines:
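This is a minimal sketch rather than the exact queries: only powerTs, gasTs and gas_totalUse are named in this post, so the other column names (currentUsage, totalUse) are assumptions; adjust them to the Smartmeter2mqtt fields you actually store.

```sql
-- Power snapshots (every 30 seconds), kept for 30 days: partition per day
CREATE TABLE smarthome_power_consumption (
    powerTs TIMESTAMP,
    currentUsage DOUBLE
) TIMESTAMP(powerTs) PARTITION BY DAY;

-- Hourly energy values, kept for 5 years: partition per month
CREATE TABLE smarthome_energy_consumption (
    powerTs TIMESTAMP,
    totalUse DOUBLE
) TIMESTAMP(powerTs) PARTITION BY MONTH;

-- All gas readings (approx. every 5 minutes), kept for 30 days
CREATE TABLE smarthome_gas_consumption_all (
    gasTs TIMESTAMP,
    gas_totalUse DOUBLE
) TIMESTAMP(gasTs) PARTITION BY DAY;

-- Hourly gas values, kept for 5 years
CREATE TABLE smarthome_gas_consumption_historical (
    gasTs TIMESTAMP,
    gas_totalUse DOUBLE
) TIMESTAMP(gasTs) PARTITION BY MONTH;
```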
As you can see, the timestamp of the measured value(s) (powerTs or gasTs) is used as the designated timestamp, which enables partitioning the tables by time.
You can of course execute the SQL queries in the QuestDB web console, but I chose to execute them once with the n8n QuestDB nodes and save the queries in the n8n flow.
Process the data from Smartmeter2mqtt that is sent to the n8n flow using a webhook
The webrequest output of Smartmeter2mqtt sends the data (a message) to the webhook of the flow every 30 seconds. When the n8n and Smartmeter2mqtt Docker containers start, empty messages can be sent to the webhook, so the first step is to check whether the JSON body of the message contains fields (i.e. whether it is not empty).
The expression that checks if the crc field exists in the JSON body:
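The expression itself isn’t shown here; as a sketch, the IF node could evaluate a boolean condition along these lines (assuming the webhook node exposes the payload under body):

```
{{ $json["body"]["crc"] !== undefined }}
```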
Then insert the power consumption. Because the insert operation of the QuestDB node didn’t work quite right, I used the execute query operation instead, and it works fine.
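A sketch of what such a query could look like, with n8n expressions inlined; the powerTs and currentUsage field names are assumptions about the message payload:

```sql
-- QuestDB accepts ISO timestamp strings for timestamp columns;
-- powerTs and currentUsage are assumed field names from the webhook body
INSERT INTO smarthome_power_consumption VALUES (
    '{{ $json["body"]["powerTs"] }}',
    {{ $json["body"]["currentUsage"] }}
);
```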
And then the energy consumption. Since this table is used for historical data, we store the value only once an hour. This is checked using a query that returns a count().
As an example, the query with the count():
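The query isn’t reproduced here; a sketch of how it could look (in QuestDB, LIMIT -1 returns the last row):

```sql
-- count the last record only if it falls within the past hour;
-- a result of 0 means a new hourly value should be inserted
SELECT count()
FROM (SELECT powerTs FROM smarthome_energy_consumption LIMIT -1)
WHERE powerTs > dateadd('h', -1, now());
```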
As you can see, the last record is requested and the powerTs timestamp is compared with the current date and time.
Then the historical gas consumption, whose data is also saved once an hour.
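The same check, sketched for the historical gas table (field names assumed as before):

```sql
SELECT count()
FROM (SELECT gasTs FROM smarthome_gas_consumption_historical LIMIT -1)
WHERE gasTs > dateadd('h', -1, now());
```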
And finally, all gas consumption. Since the gas consumption data can only change every five minutes, unlike the power consumption, which changes every 30 seconds, I use a query to check whether the incoming data contains a unique value. This way, the data is only written if it is unique.
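A sketch of such a uniqueness check; if the count is 1, the last stored value equals the incoming one and the insert is skipped:

```sql
-- compare the last stored gas_totalUse with the value from the webhook message
SELECT count()
FROM (SELECT gas_totalUse FROM smarthome_gas_consumption_all LIMIT -1)
WHERE gas_totalUse = CAST('{{ $json["body"]["gas_totalUse"] }}' AS FLOAT);
```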
This query compares the gas_totalUse of the last row with the same field from the message received via the webhook. Remember that you have to convert the value to a float data type with CAST; otherwise the comparison will not work properly.
Schedule the data retention for each QuestDB table using a cron node
Now that the data is flowing in and being stored in the tables, I’ve also added a cron trigger that deletes old data at 9am every day. This ensures that:
- the data in the smarthome_power_consumption and smarthome_gas_consumption_all tables is stored for a maximum of 30 days
- the data in the smarthome_energy_consumption and smarthome_gas_consumption_historical tables is stored for a maximum of 5 years
For example, the SQL query for the power consumption table looks like this:
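A sketch, assuming the daily partitioning on powerTs shown earlier:

```sql
-- drop all daily partitions older than 30 days
ALTER TABLE smarthome_power_consumption
DROP PARTITION
WHERE powerTs < dateadd('d', -30, now());
```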
With the dateadd function you can control exactly how the data retention takes place.