refactor: split MQTT check() into smaller functions
Per CommanderStorm's feedback, refactor the check() method into: - checkKeyword() - for legacy keyword matching - checkJsonQuery() - for legacy JSONata query - checkConditions() - for new conditions system Also add test cases for MQTT conditions: - Message condition with contains operator - Topic condition with equals operator - Condition mismatch rejection - Multiple conditions with AND logic This improves readability and maintainability.
This commit is contained in:
parent
e22784aaea
commit
8b145d2522
@ -31,66 +31,98 @@ class MqttMonitorType extends MonitorType {
|
||||
});
|
||||
|
||||
if (monitor.mqttCheckType == null || monitor.mqttCheckType === "") {
|
||||
// use old default
|
||||
monitor.mqttCheckType = "keyword";
|
||||
}
|
||||
|
||||
// Prepare conditions evaluation (only if monitor has conditions defined)
|
||||
// Check if conditions are defined
|
||||
const conditions = monitor.conditions ? ConditionExpressionGroup.fromMonitor(monitor) : null;
|
||||
const hasConditions = conditions && conditions.children && conditions.children.length > 0;
|
||||
|
||||
// Parse JSON if needed for conditions
|
||||
let parsedMessage = null;
|
||||
if (hasConditions) {
|
||||
await this.checkConditions(monitor, heartbeat, messageTopic, receivedMessage, conditions);
|
||||
} else if (monitor.mqttCheckType === "keyword") {
|
||||
this.checkKeyword(monitor, heartbeat, messageTopic, receivedMessage);
|
||||
} else if (monitor.mqttCheckType === "json-query") {
|
||||
await this.checkJsonQuery(monitor, heartbeat, receivedMessage);
|
||||
} else {
|
||||
throw new Error("Unknown MQTT Check Type");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check using keyword matching
|
||||
* @param {object} monitor Monitor object
|
||||
* @param {object} heartbeat Heartbeat object
|
||||
* @param {string} messageTopic Received MQTT topic
|
||||
* @param {string} receivedMessage Received MQTT message
|
||||
* @returns {void}
|
||||
* @throws {Error} If keyword is not found in message
|
||||
*/
|
||||
checkKeyword(monitor, heartbeat, messageTopic, receivedMessage) {
|
||||
if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) {
|
||||
heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`;
|
||||
heartbeat.status = UP;
|
||||
} else {
|
||||
throw new Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check using JSONata query
|
||||
* @param {object} monitor Monitor object
|
||||
* @param {object} heartbeat Heartbeat object
|
||||
* @param {string} receivedMessage Received MQTT message
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async checkJsonQuery(monitor, heartbeat, receivedMessage) {
|
||||
const parsedMessage = JSON.parse(receivedMessage);
|
||||
const expression = jsonata(monitor.jsonPath);
|
||||
const result = await expression.evaluate(parsedMessage);
|
||||
|
||||
if (result?.toString() === monitor.expectedValue) {
|
||||
heartbeat.msg = "Message received, expected value is found";
|
||||
heartbeat.status = UP;
|
||||
} else {
|
||||
throw new Error("Message received but value is not equal to expected value, value was: [" + result + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check using conditions system
|
||||
* @param {object} monitor Monitor object
|
||||
* @param {object} heartbeat Heartbeat object
|
||||
* @param {string} messageTopic Received MQTT topic
|
||||
* @param {string} receivedMessage Received MQTT message
|
||||
* @param {ConditionExpressionGroup} conditions Parsed conditions
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async checkConditions(monitor, heartbeat, messageTopic, receivedMessage, conditions) {
|
||||
let jsonValue = null;
|
||||
if (hasConditions || monitor.mqttCheckType === "json-query") {
|
||||
|
||||
// Parse JSON and extract value if jsonPath is defined
|
||||
if (monitor.jsonPath) {
|
||||
try {
|
||||
parsedMessage = JSON.parse(receivedMessage);
|
||||
if (monitor.jsonPath) {
|
||||
let expression = jsonata(monitor.jsonPath);
|
||||
jsonValue = await expression.evaluate(parsedMessage);
|
||||
}
|
||||
const parsedMessage = JSON.parse(receivedMessage);
|
||||
const expression = jsonata(monitor.jsonPath);
|
||||
jsonValue = await expression.evaluate(parsedMessage);
|
||||
} catch (e) {
|
||||
// Not valid JSON, that's okay for keyword check
|
||||
if (monitor.mqttCheckType === "json-query") {
|
||||
throw new Error("Invalid JSON in MQTT message: " + e.message);
|
||||
}
|
||||
// JSON parsing failed, jsonValue remains null
|
||||
}
|
||||
}
|
||||
|
||||
// If conditions are defined, use them
|
||||
if (hasConditions) {
|
||||
const conditionData = {
|
||||
topic: messageTopic,
|
||||
message: receivedMessage,
|
||||
json_value: jsonValue?.toString() ?? "",
|
||||
};
|
||||
const conditionData = {
|
||||
topic: messageTopic,
|
||||
message: receivedMessage,
|
||||
json_value: jsonValue?.toString() ?? "",
|
||||
};
|
||||
|
||||
const conditionsResult = evaluateExpressionGroup(conditions, conditionData);
|
||||
const conditionsResult = evaluateExpressionGroup(conditions, conditionData);
|
||||
|
||||
if (conditionsResult) {
|
||||
heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`;
|
||||
heartbeat.status = UP;
|
||||
} else {
|
||||
throw new Error(`Conditions not met - Topic: ${messageTopic}; Message: ${receivedMessage}`);
|
||||
}
|
||||
} else if (monitor.mqttCheckType === "keyword") {
|
||||
// Legacy keyword check
|
||||
if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) {
|
||||
heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`;
|
||||
heartbeat.status = UP;
|
||||
} else {
|
||||
throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`);
|
||||
}
|
||||
} else if (monitor.mqttCheckType === "json-query") {
|
||||
// Legacy json-query check
|
||||
if (jsonValue?.toString() === monitor.expectedValue) {
|
||||
heartbeat.msg = "Message received, expected value is found";
|
||||
heartbeat.status = UP;
|
||||
} else {
|
||||
throw new Error("Message received but value is not equal to expected value, value was: [" + jsonValue + "]");
|
||||
}
|
||||
if (conditionsResult) {
|
||||
heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`;
|
||||
heartbeat.status = UP;
|
||||
} else {
|
||||
throw Error("Unknown MQTT Check Type");
|
||||
throw new Error(`Conditions not met - Topic: ${messageTopic}; Message: ${receivedMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -12,9 +12,10 @@ const { UP, PENDING } = require("../../../src/util");
|
||||
* @param {string} receivedMessage what message is received from the mqtt channel
|
||||
* @param {string} monitorTopic which MQTT topic is monitored (wildcards are allowed)
|
||||
* @param {string} publishTopic to which MQTT topic the message is sent
|
||||
* @param {string|null} conditions JSON string of conditions or null
|
||||
* @returns {Promise<Heartbeat>} the heartbeat produced by the check
|
||||
*/
|
||||
async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage, monitorTopic = "test", publishTopic = "test") {
|
||||
async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage, monitorTopic = "test", publishTopic = "test", conditions = null) {
|
||||
const hiveMQContainer = await new HiveMQContainer().start();
|
||||
const connectionString = hiveMQContainer.getConnectionString();
|
||||
const mqttMonitorType = new MqttMonitorType();
|
||||
@ -30,6 +31,7 @@ async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage, moni
|
||||
mqttSuccessMessage: mqttSuccessMessage, // for keywords
|
||||
expectedValue: mqttSuccessMessage, // for json-query
|
||||
mqttCheckType: mqttCheckType,
|
||||
conditions: conditions, // for conditions system
|
||||
};
|
||||
const heartbeat = {
|
||||
msg: "",
|
||||
@ -157,4 +159,62 @@ describe("MqttMonitorType", {
|
||||
new Error("Message received but value is not equal to expected value, value was: [present]")
|
||||
);
|
||||
});
|
||||
|
||||
// Conditions system tests
|
||||
test("check() sets status to UP when message condition matches (contains)", async () => {
|
||||
const conditions = JSON.stringify([
|
||||
{
|
||||
variable: "message",
|
||||
operator: "contains",
|
||||
value: "KEYWORD"
|
||||
}
|
||||
]);
|
||||
const heartbeat = await testMqtt("", null, "-> KEYWORD <-", "test", "test", conditions);
|
||||
assert.strictEqual(heartbeat.status, UP);
|
||||
assert.strictEqual(heartbeat.msg, "Topic: test; Message: -> KEYWORD <-");
|
||||
});
|
||||
|
||||
test("check() sets status to UP when topic condition matches (equals)", async () => {
|
||||
const conditions = JSON.stringify([
|
||||
{
|
||||
variable: "topic",
|
||||
operator: "equals",
|
||||
value: "sensors/temp"
|
||||
}
|
||||
]);
|
||||
const heartbeat = await testMqtt("", null, "any message", "sensors/temp", "sensors/temp", conditions);
|
||||
assert.strictEqual(heartbeat.status, UP);
|
||||
});
|
||||
|
||||
test("check() rejects when message condition does not match", async () => {
|
||||
const conditions = JSON.stringify([
|
||||
{
|
||||
variable: "message",
|
||||
operator: "contains",
|
||||
value: "EXPECTED"
|
||||
}
|
||||
]);
|
||||
await assert.rejects(
|
||||
testMqtt("", null, "actual message without keyword", "test", "test", conditions),
|
||||
new Error("Conditions not met - Topic: test; Message: actual message without keyword")
|
||||
);
|
||||
});
|
||||
|
||||
test("check() sets status to UP with multiple conditions (AND)", async () => {
|
||||
const conditions = JSON.stringify([
|
||||
{
|
||||
variable: "topic",
|
||||
operator: "equals",
|
||||
value: "test"
|
||||
},
|
||||
{
|
||||
variable: "message",
|
||||
operator: "contains",
|
||||
value: "success",
|
||||
andOr: "and"
|
||||
}
|
||||
]);
|
||||
const heartbeat = await testMqtt("", null, "operation success", "test", "test", conditions);
|
||||
assert.strictEqual(heartbeat.status, UP);
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
Reference in New Issue
Block a user