/**
 * This file holds sern's rxjs operators used for processing data.
 * Each function should be modular and testable, not bound to discord / sern
 * and independent of each other
 */
import {
    concatMap,
    defaultIfEmpty,
    EMPTY,
    every,
    fromEvent,
    map,
    Observable,
    of,
    OperatorFunction,
    pipe,
    share,
    switchMap,
} from 'rxjs';
import type { PluginResult, VoidResult } from '../types/plugin';
import { Result } from 'ts-results-es';
import { Awaitable } from '../types/handler';
import { EventEmitter } from 'node:events';

/**
 * For every truthy source value, emits the value produced by `item()`;
 * falsy source values emit nothing.
 * @param item factory invoked once per truthy source value
 * @returns an operator mapping a boolean stream to a stream of `V`
 */
export function filterMapTo<V>(item: () => V): OperatorFunction<boolean, V> {
    return concatMap(shouldKeep => (shouldKeep ? of(item()) : EMPTY));
}

/**
 * Runs `cb` on each source value and emits the unwrapped value of every
 * ok result. Results that are not ok are dropped without notification.
 *
 * The callback is projected through `switchMap`, so a pending result is
 * discarded if the next source value arrives before it settles.
 * @param cb mapper returning a (possibly async) Result
 * @returns an operator emitting only the ok values produced by `cb`
 */
export function filterMap<In, Out>(
    cb: (i: In) => Awaitable<Result<Out, unknown>>,
): OperatorFunction<In, Out> {
    return pipe(
        // The async wrapper normalises sync and async callbacks to a Promise.
        switchMap(async input => cb(input)),
        concatMap(s => {
            if (s.ok) {
                return of(s.val);
            }
            return EMPTY;
        }),
    );
}

/**
 * Calls any plugin with {args}.
 * @param args if an array, it is spread into `execute`; any other value is
 * passed as the single argument.
 * @returns an operator emitting the result of each plugin's `execute`
 */
export function callPlugin(args: unknown): OperatorFunction<
    {
        execute: (...args: unknown[]) => PluginResult;
    },
    VoidResult
> {
    return concatMap(async plugin => {
        if (Array.isArray(args)) {
            return plugin.execute(...args);
        }
        return plugin.execute(args);
    });
}

/**
 * Normalises each source value to an array: arrays pass through unchanged,
 * any other value is wrapped in a single-element array.
 */
export const arrayifySource = map(src => (Array.isArray(src) ? (src as unknown[]) : [src]));

/**
 * If the current value in the Result stream is an error, calls the callback
 * with the error payload and emits nothing for that value.
 * This also extracts the Ok value from Result.
 * @param cb invoked with the error payload of each failed result
 * @returns an operator emitting only the unwrapped Ok values
 */
export function errTap<Ok, Err>(cb: (err: Err) => void): OperatorFunction<Result<Ok, Err>, Ok> {
    return concatMap(result => {
        if (result.ok) {
            return of(result.val);
        } else {
            cb(result.val as Err);
            return EMPTY;
        }
    });
}

/**
 * Checks if the stream of results is all ok.
 * Emits a single boolean; `defaultIfEmpty(true)` is a guard so that a
 * stream producing no value still reports `true`.
 */
export const everyPluginOk: OperatorFunction<VoidResult, boolean> = pipe(
    every(result => result.ok),
    defaultIfEmpty(true),
);

/**
 * Builds an observable of `eventName` emissions from the given emitter and
 * passes it through `share()` so subscribers share one underlying stream.
 * The payload type `T` is asserted by the caller, not checked at runtime.
 * @param e emitter to listen on
 * @param eventName name of the event to observe
 */
export const sharedObservable = <T>(e: EventEmitter, eventName: string) => {
    return (fromEvent(e, eventName) as Observable<T>).pipe(share());
};