feat: add conditions support for MQTT monitor type

Add rich conditions support to MQTT monitor similar to DNS monitor,
allowing users to define flexible conditions on:
- topic: The MQTT topic that received the message
- message: The raw message content
- json_value: JSONata-extracted value from JSON payloads

This provides a more intuitive and powerful way to validate MQTT
messages compared to the basic keyword/json-query checks.

Maintains backward compatibility with existing keyword and json-query
check types.

Closes #5992
This commit is contained in:
mkdev11 2026-01-06 03:08:10 +02:00
parent 698521f089
commit cf1391db6b
2 changed files with 57 additions and 9 deletions

View File

@ -2,10 +2,22 @@ const { MonitorType } = require("./monitor-type");
const { log, UP } = require("../../src/util"); const { log, UP } = require("../../src/util");
const mqtt = require("mqtt"); const mqtt = require("mqtt");
const jsonata = require("jsonata"); const jsonata = require("jsonata");
const { ConditionVariable } = require("../monitor-conditions/variables");
const { defaultStringOperators, defaultNumberOperators } = require("../monitor-conditions/operators");
const { ConditionExpressionGroup } = require("../monitor-conditions/expression");
const { evaluateExpressionGroup } = require("../monitor-conditions/evaluator");
class MqttMonitorType extends MonitorType { class MqttMonitorType extends MonitorType {
name = "mqtt"; name = "mqtt";
supportsConditions = true;
conditionVariables = [
new ConditionVariable("topic", defaultStringOperators),
new ConditionVariable("message", defaultStringOperators),
new ConditionVariable("json_value", defaultStringOperators.concat(defaultNumberOperators)),
];
/** /**
* @inheritdoc * @inheritdoc
*/ */
@ -23,7 +35,46 @@ class MqttMonitorType extends MonitorType {
monitor.mqttCheckType = "keyword"; monitor.mqttCheckType = "keyword";
} }
if (monitor.mqttCheckType === "keyword") { // Prepare conditions evaluation
const conditions = ConditionExpressionGroup.fromMonitor(monitor);
const hasConditions = conditions && conditions.children && conditions.children.length > 0;
// Parse JSON if needed for conditions
let parsedMessage = null;
let jsonValue = null;
if (hasConditions || monitor.mqttCheckType === "json-query") {
try {
parsedMessage = JSON.parse(receivedMessage);
if (monitor.jsonPath) {
let expression = jsonata(monitor.jsonPath);
jsonValue = await expression.evaluate(parsedMessage);
}
} catch (e) {
// Not valid JSON, that's okay for keyword check
if (monitor.mqttCheckType === "json-query") {
throw new Error("Invalid JSON in MQTT message: " + e.message);
}
}
}
// If conditions are defined, use them
if (hasConditions) {
const conditionData = {
topic: messageTopic,
message: receivedMessage,
json_value: jsonValue?.toString() ?? "",
};
const conditionsResult = evaluateExpressionGroup(conditions, conditionData);
if (conditionsResult) {
heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`;
heartbeat.status = UP;
} else {
throw new Error(`Conditions not met - Topic: ${messageTopic}; Message: ${receivedMessage}`);
}
} else if (monitor.mqttCheckType === "keyword") {
// Legacy keyword check
if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) { if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) {
heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`; heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`;
heartbeat.status = UP; heartbeat.status = UP;
@ -31,17 +82,12 @@ class MqttMonitorType extends MonitorType {
throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`); throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`);
} }
} else if (monitor.mqttCheckType === "json-query") { } else if (monitor.mqttCheckType === "json-query") {
const parsedMessage = JSON.parse(receivedMessage); // Legacy json-query check
if (jsonValue?.toString() === monitor.expectedValue) {
let expression = jsonata(monitor.jsonPath);
let result = await expression.evaluate(parsedMessage);
if (result?.toString() === monitor.expectedValue) {
heartbeat.msg = "Message received, expected value is found"; heartbeat.msg = "Message received, expected value is found";
heartbeat.status = UP; heartbeat.status = UP;
} else { } else {
throw new Error("Message received but value is not equal to expected value, value was: [" + result + "]"); throw new Error("Message received but value is not equal to expected value, value was: [" + jsonValue + "]");
} }
} else { } else {
throw Error("Unknown MQTT Check Type"); throw Error("Unknown MQTT Check Type");

View File

@ -1121,6 +1121,8 @@
"less than or equal to": "less than or equal to", "less than or equal to": "less than or equal to",
"greater than or equal to": "greater than or equal to", "greater than or equal to": "greater than or equal to",
"record": "record", "record": "record",
"message": "message",
"json_value": "JSON value",
"Notification Channel": "Notification Channel", "Notification Channel": "Notification Channel",
"Sound": "Sound", "Sound": "Sound",
"Alphanumerical string and hyphens only": "Alphanumerical string and hyphens only", "Alphanumerical string and hyphens only": "Alphanumerical string and hyphens only",