mirror of
https://github.com/sern-handler/handler
synced 2026-06-06 01:16:55 +00:00
refactor to not type leak and job cancellation
This commit is contained in:
@@ -67,8 +67,7 @@ export async function makeDependencies (conf: ValidDependencyConfig) {
|
||||
container.addSingleton('@sern/errors', new __Services.DefaultErrorHandling);
|
||||
container.addSingleton('@sern/modules', new Map);
|
||||
container.addSingleton('@sern/emitter', new EventEmitter)
|
||||
container.addWiredSingleton('@sern/scheduler',
|
||||
(deps) => new __Services.CronScheduler(deps as unknown as Dependencies))
|
||||
container.addSingleton('@sern/scheduler', new __Services.TaskScheduler)
|
||||
conf(dependencyBuilder(container));
|
||||
await container.ready();
|
||||
}
|
||||
@@ -152,5 +151,7 @@ export interface CoreDependencies {
|
||||
* by module.meta.id -> Module
|
||||
*/
|
||||
'@sern/modules': Map<string, Module>;
|
||||
|
||||
'@sern/scheduler': __Services.TaskScheduler
|
||||
}
|
||||
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
import { CronJob } from 'cron';
|
||||
import { Err, Ok, type Result } from 'ts-results-es'
|
||||
export class TaskScheduler {
|
||||
private __tasks: Map<string, CronJob> = new Map();
|
||||
|
||||
scheduleTask(taskName: string, cronExpression: string | Date, task: () => void, tz: string| undefined): Result<void, string> {
|
||||
if (this.__tasks.has(taskName)) {
|
||||
return Err("while scheduling a task \
|
||||
found another task of same name. Not scheduling " +
|
||||
taskName + "again." );
|
||||
}
|
||||
try {
|
||||
const job = CronJob.from({ cronTime: cronExpression, onTick: task, timeZone: tz });
|
||||
job.start();
|
||||
this.__tasks.set(taskName, job);
|
||||
return Ok.EMPTY;
|
||||
} catch (error) {
|
||||
return Err(`while scheduling a task ${taskName} ` + error);
|
||||
}
|
||||
}
|
||||
|
||||
private stopTask(taskName: string): boolean {
|
||||
const job = this.__tasks.get(taskName);
|
||||
if (job) {
|
||||
job.stop();
|
||||
this.__tasks.delete(taskName);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private restartTask(taskName: string): boolean {
|
||||
const job = this.__tasks.get(taskName);
|
||||
if (job) {
|
||||
job.start();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
tasks(): string[] {
|
||||
return Array.from(this.__tasks.keys());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { LogPayload, Logging, ErrorHandling, Emitter } from '../interfaces';
|
||||
import { AnyFunction, UnpackedDependencies } from '../../types/utility';
|
||||
import type { LogPayload, Logging, ErrorHandling } from '../interfaces';
|
||||
import { CronJob } from 'cron';
|
||||
|
||||
/**
|
||||
* @internal
|
||||
@@ -40,30 +40,46 @@ export class DefaultLogging implements Logging {
|
||||
}
|
||||
}
|
||||
|
||||
export class CronScheduler {
|
||||
tasks: string[] = [];
|
||||
constructor(private deps: UnpackedDependencies) {}
|
||||
// addListener(eventName: string | symbol, listener: AnyFunction): this {
|
||||
// const retrievedModule = this.modules.get(eventName);
|
||||
// if(!retrievedModule) throw Error("Adding task: module " +eventName +"was not found");
|
||||
// const { pattern, name, runOnInit, timezone } = retrievedModule;
|
||||
// cron.schedule(pattern,
|
||||
// (date) => listener({ date, deps: this.deps }),
|
||||
// { name, runOnInit, timezone, scheduled: true });
|
||||
// return this;
|
||||
// }
|
||||
// removeListener(eventName: string | symbol, listener: AnyFunction) {
|
||||
// const retrievedModule = this.modules.get(eventName);
|
||||
// if(!retrievedModule) throw Error("Removing cron: module " +eventName +"was not found");
|
||||
// const task = cron.getTasks().get(retrievedModule.name!)
|
||||
// if(!task) throw Error("Finding cron task with"+ retrievedModule.name + " not found");
|
||||
// task.stop();
|
||||
// return this;
|
||||
// }
|
||||
// emit(eventName: string | symbol, ...payload: any[]): boolean {
|
||||
// const retrievedModule = this.modules.get(eventName);
|
||||
// if(!retrievedModule) throw Error("Removing cron: module " +eventName +"was not found");
|
||||
// const task= cron.getTasks().get(retrievedModule.name!)
|
||||
// return task?.emit(eventName, payload) ?? false;
|
||||
// }
|
||||
|
||||
export class TaskScheduler {
|
||||
private __tasks: Map<string, CronJob> = new Map();
|
||||
|
||||
schedule(taskName: string, cronExpression: string | Date, task: () => void, tz: string| undefined) {
|
||||
if (this.__tasks.has(taskName)) {
|
||||
throw Error("while scheduling a task \
|
||||
found another task of same name. Not scheduling " +
|
||||
taskName + "again." );
|
||||
}
|
||||
try {
|
||||
const job = CronJob.from({ cronTime: cronExpression, onTick: task, timeZone: tz });
|
||||
job.start();
|
||||
this.__tasks.set(taskName, job);
|
||||
} catch (error) {
|
||||
throw Error(`while scheduling a task ${taskName} ` + error);
|
||||
}
|
||||
}
|
||||
|
||||
kill(taskName: string): boolean {
|
||||
const job = this.__tasks.get(taskName);
|
||||
if (job) {
|
||||
job.stop();
|
||||
this.__tasks.delete(taskName);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private restartTask(taskName: string): boolean {
|
||||
const job = this.__tasks.get(taskName);
|
||||
if (job) {
|
||||
job.start();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
get tasks(): string[] {
|
||||
return Array.from(this.__tasks.keys());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { TaskScheduler } from "../core/schedule"
|
||||
import * as Files from '../core/module-loading'
|
||||
import { UnpackedDependencies } from "../types/utility";
|
||||
import { ScheduledTask } from "../types/core-modules";
|
||||
@@ -7,21 +6,21 @@ import { relative } from "path";
|
||||
import { fileURLToPath } from "url";
|
||||
|
||||
export const registerTasks = async (tasksPath: string, deps: UnpackedDependencies) => {
|
||||
const taskManager = new TaskScheduler()
|
||||
|
||||
const taskManager = deps['@sern/scheduler']
|
||||
for await (const f of Files.readRecursive(tasksPath)) {
|
||||
let { module } = await Files.importModule<ScheduledTask & { meta: { absPath: string } }>(f);
|
||||
let { module } = await Files.importModule<ScheduledTask>(f);
|
||||
|
||||
//module.name is assigned by Files.importModule<>
|
||||
// the id created for the task is unique
|
||||
const uuid = module.name!+"/"+relative(tasksPath,fileURLToPath(f))
|
||||
taskManager.scheduleTask(uuid, module.pattern, function(this: CronJob) {
|
||||
taskManager.schedule(uuid, module.trigger, function(this: CronJob) {
|
||||
module.execute({
|
||||
deps,
|
||||
runningTasks: taskManager.tasks(),
|
||||
id: uuid,
|
||||
lastTimeExecution: this.lastExecution,
|
||||
nextTimeExecution: this.nextDate().toJSDate()
|
||||
})
|
||||
}, module.timezone).unwrap()
|
||||
}, module.timezone)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -225,15 +225,27 @@ export interface SernSubCommandGroupData extends BaseApplicationCommandOptionsDa
|
||||
|
||||
|
||||
interface ScheduledTaskContext {
|
||||
/**
|
||||
* An object of dependencies configured in `makeDependencies`
|
||||
*/
|
||||
deps: UnpackedDependencies,
|
||||
/**
|
||||
* the uuid of the current task being run
|
||||
*/
|
||||
id: string;
|
||||
/**
|
||||
* the last time this task was executed. If this is the first time, it is null.
|
||||
*/
|
||||
lastTimeExecution: Date | null;
|
||||
runningTasks: string[];
|
||||
/**
|
||||
* The next time this task will be executed.
|
||||
*/
|
||||
nextTimeExecution: Date | null;
|
||||
}
|
||||
|
||||
export interface ScheduledTask {
|
||||
name?: string;
|
||||
pattern: string | Date;
|
||||
trigger: string | Date;
|
||||
description?: string;
|
||||
timezone?: string;
|
||||
execute(tasks: ScheduledTaskContext): Awaitable<void>
|
||||
|
||||
Reference in New Issue
Block a user