From 8b145d2522e873934a9f212168cc739ca59462ca Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Tue, 6 Jan 2026 03:48:31 +0200 Subject: [PATCH] refactor: split MQTT check() into smaller functions Per CommanderStorm's feedback, refactor the check() method into: - checkKeyword() - for legacy keyword matching - checkJsonQuery() - for legacy JSONata query - checkConditions() - for new conditions system Also add test cases for MQTT conditions: - Message condition with contains operator - Topic condition with equals operator - Condition mismatch rejection - Multiple conditions with AND logic This improves readability and maintainability. --- server/monitor-types/mqtt.js | 122 +++++++++++++++--------- test/backend-test/monitors/test-mqtt.js | 62 +++++++++++- 2 files changed, 138 insertions(+), 46 deletions(-) diff --git a/server/monitor-types/mqtt.js b/server/monitor-types/mqtt.js index dd3863a89..f1c1ad23c 100644 --- a/server/monitor-types/mqtt.js +++ b/server/monitor-types/mqtt.js @@ -31,66 +31,98 @@ class MqttMonitorType extends MonitorType { }); if (monitor.mqttCheckType == null || monitor.mqttCheckType === "") { - // use old default monitor.mqttCheckType = "keyword"; } - // Prepare conditions evaluation (only if monitor has conditions defined) + // Check if conditions are defined const conditions = monitor.conditions ? ConditionExpressionGroup.fromMonitor(monitor) : null; const hasConditions = conditions && conditions.children && conditions.children.length > 0; - // Parse JSON if needed for conditions - let parsedMessage = null; + if (hasConditions) { + await this.checkConditions(monitor, heartbeat, messageTopic, receivedMessage, conditions); + } else if (monitor.mqttCheckType === "keyword") { + this.checkKeyword(monitor, heartbeat, messageTopic, receivedMessage); + } else if (monitor.mqttCheckType === "json-query") { + await this.checkJsonQuery(monitor, heartbeat, receivedMessage); + } else { + throw new Error("Unknown MQTT Check Type"); + } + } + + /** + * Check using keyword matching + * @param {object} monitor Monitor object + * @param {object} heartbeat Heartbeat object + * @param {string} messageTopic Received MQTT topic + * @param {string} receivedMessage Received MQTT message + * @returns {void} + * @throws {Error} If keyword is not found in message + */ + checkKeyword(monitor, heartbeat, messageTopic, receivedMessage) { + if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) { + heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`; + heartbeat.status = UP; + } else { + throw new Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`); + } + } + + /** + * Check using JSONata query + * @param {object} monitor Monitor object + * @param {object} heartbeat Heartbeat object + * @param {string} receivedMessage Received MQTT message + * @returns {Promise} + */ + async checkJsonQuery(monitor, heartbeat, receivedMessage) { + const parsedMessage = JSON.parse(receivedMessage); + const expression = jsonata(monitor.jsonPath); + const result = await expression.evaluate(parsedMessage); + + if (result?.toString() === monitor.expectedValue) { + heartbeat.msg = "Message received, expected value is found"; + heartbeat.status = UP; + } else { + throw new Error("Message received but value is not equal to expected value, value was: [" + result + "]"); + } + } + + /** + * Check using conditions system + * @param {object} monitor Monitor object + * @param {object} heartbeat Heartbeat object + * @param {string} messageTopic Received MQTT topic + * @param {string} receivedMessage Received MQTT message + * @param {ConditionExpressionGroup} conditions Parsed conditions + * @returns {Promise} + */ + async checkConditions(monitor, heartbeat, messageTopic, receivedMessage, conditions) { let jsonValue = null; - if (hasConditions || monitor.mqttCheckType === "json-query") { + + // Parse JSON and extract value if jsonPath is defined + if (monitor.jsonPath) { try { - parsedMessage = JSON.parse(receivedMessage); - if (monitor.jsonPath) { - let expression = jsonata(monitor.jsonPath); - jsonValue = await expression.evaluate(parsedMessage); - } + const parsedMessage = JSON.parse(receivedMessage); + const expression = jsonata(monitor.jsonPath); + jsonValue = await expression.evaluate(parsedMessage); } catch (e) { - // Not valid JSON, that's okay for keyword check - if (monitor.mqttCheckType === "json-query") { - throw new Error("Invalid JSON in MQTT message: " + e.message); - } + // JSON parsing failed, jsonValue remains null } } - // If conditions are defined, use them - if (hasConditions) { - const conditionData = { - topic: messageTopic, - message: receivedMessage, - json_value: jsonValue?.toString() ?? "", - }; + const conditionData = { + topic: messageTopic, + message: receivedMessage, + json_value: jsonValue?.toString() ?? "", + }; - const conditionsResult = evaluateExpressionGroup(conditions, conditionData); + const conditionsResult = evaluateExpressionGroup(conditions, conditionData); - if (conditionsResult) { - heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`; - heartbeat.status = UP; - } else { - throw new Error(`Conditions not met - Topic: ${messageTopic}; Message: ${receivedMessage}`); - } - } else if (monitor.mqttCheckType === "keyword") { - // Legacy keyword check - if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) { - heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`; - heartbeat.status = UP; - } else { - throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`); - } - } else if (monitor.mqttCheckType === "json-query") { - // Legacy json-query check - if (jsonValue?.toString() === monitor.expectedValue) { - heartbeat.msg = "Message received, expected value is found"; - heartbeat.status = UP; - } else { - throw new Error("Message received but value is not equal to expected value, value was: [" + jsonValue + "]"); - } + if (conditionsResult) { + heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`; + heartbeat.status = UP; } else { - throw Error("Unknown MQTT Check Type"); + throw new Error(`Conditions not met - Topic: ${messageTopic}; Message: ${receivedMessage}`); } } diff --git a/test/backend-test/monitors/test-mqtt.js b/test/backend-test/monitors/test-mqtt.js index a361e868e..75d8e1d95 100644 --- a/test/backend-test/monitors/test-mqtt.js +++ b/test/backend-test/monitors/test-mqtt.js @@ -12,9 +12,10 @@ const { UP, PENDING } = require("../../../src/util"); * @param {string} receivedMessage what message is received from the mqtt channel * @param {string} monitorTopic which MQTT topic is monitored (wildcards are allowed) * @param {string} publishTopic to which MQTT topic the message is sent + * @param {string|null} conditions JSON string of conditions or null * @returns {Promise} the heartbeat produced by the check */ -async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage, monitorTopic = "test", publishTopic = "test") { +async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage, monitorTopic = "test", publishTopic = "test", conditions = null) { const hiveMQContainer = await new HiveMQContainer().start(); const connectionString = hiveMQContainer.getConnectionString(); const mqttMonitorType = new MqttMonitorType(); @@ -30,6 +31,7 @@ async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage, moni mqttSuccessMessage: mqttSuccessMessage, // for keywords expectedValue: mqttSuccessMessage, // for json-query mqttCheckType: mqttCheckType, + conditions: conditions, // for conditions system }; const heartbeat = { msg: "", @@ -157,4 +159,62 @@ describe("MqttMonitorType", { new Error("Message received but value is not equal to expected value, value was: [present]") ); }); + + // Conditions system tests + test("check() sets status to UP when message condition matches (contains)", async () => { + const conditions = JSON.stringify([ + { + variable: "message", + operator: "contains", + value: "KEYWORD" + } + ]); + const heartbeat = await testMqtt("", null, "-> KEYWORD <-", "test", "test", conditions); + assert.strictEqual(heartbeat.status, UP); + assert.strictEqual(heartbeat.msg, "Topic: test; Message: -> KEYWORD <-"); + }); + + test("check() sets status to UP when topic condition matches (equals)", async () => { + const conditions = JSON.stringify([ + { + variable: "topic", + operator: "equals", + value: "sensors/temp" + } + ]); + const heartbeat = await testMqtt("", null, "any message", "sensors/temp", "sensors/temp", conditions); + assert.strictEqual(heartbeat.status, UP); + }); + + test("check() rejects when message condition does not match", async () => { + const conditions = JSON.stringify([ + { + variable: "message", + operator: "contains", + value: "EXPECTED" + } + ]); + await assert.rejects( + testMqtt("", null, "actual message without keyword", "test", "test", conditions), + new Error("Conditions not met - Topic: test; Message: actual message without keyword") + ); + }); + + test("check() sets status to UP with multiple conditions (AND)", async () => { + const conditions = JSON.stringify([ + { + variable: "topic", + operator: "equals", + value: "test" + }, + { + variable: "message", + operator: "contains", + value: "success", + andOr: "and" + } + ]); + const heartbeat = await testMqtt("", null, "operation success", "test", "test", conditions); + assert.strictEqual(heartbeat.status, UP); + }); });