/*
This file is part of GNU Taler
(C) 2024 Taler Systems SA
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see
*/
/**
* @fileoverview Wrappers/proxies to make various interfaces observable.
*/
/**
* Imports.
*/
import { IDBDatabase } from "@gnu-taler/idb-bridge";
import {
ObservabilityContext,
ObservabilityEventType,
} from "@gnu-taler/taler-util";
import { TaskIdStr } from "./common.js";
import { TalerCryptoInterface } from "./index.js";
import {
DbAccess,
DbReadOnlyTransaction,
DbReadWriteTransaction,
StoreNames,
} from "./query.js";
import { TaskScheduler } from "./shepherd.js";
/**
* Task scheduler with extra observability events.
*/
export class ObservableTaskScheduler implements TaskScheduler {
constructor(
private impl: TaskScheduler,
private oc: ObservabilityContext,
) {}
private taskDepCache = new Set();
private declareDep(taskId: TaskIdStr): void {
if (this.taskDepCache.size > 500) {
this.taskDepCache.clear();
}
if (!this.taskDepCache.has(taskId)) {
this.taskDepCache.add(taskId);
this.oc.observe({
type: ObservabilityEventType.DeclareTaskDependency,
taskId,
});
}
}
shutdown(): Promise {
return this.impl.shutdown();
}
getActiveTasks(): TaskIdStr[] {
return this.impl.getActiveTasks();
}
isIdle(): boolean {
return this.impl.isIdle();
}
ensureRunning(): Promise {
return this.impl.ensureRunning();
}
startShepherdTask(taskId: TaskIdStr): void {
this.declareDep(taskId);
this.oc.observe({
type: ObservabilityEventType.TaskStart,
taskId,
});
return this.impl.startShepherdTask(taskId);
}
stopShepherdTask(taskId: TaskIdStr): void {
this.declareDep(taskId);
this.oc.observe({
type: ObservabilityEventType.TaskStop,
taskId,
});
return this.impl.stopShepherdTask(taskId);
}
resetTaskRetries(taskId: TaskIdStr): Promise {
this.declareDep(taskId);
if (this.taskDepCache.size > 500) {
this.taskDepCache.clear();
}
this.oc.observe({
type: ObservabilityEventType.TaskReset,
taskId,
});
return this.impl.resetTaskRetries(taskId);
}
async reload(): Promise {
return this.impl.reload();
}
}
const locRegex = /\s*at\s*([a-zA-Z0-9_.!]*)\s*/;
export function getCallerInfo(up: number = 2): string {
const stack = new Error().stack ?? "";
const identifies: string[] = [];
for (const line of stack.split("\n")) {
let l = line.match(locRegex);
if (l) {
identifies.push(l[1]);
}
}
return identifies.slice(up, up + 2).join("/");
}
export class ObservableDbAccess implements DbAccess {
constructor(
private impl: DbAccess,
private oc: ObservabilityContext,
) {}
idbHandle(): IDBDatabase {
return this.impl.idbHandle();
}
async runAllStoresReadWriteTx(
options: {
label?: string;
},
txf: (
tx: DbReadWriteTransaction[]>,
) => Promise,
): Promise {
const location = getCallerInfo();
this.oc.observe({
type: ObservabilityEventType.DbQueryStart,
name: "",
location,
});
try {
const ret = await this.impl.runAllStoresReadWriteTx(options, txf);
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishSuccess,
name: "",
location,
});
return ret;
} catch (e) {
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishError,
name: "",
location,
});
throw e;
}
}
async runAllStoresReadOnlyTx(
options: {
label?: string;
},
txf: (
tx: DbReadOnlyTransaction[]>,
) => Promise,
): Promise {
const location = getCallerInfo();
this.oc.observe({
type: ObservabilityEventType.DbQueryStart,
name: options.label ?? "",
location,
});
try {
const ret = await this.impl.runAllStoresReadOnlyTx(options, txf);
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishSuccess,
name: options.label ?? "",
location,
});
return ret;
} catch (e) {
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishError,
name: options.label ?? "",
location,
});
throw e;
}
}
async runReadWriteTx[]>(
opts: {
storeNames: StoreNameArray;
label?: string;
},
txf: (tx: DbReadWriteTransaction) => Promise,
): Promise {
const location = getCallerInfo();
this.oc.observe({
type: ObservabilityEventType.DbQueryStart,
name: opts.label ?? "",
location,
});
try {
const ret = await this.impl.runReadWriteTx(opts, txf);
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishSuccess,
name: opts.label ?? "",
location,
});
return ret;
} catch (e) {
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishError,
name: opts.label ?? "",
location,
});
throw e;
}
}
async runReadOnlyTx[]>(
opts: {
storeNames: StoreNameArray;
label?: string;
},
txf: (tx: DbReadOnlyTransaction) => Promise,
): Promise {
const location = getCallerInfo();
try {
this.oc.observe({
type: ObservabilityEventType.DbQueryStart,
name: opts.label ?? "",
location,
});
const ret = await this.impl.runReadOnlyTx(opts, txf);
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishSuccess,
name: opts.label ?? "",
location,
});
return ret;
} catch (e) {
this.oc.observe({
type: ObservabilityEventType.DbQueryFinishError,
name: opts.label ?? "",
location,
});
throw e;
}
}
}
export function observeTalerCrypto(
impl: TalerCryptoInterface,
oc: ObservabilityContext,
): TalerCryptoInterface {
return Object.fromEntries(
Object.keys(impl).map((name) => {
return [
name,
async (req: any) => {
oc.observe({
type: ObservabilityEventType.CryptoStart,
operation: name,
});
try {
const res = await (impl as any)[name](req);
oc.observe({
type: ObservabilityEventType.CryptoFinishSuccess,
operation: name,
});
return res;
} catch (e) {
oc.observe({
type: ObservabilityEventType.CryptoFinishError,
operation: name,
});
throw e;
}
},
];
}),
) as any;
}