feat: added monitoring for postgres query result (#6736)
Co-authored-by: Dalton Pearson <dalton.pearson@praemo.com>
This commit is contained in:
parent
e022b5f976
commit
d7296c6629
@ -3,26 +3,61 @@ const { log, UP } = require("../../src/util");
|
|||||||
const dayjs = require("dayjs");
|
const dayjs = require("dayjs");
|
||||||
const postgresConParse = require("pg-connection-string").parse;
|
const postgresConParse = require("pg-connection-string").parse;
|
||||||
const { Client } = require("pg");
|
const { Client } = require("pg");
|
||||||
|
const { ConditionVariable } = require("../monitor-conditions/variables");
|
||||||
|
const { defaultStringOperators } = require("../monitor-conditions/operators");
|
||||||
|
const { ConditionExpressionGroup } = require("../monitor-conditions/expression");
|
||||||
|
const { evaluateExpressionGroup } = require("../monitor-conditions/evaluator");
|
||||||
|
|
||||||
class PostgresMonitorType extends MonitorType {
|
class PostgresMonitorType extends MonitorType {
|
||||||
name = "postgres";
|
name = "postgres";
|
||||||
|
|
||||||
|
supportsConditions = true;
|
||||||
|
conditionVariables = [new ConditionVariable("result", defaultStringOperators)];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @inheritdoc
|
* @inheritdoc
|
||||||
*/
|
*/
|
||||||
async check(monitor, heartbeat, _server) {
|
async check(monitor, heartbeat, _server) {
|
||||||
let startTime = dayjs().valueOf();
|
|
||||||
|
|
||||||
let query = monitor.databaseQuery;
|
let query = monitor.databaseQuery;
|
||||||
// No query provided by user, use SELECT 1
|
// No query provided by user, use SELECT 1
|
||||||
if (!query || (typeof query === "string" && query.trim() === "")) {
|
if (!query || (typeof query === "string" && query.trim() === "")) {
|
||||||
query = "SELECT 1";
|
query = "SELECT 1";
|
||||||
}
|
}
|
||||||
await this.postgresQuery(monitor.databaseConnectionString, query);
|
|
||||||
|
|
||||||
heartbeat.msg = "";
|
const conditions = monitor.conditions ? ConditionExpressionGroup.fromMonitor(monitor) : null;
|
||||||
heartbeat.status = UP;
|
const hasConditions = conditions && conditions.children && conditions.children.length > 0;
|
||||||
|
|
||||||
|
const startTime = dayjs().valueOf();
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (hasConditions) {
|
||||||
|
// When conditions are enabled, expect a single value result
|
||||||
|
const result = await this.postgresQuerySingleValue(monitor.databaseConnectionString, query);
|
||||||
heartbeat.ping = dayjs().valueOf() - startTime;
|
heartbeat.ping = dayjs().valueOf() - startTime;
|
||||||
|
|
||||||
|
const conditionsResult = evaluateExpressionGroup(conditions, { result: String(result) });
|
||||||
|
|
||||||
|
if (!conditionsResult) {
|
||||||
|
throw new Error(`Query result did not meet the specified conditions (${result})`);
|
||||||
|
}
|
||||||
|
|
||||||
|
heartbeat.status = UP;
|
||||||
|
heartbeat.msg = "Query did meet specified conditions";
|
||||||
|
} else {
|
||||||
|
// Backwards compatible: just check connection and return row count
|
||||||
|
const result = await this.postgresQuery(monitor.databaseConnectionString, query);
|
||||||
|
heartbeat.ping = dayjs().valueOf() - startTime;
|
||||||
|
heartbeat.status = UP;
|
||||||
|
heartbeat.msg = result;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
heartbeat.ping = dayjs().valueOf() - startTime;
|
||||||
|
// Re-throw condition errors as-is, wrap database errors
|
||||||
|
if (error.message.includes("did not meet the specified conditions")) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
throw new Error(`Database connection/query failed: ${error.message}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -76,6 +111,75 @@ class PostgresMonitorType extends MonitorType {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a query on Postgres
|
||||||
|
* @param {string} connectionString The database connection string
|
||||||
|
* @param {string} query The query to validate the database with
|
||||||
|
* @returns {Promise<(string[] | object[] | object)>} Response from
|
||||||
|
* server
|
||||||
|
*/
|
||||||
|
async postgresQuerySingleValue(connectionString, query) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const config = postgresConParse(connectionString);
|
||||||
|
|
||||||
|
// Fix #3868, which true/false is not parsed to boolean
|
||||||
|
if (typeof config.ssl === "string") {
|
||||||
|
config.ssl = config.ssl === "true";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (config.password === "") {
|
||||||
|
// See https://github.com/brianc/node-postgres/issues/1927
|
||||||
|
reject(new Error("Password is undefined."));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const client = new Client(config);
|
||||||
|
|
||||||
|
client.on("error", (error) => {
|
||||||
|
log.debug(this.name, "Error caught in the error event handler.");
|
||||||
|
reject(error);
|
||||||
|
});
|
||||||
|
|
||||||
|
client.connect((err) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
client.end();
|
||||||
|
} else {
|
||||||
|
// Connected here
|
||||||
|
try {
|
||||||
|
client.query(query, (err, res) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
} else {
|
||||||
|
// Check if we have results
|
||||||
|
if (!res.rows || res.rows.length === 0) {
|
||||||
|
reject(new Error("Query returned no results"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Check if we have multiple rows
|
||||||
|
if (res.rows.length > 1) {
|
||||||
|
reject(new Error("Multiple values were found, expected only one value"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const firstRow = res.rows[0];
|
||||||
|
const columnNames = Object.keys(firstRow);
|
||||||
|
// Check if we have multiple columns
|
||||||
|
if (columnNames.length > 1) {
|
||||||
|
reject(new Error("Multiple columns were found, expected only one value"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
resolve(firstRow[columnNames[0]]);
|
||||||
|
}
|
||||||
|
client.end();
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
reject(e);
|
||||||
|
client.end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
|
|||||||
@ -49,5 +49,203 @@ describe(
|
|||||||
|
|
||||||
await assert.rejects(postgresMonitor.check(monitor, heartbeat, {}), regex);
|
await assert.rejects(postgresMonitor.check(monitor, heartbeat, {}), regex);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("check() sets status to UP when custom query returns single value", async () => {
|
||||||
|
// The default timeout of 30 seconds might not be enough for the container to start
|
||||||
|
const postgresContainer = await new PostgreSqlContainer("postgres:latest")
|
||||||
|
.withStartupTimeout(60000)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
const postgresMonitor = new PostgresMonitorType();
|
||||||
|
const monitor = {
|
||||||
|
databaseConnectionString: postgresContainer.getConnectionUri(),
|
||||||
|
databaseQuery: "SELECT 42",
|
||||||
|
conditions: "[]",
|
||||||
|
};
|
||||||
|
|
||||||
|
const heartbeat = {
|
||||||
|
msg: "",
|
||||||
|
status: PENDING,
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await postgresMonitor.check(monitor, heartbeat, {});
|
||||||
|
assert.strictEqual(heartbeat.status, UP, `Expected status ${UP} but got ${heartbeat.status}`);
|
||||||
|
} finally {
|
||||||
|
await postgresContainer.stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
test("check() sets status to UP when custom query result meets condition", async () => {
|
||||||
|
const postgresContainer = await new PostgreSqlContainer("postgres:latest")
|
||||||
|
.withStartupTimeout(60000)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
const postgresMonitor = new PostgresMonitorType();
|
||||||
|
const monitor = {
|
||||||
|
databaseConnectionString: postgresContainer.getConnectionUri(),
|
||||||
|
databaseQuery: "SELECT 42 AS value",
|
||||||
|
conditions: JSON.stringify([
|
||||||
|
{
|
||||||
|
type: "expression",
|
||||||
|
andOr: "and",
|
||||||
|
variable: "result",
|
||||||
|
operator: "equals",
|
||||||
|
value: "42",
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
const heartbeat = {
|
||||||
|
msg: "",
|
||||||
|
status: PENDING,
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await postgresMonitor.check(monitor, heartbeat, {});
|
||||||
|
assert.strictEqual(heartbeat.status, UP, `Expected status ${UP} but got ${heartbeat.status}`);
|
||||||
|
} finally {
|
||||||
|
await postgresContainer.stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
test("check() rejects when custom query result does not meet condition", async () => {
|
||||||
|
const postgresContainer = await new PostgreSqlContainer("postgres:latest")
|
||||||
|
.withStartupTimeout(60000)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
const postgresMonitor = new PostgresMonitorType();
|
||||||
|
const monitor = {
|
||||||
|
databaseConnectionString: postgresContainer.getConnectionUri(),
|
||||||
|
databaseQuery: "SELECT 99 AS value",
|
||||||
|
conditions: JSON.stringify([
|
||||||
|
{
|
||||||
|
type: "expression",
|
||||||
|
andOr: "and",
|
||||||
|
variable: "result",
|
||||||
|
operator: "equals",
|
||||||
|
value: "42",
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
const heartbeat = {
|
||||||
|
msg: "",
|
||||||
|
status: PENDING,
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await assert.rejects(
|
||||||
|
postgresMonitor.check(monitor, heartbeat, {}),
|
||||||
|
new Error("Query result did not meet the specified conditions (99)")
|
||||||
|
);
|
||||||
|
assert.strictEqual(heartbeat.status, PENDING, `Expected status should not be ${heartbeat.status}`);
|
||||||
|
} finally {
|
||||||
|
await postgresContainer.stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
test("check() rejects when query returns no results with conditions", async () => {
|
||||||
|
const postgresContainer = await new PostgreSqlContainer("postgres:latest")
|
||||||
|
.withStartupTimeout(60000)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
const postgresMonitor = new PostgresMonitorType();
|
||||||
|
const monitor = {
|
||||||
|
databaseConnectionString: postgresContainer.getConnectionUri(),
|
||||||
|
databaseQuery: "SELECT 1 WHERE 1 = 0",
|
||||||
|
conditions: JSON.stringify([
|
||||||
|
{
|
||||||
|
type: "expression",
|
||||||
|
andOr: "and",
|
||||||
|
variable: "result",
|
||||||
|
operator: "equals",
|
||||||
|
value: "1",
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
const heartbeat = {
|
||||||
|
msg: "",
|
||||||
|
status: PENDING,
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await assert.rejects(
|
||||||
|
postgresMonitor.check(monitor, heartbeat, {}),
|
||||||
|
new Error("Database connection/query failed: Query returned no results")
|
||||||
|
);
|
||||||
|
assert.strictEqual(heartbeat.status, PENDING, `Expected status should not be ${heartbeat.status}`);
|
||||||
|
} finally {
|
||||||
|
await postgresContainer.stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
test("check() rejects when query returns multiple rows with conditions", async () => {
|
||||||
|
const postgresContainer = await new PostgreSqlContainer("postgres:latest")
|
||||||
|
.withStartupTimeout(60000)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
const postgresMonitor = new PostgresMonitorType();
|
||||||
|
const monitor = {
|
||||||
|
databaseConnectionString: postgresContainer.getConnectionUri(),
|
||||||
|
databaseQuery: "SELECT 1 UNION ALL SELECT 2",
|
||||||
|
conditions: JSON.stringify([
|
||||||
|
{
|
||||||
|
type: "expression",
|
||||||
|
andOr: "and",
|
||||||
|
variable: "result",
|
||||||
|
operator: "equals",
|
||||||
|
value: "1",
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
const heartbeat = {
|
||||||
|
msg: "",
|
||||||
|
status: PENDING,
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await assert.rejects(
|
||||||
|
postgresMonitor.check(monitor, heartbeat, {}),
|
||||||
|
new Error("Database connection/query failed: Multiple values were found, expected only one value")
|
||||||
|
);
|
||||||
|
assert.strictEqual(heartbeat.status, PENDING, `Expected status should not be ${heartbeat.status}`);
|
||||||
|
} finally {
|
||||||
|
await postgresContainer.stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
test("check() rejects when query returns multiple columns with conditions", async () => {
|
||||||
|
const postgresContainer = await new PostgreSqlContainer("postgres:latest")
|
||||||
|
.withStartupTimeout(60000)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
const postgresMonitor = new PostgresMonitorType();
|
||||||
|
const monitor = {
|
||||||
|
databaseConnectionString: postgresContainer.getConnectionUri(),
|
||||||
|
databaseQuery: "SELECT 1 AS col1, 2 AS col2",
|
||||||
|
conditions: JSON.stringify([
|
||||||
|
{
|
||||||
|
type: "expression",
|
||||||
|
andOr: "and",
|
||||||
|
variable: "result",
|
||||||
|
operator: "equals",
|
||||||
|
value: "1",
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
const heartbeat = {
|
||||||
|
msg: "",
|
||||||
|
status: PENDING,
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await assert.rejects(
|
||||||
|
postgresMonitor.check(monitor, heartbeat, {}),
|
||||||
|
new Error("Database connection/query failed: Multiple columns were found, expected only one value")
|
||||||
|
);
|
||||||
|
assert.strictEqual(heartbeat.status, PENDING, `Expected status should not be ${heartbeat.status}`);
|
||||||
|
} finally {
|
||||||
|
await postgresContainer.stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user