feat: Async gzip compress/decompress and perf optimization
* https://nodejs.org/docs/latest-v20.x/api/zlib.html#zlib * https://nodejs.org/docs/latest-v20.x/api/zlib.html#threadpool-usage-and-performance-considerations This is not a 'real async', but a non-blocking approach which offloads work to libuv thread pool.
This commit is contained in:
parent
3911e29801
commit
31d86ad55d
@ -9,7 +9,6 @@ const io = server.io;
|
||||
const { setting } = require("./util-server");
|
||||
const checkVersion = require("./check-version");
|
||||
const Database = require("./database");
|
||||
const Heartbeat = require("./model/heartbeat");
|
||||
|
||||
/**
|
||||
* Send list of notification providers to client
|
||||
@ -55,16 +54,7 @@ async function sendHeartbeatList(socket, monitorID, toUser = false, overwrite =
|
||||
[monitorID]
|
||||
);
|
||||
|
||||
let result = list.reverse().map((row) => {
|
||||
if (row.response) {
|
||||
return {
|
||||
...row,
|
||||
response: Heartbeat.decodeResponseValue(row.response),
|
||||
};
|
||||
}
|
||||
|
||||
return row;
|
||||
});
|
||||
let result = list.reverse();
|
||||
|
||||
if (toUser) {
|
||||
io.to(socket.userID).emit("heartbeatList", monitorID, result, overwrite);
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
const { BeanModel } = require("redbean-node/dist/bean-model");
|
||||
const zlib = require("node:zlib");
|
||||
const { promisify } = require("node:util");
|
||||
const gunzip = promisify(zlib.gunzip);
|
||||
|
||||
/**
|
||||
* status:
|
||||
@ -37,7 +39,26 @@ class Heartbeat extends BeanModel {
|
||||
important: this._important,
|
||||
duration: this._duration,
|
||||
retries: this._retries,
|
||||
response: Heartbeat.decodeResponseValue(this._response),
|
||||
response: this._response,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an object that ready to parse to JSON
|
||||
* @param {{ decodeResponse?: boolean }} opts Options for JSON serialization
|
||||
* @returns {Promise<object>} Object ready to parse
|
||||
*/
|
||||
async toJSONAsync(opts) {
|
||||
return {
|
||||
monitorID: this._monitorId,
|
||||
status: this._status,
|
||||
time: this._time,
|
||||
msg: this._msg,
|
||||
ping: this._ping,
|
||||
important: this._important,
|
||||
duration: this._duration,
|
||||
retries: this._retries,
|
||||
response: opts?.decodeResponse ? (await Heartbeat.decodeResponseValue(this._response)) : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
@ -46,13 +67,14 @@ class Heartbeat extends BeanModel {
|
||||
* @param {string|null} response Encoded response payload.
|
||||
* @returns {string|null} Decoded response payload.
|
||||
*/
|
||||
static decodeResponseValue(response) {
|
||||
static async decodeResponseValue(response) {
|
||||
if (!response) {
|
||||
return response;
|
||||
}
|
||||
|
||||
try {
|
||||
return zlib.gunzipSync(Buffer.from(response, "base64")).toString("utf8");
|
||||
// Offload gzip decode from main event loop to libuv thread pool
|
||||
return (await gunzip(Buffer.from(response, "base64"))).toString("utf8");
|
||||
} catch (error) {
|
||||
return response;
|
||||
}
|
||||
|
||||
@ -59,6 +59,8 @@ const { HttpsCookieAgent } = require("http-cookie-agent/http");
|
||||
const https = require("https");
|
||||
const http = require("http");
|
||||
const zlib = require("node:zlib");
|
||||
const { promisify } = require("node:util");
|
||||
const gzip = promisify(zlib.gzip);
|
||||
const DomainExpiry = require("./domain_expiry");
|
||||
|
||||
const rootCertificates = rootCertificatesFingerprints();
|
||||
@ -645,7 +647,7 @@ class Monitor extends BeanModel {
|
||||
bean.ping = dayjs().valueOf() - startTime;
|
||||
|
||||
if (this.getSaveResponse()) {
|
||||
this.saveResponseData(bean, res.data);
|
||||
await this.saveResponseData(bean, res.data);
|
||||
}
|
||||
|
||||
// fallback for if kelog event is not emitted, but we may still have tlsInfo,
|
||||
@ -960,7 +962,7 @@ class Monitor extends BeanModel {
|
||||
}
|
||||
|
||||
if (this.getSaveErrorResponse() && error?.response?.data !== undefined) {
|
||||
this.saveResponseData(bean, error.response.data);
|
||||
await this.saveResponseData(bean, error.response.data);
|
||||
}
|
||||
|
||||
// If UP come in here, it must be upside down mode
|
||||
@ -1152,7 +1154,7 @@ class Monitor extends BeanModel {
|
||||
* @param {unknown} data Response payload.
|
||||
* @returns {void}
|
||||
*/
|
||||
saveResponseData(bean, data) {
|
||||
async saveResponseData(bean, data) {
|
||||
if (data === undefined) {
|
||||
return;
|
||||
}
|
||||
@ -1171,7 +1173,8 @@ class Monitor extends BeanModel {
|
||||
responseData = responseData.substring(0, maxSize) + "... (truncated)";
|
||||
}
|
||||
|
||||
bean.response = zlib.gzipSync(Buffer.from(responseData, "utf8")).toString("base64");
|
||||
// Offload gzip compression from main event loop to libuv thread pool
|
||||
bean.response = (await gzip(Buffer.from(responseData, "utf8"))).toString("base64");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1477,7 +1480,7 @@ class Monitor extends BeanModel {
|
||||
* Send a notification about a monitor
|
||||
* @param {boolean} isFirstBeat Is this beat the first of this monitor?
|
||||
* @param {Monitor} monitor The monitor to send a notification about
|
||||
* @param {Bean} bean Status information about monitor
|
||||
* @param {import("./heartbeat")} bean Status information about monitor
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
static async sendNotification(isFirstBeat, monitor, bean) {
|
||||
@ -1495,7 +1498,7 @@ class Monitor extends BeanModel {
|
||||
|
||||
for (let notification of notificationList) {
|
||||
try {
|
||||
const heartbeatJSON = bean.toJSON();
|
||||
const heartbeatJSON = await bean.toJSONAsync({ decodeResponse: true });
|
||||
const monitorData = [{ id: monitor.id, active: monitor.active, name: monitor.name }];
|
||||
const preloadData = await Monitor.preparePreloadData(monitorData);
|
||||
// Prevent if the msg is undefined, notifications such as Discord cannot send out.
|
||||
|
||||
@ -14,23 +14,23 @@ describe("Monitor response saving", () => {
|
||||
assert.strictEqual(monitor.getSaveErrorResponse(), false);
|
||||
});
|
||||
|
||||
test("saveResponseData stores and truncates response", () => {
|
||||
test("saveResponseData stores and truncates response", async () => {
|
||||
const monitor = Object.create(Monitor.prototype);
|
||||
monitor.response_max_length = 5;
|
||||
|
||||
const bean = {};
|
||||
monitor.saveResponseData(bean, "abcdef");
|
||||
await monitor.saveResponseData(bean, "abcdef");
|
||||
|
||||
assert.strictEqual(Heartbeat.decodeResponseValue(bean.response), "abcde... (truncated)");
|
||||
assert.strictEqual(await Heartbeat.decodeResponseValue(bean.response), "abcde... (truncated)");
|
||||
});
|
||||
|
||||
test("saveResponseData stringifies objects", () => {
|
||||
test("saveResponseData stringifies objects", async () => {
|
||||
const monitor = Object.create(Monitor.prototype);
|
||||
monitor.response_max_length = RESPONSE_BODY_LENGTH_DEFAULT;
|
||||
|
||||
const bean = {};
|
||||
monitor.saveResponseData(bean, { ok: true });
|
||||
await monitor.saveResponseData(bean, { ok: true });
|
||||
|
||||
assert.strictEqual(Heartbeat.decodeResponseValue(bean.response), JSON.stringify({ ok: true }));
|
||||
assert.strictEqual(await Heartbeat.decodeResponseValue(bean.response), JSON.stringify({ ok: true }));
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
Reference in New Issue
Block a user