mirror of
https://github.com/sern-handler/handler
synced 2026-06-17 13:22:17 +00:00
removerxjs
This commit is contained in:
2658
package-lock.json
generated
2658
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -38,8 +38,7 @@
|
||||
"@sern/ioc": "^1.1.0",
|
||||
"callsites": "^3.1.0",
|
||||
"cron": "^3.1.7",
|
||||
"deepmerge": "^4.3.1",
|
||||
"rxjs": "^7.8.1"
|
||||
"deepmerge": "^4.3.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@faker-js/faker": "^8.0.1",
|
||||
|
||||
@@ -70,7 +70,7 @@ export async function makeDependencies (conf: ValidDependencyConfig) {
|
||||
}
|
||||
container.addSingleton('@sern/errors', new __Services.DefaultErrorHandling);
|
||||
container.addSingleton('@sern/modules', new Map);
|
||||
container.addSingleton('@sern/emitter', new EventEmitter)
|
||||
container.addSingleton('@sern/emitter', new EventEmitter({ captureRejections: true }))
|
||||
container.addSingleton('@sern/scheduler', new __Services.TaskScheduler)
|
||||
conf(dependencyBuilder(container));
|
||||
await container.ready();
|
||||
|
||||
@@ -1,40 +1,12 @@
|
||||
// @ts-nocheck
|
||||
|
||||
import type { Message, BaseInteraction } from 'discord.js';
|
||||
import util from 'node:util';
|
||||
import {
|
||||
EMPTY, type Observable, concatMap, filter,
|
||||
throwError, fromEvent, map, type OperatorFunction,
|
||||
catchError, finalize, pipe, from, take, share, of,
|
||||
} from 'rxjs';
|
||||
import * as Id from '../core/id'
|
||||
import type { Emitter, ErrorHandling, Logging } from '../core/interfaces';
|
||||
import type { Emitter, Logging } from '../core/interfaces';
|
||||
import { SernError } from '../core/structures/enums'
|
||||
import { Err, Ok, Result, wrapAsync } from '../core/structures/result';
|
||||
import type { UnpackedDependencies } from '../types/utility';
|
||||
import type { CommandModule, Module, Processed } from '../types/core-modules';
|
||||
import * as assert from 'node:assert';
|
||||
import { Context } from '../core/structures/context';
|
||||
import { CommandType } from '../core/structures/enums'
|
||||
import { Ok, wrapAsync} from '../core/structures/result';
|
||||
import type { Module } from '../types/core-modules';
|
||||
import { inspect } from 'node:util'
|
||||
import { disposeAll } from '../core/ioc';
|
||||
import { resultPayload, isAutocomplete, treeSearch, fmt } from '../core/functions'
|
||||
import { resultPayload, } from '../core/functions'
|
||||
|
||||
import merge from 'deepmerge'
|
||||
|
||||
function handleError<C>(crashHandler: ErrorHandling, emitter: Emitter, logging?: Logging) {
|
||||
return (pload: unknown, caught: Observable<C>) => {
|
||||
// This is done to fit the ErrorHandling contract
|
||||
if(!emitter.emit('error', pload)) {
|
||||
const err = pload instanceof Error ? pload : Error(util.inspect(pload, { colors: true }));
|
||||
logging?.error({ message: util.inspect(pload) });
|
||||
crashHandler.updateAlive(err);
|
||||
}
|
||||
return caught;
|
||||
};
|
||||
}
|
||||
const arrayify= <T>(src: T) =>
|
||||
Array.isArray(src) ? src : [src];
|
||||
|
||||
interface ExecutePayload {
|
||||
module: Module;
|
||||
@@ -42,146 +14,6 @@ interface ExecutePayload {
|
||||
[key: string]: unknown
|
||||
}
|
||||
|
||||
export const filterTap = <K, R>(onErr: (e: R) => void): OperatorFunction<Result<K, R>, K> =>
|
||||
concatMap(result => {
|
||||
if(result.ok){
|
||||
return of(result.value)
|
||||
}
|
||||
onErr(result.error);
|
||||
return EMPTY;
|
||||
})
|
||||
|
||||
export const sharedEventStream = <T>(e: Emitter, eventName: string) =>
|
||||
(fromEvent(e, eventName) as Observable<T>).pipe(share());
|
||||
|
||||
function intoPayload(module: Module, deps: Dependencies) {
|
||||
return pipe(map(arrayify),
|
||||
map(args => ({ module, args, deps })),
|
||||
map(p => p.args));
|
||||
}
|
||||
/**
|
||||
* Creates an observable from { source }
|
||||
* @param module
|
||||
* @param source
|
||||
*/
|
||||
export function eventDispatcher(deps: Dependencies, module: Module, source: unknown) {
|
||||
assert.ok(source && typeof source === 'object',
|
||||
`${source} cannot be constructed into an event listener`);
|
||||
const execute: OperatorFunction<unknown[]|undefined, unknown> =
|
||||
concatMap(async args => {
|
||||
if(args) return Reflect.apply(module.execute, null, args);
|
||||
});
|
||||
|
||||
//@ts-ignore
|
||||
let ev = fromEvent(source ,module.name!);
|
||||
//@ts-ignore
|
||||
if(module['once']) {
|
||||
ev = ev.pipe(take(1))
|
||||
}
|
||||
return ev.pipe(intoPayload(module, deps),
|
||||
execute);
|
||||
}
|
||||
|
||||
interface DispatchPayload {
|
||||
module: Processed<CommandModule>;
|
||||
event: BaseInteraction;
|
||||
defaultPrefix?: string;
|
||||
deps: Dependencies;
|
||||
params?: string
|
||||
};
|
||||
export function createDispatcher({ module, event, defaultPrefix, deps, params }: DispatchPayload): ExecutePayload {
|
||||
assert.ok(CommandType.Text !== module.type,
|
||||
SernError.MismatchEvent + 'Found text command in interaction stream');
|
||||
|
||||
if(isAutocomplete(event)) {
|
||||
assert.ok(module.type === CommandType.Slash
|
||||
|| module.type === CommandType.Both, "Autocomplete option on non command interaction");
|
||||
const option = treeSearch(event, module.options);
|
||||
assert.ok(option, SernError.NotSupportedInteraction + ` There is no autocomplete tag for ` + inspect(module));
|
||||
const { command } = option;
|
||||
return { module: command as Processed<Module>, //autocomplete is not a true "module" warning cast!
|
||||
args: [event],
|
||||
deps };
|
||||
}
|
||||
switch (module.type) {
|
||||
case CommandType.Slash:
|
||||
case CommandType.Both: {
|
||||
return { module, args: [Context.wrap(event, defaultPrefix)], deps };
|
||||
}
|
||||
default: return { module, args: [event], deps, params };
|
||||
}
|
||||
}
|
||||
function createGenericHandler<Source, Narrowed extends Source, Output>(
|
||||
source: Observable<Source>,
|
||||
makeModule: (event: Narrowed) => Promise<Output>,
|
||||
) {
|
||||
return (pred: (i: Source) => i is Narrowed) =>
|
||||
source.pipe(
|
||||
filter(pred), // only handle this stream if it passes pred
|
||||
concatMap(makeModule)); // create a payload, preparing to execute
|
||||
}
|
||||
|
||||
export function createMessageHandler(
|
||||
source: Observable<Message>,
|
||||
defaultPrefix: string,
|
||||
deps: Dependencies,
|
||||
) {
|
||||
const mg = deps['@sern/modules'];
|
||||
return createGenericHandler(source, async event => {
|
||||
const [prefix] = fmt(event.content, defaultPrefix);
|
||||
let module= mg.get(`${prefix}_T`) ?? mg.get(`${prefix}_B`) as Module;
|
||||
if(!module) {
|
||||
return Err('Possibly undefined behavior: could not find a static id to resolve');
|
||||
}
|
||||
return Ok({ args: [Context.wrap(event, defaultPrefix)], module, deps })
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Wraps the task in a Result as a try / catch.
|
||||
* if the task is ok, an event is emitted and the stream becomes empty
|
||||
* if the task is an error, throw an error down the stream which will be handled by catchError
|
||||
* thank u kingomes
|
||||
* @param emitter reference to SernEmitter that will emit a successful execution of module
|
||||
* @param module the module that will be executed with task
|
||||
* @param task the deferred execution which will be called
|
||||
*/
|
||||
export function executeModule(emitter: Emitter, { module, args }: ExecutePayload) {
|
||||
return from(wrapAsync(async () => module.execute(...args)))
|
||||
.pipe(concatMap(result => {
|
||||
if (result.ok){
|
||||
emitter.emit('module.activate', resultPayload('success', module));
|
||||
return EMPTY;
|
||||
}
|
||||
return throwError(() => resultPayload('failure', module, result.error));
|
||||
}))
|
||||
};
|
||||
|
||||
/**
|
||||
* A higher order function that
|
||||
* - calls all control plugins.
|
||||
* - any failures results to { config.onStop } being called
|
||||
* - if all results are ok, the stream is converted to { config.onNext }
|
||||
* config.onNext will be returned if everything is okay.
|
||||
* @param config
|
||||
* @returns function which calls all plugins and returns onNext or fail
|
||||
*/
|
||||
export function createResultResolver<Output>(config: {
|
||||
onStop?: (module: Module, err?: string) => unknown;
|
||||
onNext: (args: ExecutePayload, map: Record<string, unknown>) => Output;
|
||||
}) {
|
||||
const { onStop, onNext } = config;
|
||||
return async (payload: ExecutePayload) => {
|
||||
const task = await callPlugins(payload);
|
||||
if (!task) throw Error("Plugin did not return anything.");
|
||||
if(!task.ok) {
|
||||
onStop?.(payload.module, String(task.error));
|
||||
} else {
|
||||
return onNext(payload, task.value) as Output;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
function isObject(item: unknown) {
|
||||
return (item && typeof item === 'object' && !Array.isArray(item));
|
||||
@@ -206,6 +38,22 @@ export async function callInitPlugins(_module: Module, deps: Dependencies, emit?
|
||||
return module
|
||||
}
|
||||
|
||||
export function executeModule(emitter: Emitter, { module, args } : ExecutePayload) {
|
||||
//do not await. this will block sern
|
||||
|
||||
const moduleCalled = wrapAsync(async () => {
|
||||
return module.execute(...args);
|
||||
})
|
||||
moduleCalled
|
||||
.then(() => {
|
||||
emitter.emit('module.activate', resultPayload('success', module) )
|
||||
})
|
||||
.catch(err => {
|
||||
emitter.emit('error', resultPayload('failure', module, err))
|
||||
})
|
||||
};
|
||||
|
||||
|
||||
export async function callPlugins({ args, module }: ExecutePayload) {
|
||||
let state = {};
|
||||
for(const plugin of module.onEvent??[]) {
|
||||
@@ -219,34 +67,3 @@ export async function callPlugins({ args, module }: ExecutePayload) {
|
||||
}
|
||||
return Ok(state);
|
||||
}
|
||||
/**
|
||||
* Creates an executable task ( execute the command ) if all control plugins are successful
|
||||
* this needs to go
|
||||
* @param onStop emits a failure response to the SernEmitter
|
||||
*/
|
||||
export function intoTask(onStop: (m: Module) => unknown) {
|
||||
const onNext = ({ args, module, deps, params }: ExecutePayload, state: Record<string, unknown>) => {
|
||||
|
||||
return {
|
||||
module,
|
||||
args: [...args, { state,
|
||||
deps,
|
||||
params,
|
||||
type: module.type,
|
||||
module: { name: module.name,
|
||||
description: module.description,
|
||||
locals: module.locals,
|
||||
meta: module.meta } }],
|
||||
deps
|
||||
}
|
||||
|
||||
};
|
||||
return createResultResolver({ onStop, onNext });
|
||||
}
|
||||
|
||||
export const handleCrash = ({ "@sern/errors": err, '@sern/emitter': sem, '@sern/logger': log } : UnpackedDependencies, metadata: string) =>
|
||||
pipe(catchError(handleError(err, sem, log)),
|
||||
finalize(() => {
|
||||
log?.info({ message: 'A stream closed: ' + metadata });
|
||||
disposeAll(log);
|
||||
}))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import type { Module } from '../types/core-modules'
|
||||
import { callPlugins } from './event-utils';
|
||||
import { callPlugins, executeModule } from './event-utils';
|
||||
import { SernError } from '../core/structures/enums'
|
||||
import { createSDT, isAutocomplete, isCommand, isMessageComponent, isModal, treeSearch } from '../core/functions'
|
||||
import { createSDT, isAutocomplete, isCommand, isMessageComponent, isModal, resultPayload, treeSearch } from '../core/functions'
|
||||
import { UnpackedDependencies } from '../types/utility';
|
||||
import * as Id from '../core/id'
|
||||
import { Context } from '../core/structures/context';
|
||||
@@ -12,7 +12,7 @@ export function interactionHandler(deps: UnpackedDependencies, defaultPrefix?: s
|
||||
//i wish javascript had clojure destructuring
|
||||
const { '@sern/client': client,
|
||||
'@sern/modules': moduleManager,
|
||||
'@sern/emitter': emitter } = deps
|
||||
'@sern/emitter': reporter } = deps
|
||||
|
||||
client.on('interactionCreate', async (event) => {
|
||||
|
||||
@@ -45,17 +45,18 @@ export function interactionHandler(deps: UnpackedDependencies, defaultPrefix?: s
|
||||
}
|
||||
const result = await callPlugins(payload)
|
||||
if(!result.ok) {
|
||||
throw Error(result.error ?? SernError.PluginFailure)
|
||||
reporter.emit('module.activate', resultPayload('failure', module, result.error ?? SernError.PluginFailure))
|
||||
return
|
||||
}
|
||||
if(payload.args.length != 2) {
|
||||
throw Error ('assdfasd')
|
||||
throw Error ('Invalid payload')
|
||||
}
|
||||
//@ts-ignore assigning final state from plugin
|
||||
payload.args[1].state = result.value
|
||||
|
||||
|
||||
// will be blocking if long task + await
|
||||
// todo, add to task queue
|
||||
module.execute(...payload.args)
|
||||
|
||||
executeModule(reporter, { module, args: payload.args });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import type { Message } from 'discord.js';
|
||||
import { callPlugins} from './event-utils';
|
||||
import { callPlugins, executeModule } from './event-utils';
|
||||
import { SernError } from '../core/structures/enums'
|
||||
import { createSDT, fmt } from '../core/functions'
|
||||
import { createSDT, fmt, resultPayload } from '../core/functions'
|
||||
import { UnpackedDependencies } from '../types/utility';
|
||||
import type { Module } from '../types/core-modules';
|
||||
import { Context } from '../core/structures/context';
|
||||
|
||||
/**
|
||||
* Ignores messages from any person / bot except itself
|
||||
* @param prefix
|
||||
@@ -40,15 +41,13 @@ export function messageHandler (deps: UnpackedDependencies, defaultPrefix?: stri
|
||||
const payload = { module, args: [Context.wrap(message, defaultPrefix), createSDT(module, deps, undefined)] }
|
||||
const result = await callPlugins(payload)
|
||||
if (!result.ok) {
|
||||
// const result = resultPayload('failure', module, SernError.PluginFailure);
|
||||
// emitter.emit('module.activate', result);
|
||||
throw Error(result.error ?? SernError.PluginFailure)
|
||||
emitter.emit('module.activate', resultPayload('failure', module, result.error ?? SernError.PluginFailure))
|
||||
return
|
||||
}
|
||||
//@ts-ignore
|
||||
payload.args[1].state = result.value
|
||||
//todo, add to task queue
|
||||
module.execute(...payload.args)
|
||||
|
||||
executeModule(emitter, { module, args: payload.args })
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { concatMap, map } from "rxjs"
|
||||
import { Presence } from "../core/presences";
|
||||
import { Services } from "../core/ioc";
|
||||
import * as Files from "../core/module-loading";
|
||||
@@ -9,7 +8,6 @@ const parseConfig = async (conf: Promise<Presence.Result>, setPresence: SetPrese
|
||||
|
||||
if ('repeat' in result) {
|
||||
const { onRepeat, repeat } = result;
|
||||
|
||||
// Validate configuration
|
||||
if (repeat === undefined) {
|
||||
throw new Error("repeat option is undefined");
|
||||
@@ -17,7 +15,6 @@ const parseConfig = async (conf: Promise<Presence.Result>, setPresence: SetPrese
|
||||
if (onRepeat === undefined) {
|
||||
throw new Error("onRepeat callback is undefined, but repeat exists");
|
||||
}
|
||||
|
||||
// Initial state
|
||||
let currentState = result;
|
||||
const processState = async (state: typeof currentState) => {
|
||||
@@ -46,7 +43,7 @@ const parseConfig = async (conf: Promise<Presence.Result>, setPresence: SetPrese
|
||||
|
||||
processState(currentState)
|
||||
.then(newState => {
|
||||
console.log(newState)
|
||||
//console.log(newState)
|
||||
currentState = newState;
|
||||
return setPresence(currentState)
|
||||
})
|
||||
@@ -62,19 +59,22 @@ const parseConfig = async (conf: Promise<Presence.Result>, setPresence: SetPrese
|
||||
}
|
||||
// Handle event-based repeat
|
||||
else {
|
||||
const handler = async () => {
|
||||
currentState = await onRepeat(currentState);
|
||||
await setPresence(currentState);
|
||||
};
|
||||
let has_registered = false;
|
||||
return new Promise((resolve) => {
|
||||
const [target, eventName] = repeat;
|
||||
|
||||
// Immediately return initial state
|
||||
onRepeat(currentState);
|
||||
processState(currentState);
|
||||
|
||||
// Set up event listener
|
||||
const handler = async () => {
|
||||
currentState = await onRepeat(currentState);
|
||||
};
|
||||
|
||||
target.addListener(eventName, handler);
|
||||
|
||||
if(!has_registered) {
|
||||
target.addListener(eventName, handler);
|
||||
has_registered=true;
|
||||
}
|
||||
// Optional: Return cleanup function
|
||||
return () => target.removeListener(eventName, handler);
|
||||
});
|
||||
@@ -93,6 +93,7 @@ export const presenceHandler = async (path: string, setPresence: SetPresence) =>
|
||||
const fetchedServices = Services(...module.inject ?? []);
|
||||
return async () => module.execute(...fetchedServices);
|
||||
})
|
||||
|
||||
return parseConfig(presence(), setPresence);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,22 +1,11 @@
|
||||
import { EventType, SernError } from '../core/structures/enums';
|
||||
import { callInitPlugins, eventDispatcher, handleCrash } from './event-utils'
|
||||
import { callInitPlugins } from './event-utils'
|
||||
import { EventModule, Module } from '../types/core-modules';
|
||||
import * as Files from '../core/module-loading'
|
||||
import type { UnpackedDependencies } from '../types/utility';
|
||||
import { from, map, mergeAll } from 'rxjs';
|
||||
|
||||
const intoDispatcher = (deps: UnpackedDependencies) =>
|
||||
(module : EventModule) => {
|
||||
switch (module.type) {
|
||||
case EventType.Sern:
|
||||
return eventDispatcher(deps, module, deps['@sern/emitter']);
|
||||
case EventType.Discord:
|
||||
return eventDispatcher(deps, module, deps['@sern/client']);
|
||||
case EventType.External:
|
||||
return eventDispatcher(deps, module, deps[module.emitter]);
|
||||
default: throw Error(SernError.InvalidModuleType + ' while creating event handler');
|
||||
}
|
||||
};
|
||||
import type { Emitter } from '../core/interfaces';
|
||||
import { inspect } from 'util'
|
||||
import { resultPayload } from '../core/functions';
|
||||
|
||||
export default async function(deps: UnpackedDependencies, eventDir: string) {
|
||||
const eventModules: EventModule[] = [];
|
||||
@@ -25,9 +14,43 @@ export default async function(deps: UnpackedDependencies, eventDir: string) {
|
||||
await callInitPlugins(module, deps)
|
||||
eventModules.push(module as EventModule);
|
||||
}
|
||||
from(eventModules)
|
||||
.pipe(map(intoDispatcher(deps)),
|
||||
mergeAll(), // all eventListeners are turned on
|
||||
handleCrash(deps, "event modules"))
|
||||
.subscribe();
|
||||
const logger = deps['@sern/logger'], report = deps['@sern/emitter'];
|
||||
for (const module of eventModules) {
|
||||
let source: Emitter;
|
||||
|
||||
switch (module.type) {
|
||||
case EventType.Sern:
|
||||
source=deps['@sern/emitter'];
|
||||
break
|
||||
case EventType.Discord:
|
||||
source=deps['@sern/client'];
|
||||
break
|
||||
case EventType.External:
|
||||
source=deps[module.emitter] as Emitter;
|
||||
break
|
||||
default: throw Error(SernError.InvalidModuleType + ' while creating event handler');
|
||||
}
|
||||
if(!source && typeof source !== 'object') {
|
||||
throw Error(`${source} cannot be constructed into an event listener`)
|
||||
}
|
||||
|
||||
if(!('addListener' in source && 'removeListener' in source)) {
|
||||
throw Error('source must implement Emitter')
|
||||
}
|
||||
const execute = async (...args: any[]) => {
|
||||
try {
|
||||
if(args) {
|
||||
if('once' in module) { source.removeListener(module.name!, execute); }
|
||||
await Reflect.apply(module.execute, null, args);
|
||||
}
|
||||
} catch(e) {
|
||||
const err = e instanceof Error ? e : Error(inspect(e, { colors: true }));
|
||||
if(!report.emit('error', resultPayload('failure', module, err))) {
|
||||
logger?.error({ message: inspect(err) });
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
source.addListener(module.name!, execute)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
//side effect: global container
|
||||
import { useContainerRaw } from '@sern/ioc/global';
|
||||
// set asynchronous capturing of errors
|
||||
import events from 'node:events'
|
||||
events.captureRejections = true;
|
||||
|
||||
import callsites from 'callsites';
|
||||
import * as Files from './core/module-loading';
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
//@ts-nocheck
|
||||
import { beforeEach, describe, expect, it, test } from 'vitest';
|
||||
import { callInitPlugins, eventDispatcher } from '../src/handlers/event-utils';
|
||||
import { callInitPlugins } from '../src/handlers/event-utils';
|
||||
|
||||
import { Client } from 'discord.js'
|
||||
import { faker } from '@faker-js/faker';
|
||||
import { Module } from '../src/types/core-modules';
|
||||
import { Processed } from '../src/types/core-modules';
|
||||
import { EventEmitter } from 'events';
|
||||
import { CommandControlPlugin, CommandInitPlugin, CommandType, controller } from '../src';
|
||||
import { CommandControlPlugin, CommandType, controller } from '../src';
|
||||
import { createRandomModule, createRandomInitPlugin } from './setup/util';
|
||||
|
||||
|
||||
@@ -19,23 +17,6 @@ function mockDeps() {
|
||||
}
|
||||
}
|
||||
|
||||
describe('eventDispatcher standard', () => {
|
||||
let m: Processed<Module>;
|
||||
let ee: EventEmitter;
|
||||
beforeEach(() => {
|
||||
ee = new EventEmitter();
|
||||
m = createRandomModule();
|
||||
});
|
||||
|
||||
it('should throw', () => {
|
||||
expect(() => eventDispatcher(mockDeps(), m, 'not event emitter')).toThrowError();
|
||||
});
|
||||
|
||||
it("Shouldn't throw", () => {
|
||||
expect(() => eventDispatcher(mockDeps(), m, ee)).not.toThrowError();
|
||||
});
|
||||
});
|
||||
|
||||
describe('calling init plugins', async () => {
|
||||
let deps;
|
||||
beforeEach(() => {
|
||||
|
||||
Reference in New Issue
Block a user