feat: initial workers implementation

This commit is contained in:
2025-03-31 18:16:21 +02:00
parent e83d0cf713
commit 836b5b6951
10 changed files with 326 additions and 33 deletions

View File

@@ -44,6 +44,8 @@
"media-chrome": "^4.8.0",
"next": "^15.2.3",
"next-themes": "^0.4.4",
"pg": "^8.14.1",
"pg-boss": "^10.1.6",
"react": "19",
"react-dom": "19",
"react-hook-form": "^7.54.2",

View File

@@ -1,5 +1,3 @@
import { getPersonalChannel } from '@/lib/auth/personalChannel';
import { resolveOwnedChannels } from '@/lib/auth/resolve';
import { validateRequest } from '@/lib/auth/validate';
import { prisma } from '@hctv/db';
import { NextRequest } from 'next/server';

View File

@@ -2,4 +2,14 @@ export async function register() {
if (process.env.NEXT_RUNTIME === 'nodejs') {
await (await import('@/lib/instrumentation/streamInfo')).default();
}
}
if (process.env.NEXT_RUNTIME === 'nodejs') {
const { getPgBoss } = await import('@/lib/workers');
const { registerWorkers } = await import('@/lib/workers/register');
await getPgBoss();
await registerWorkers();
console.log('pgboss workers registered');
}
}

View File

@@ -1,13 +0,0 @@
import { IngressClient, RoomServiceClient } from 'livekit-server-sdk';
export const roomService = new RoomServiceClient(
process.env.NEXT_PUBLIC_LIVEKIT_URL!,
process.env.LIVEKIT_API_KEY,
process.env.LIVEKIT_SECRET
);
export const ingressClient = new IngressClient(
process.env.NEXT_PUBLIC_LIVEKIT_URL!,
process.env.LIVEKIT_API_KEY,
process.env.LIVEKIT_SECRET
);

View File

@@ -0,0 +1,4 @@
import { WebClient } from '@slack/web-api';
const client = new WebClient(process.env.SLACK_NOTIFIER_TOKEN);
export default client;

View File

@@ -0,0 +1,91 @@
import PgBoss from 'pg-boss';
export type JobName =
| 'notifier:sendMsg';
export interface JobDefinitions {
'notifier:sendMsg': {
payload: {
msg: string;
channelId: string;
};
result: {
success: boolean;
error?: string;
};
};
}
export type PayloadFor<T extends JobName> = JobDefinitions[T]['payload'];
export type ResultFor<T extends JobName> = JobDefinitions[T]['result'];
export class TypedPgBoss {
private instance: PgBoss;
constructor(connectionString: string) {
this.instance = new PgBoss(connectionString);
}
async start(): Promise<PgBoss> {
return this.instance.start();
}
async stop(): Promise<void> {
return this.instance.stop();
}
async send<T extends JobName>(
name: T,
payload: PayloadFor<T>,
options?: PgBoss.SendOptions
): Promise<string | null> {
return options ? this.instance.send(name, payload, options) : this.instance.send(name, payload);
}
async schedule<T extends JobName>(
name: T,
payload: PayloadFor<T>,
cron: string,
options?: PgBoss.ScheduleOptions
): Promise<void> {
return this.instance.schedule(name, cron, payload, options);
}
async work<T extends JobName>(
name: T,
handler: (job: PgBoss.Job<PayloadFor<T>>) => Promise<ResultFor<T>> | void,
options?: PgBoss.WorkOptions
): Promise<string> {
const wrappedHandler: PgBoss.WorkHandler<unknown> = async (job) => {
return await handler(job as unknown as PgBoss.Job<PayloadFor<T>>);
};
return this.instance.work(name, options || {}, wrappedHandler);
}
getInstance(): PgBoss {
return this.instance;
}
}
let pgBossInstance: TypedPgBoss | null = null;
export async function getPgBoss(): Promise<TypedPgBoss> {
if (!pgBossInstance) {
if (!process.env.DATABASE_URL) {
throw new Error('DATABASE_URL environment variable is not set');
}
pgBossInstance = new TypedPgBoss(process.env.DATABASE_URL);
await pgBossInstance.start();
console.log('PgBoss started successfully');
}
return pgBossInstance;
}
export async function closePgBoss(): Promise<void> {
if (pgBossInstance) {
await pgBossInstance.getInstance().stop();
pgBossInstance = null;
console.log('PgBoss stopped successfully');
}
}

View File

@@ -0,0 +1,21 @@
import { getPgBoss } from '@/lib/workers';
import snClient from '../services/slackNotifier';
export async function registerWorkers() {
const boss = await getPgBoss();
await boss.work('notifier:sendMsg', async (job) => {
console.log('Processing job:', job.id);
const { data } = job;
await snClient.chat.postMessage({
channel: data.channelId,
text: data.msg,
}).catch(e => {
return { success: false, error: e.message };
});
return { success: true };
});
console.log('All workers registered successfully');
}

View File

@@ -0,0 +1,64 @@
// Define a union type of all possible job names
export type JobName =
| 'email:send'
| 'video:process'
| 'notification:push'
| 'user:sync'
// Add more job names as needed
// Define payload and result types for each job
export interface JobDefinitions {
'email:send': {
payload: {
to: string;
subject: string;
body: string;
attachments?: Array<{name: string, content: string}>;
};
result: {
sent: boolean;
messageId?: string;
error?: string;
};
};
'video:process': {
payload: {
videoId: string;
formats: string[];
resolution?: string;
};
result: {
success: boolean;
processedFormats: string[];
duration: number;
};
};
'notification:push': {
payload: {
userId: string;
message: string;
data?: Record<string, any>;
};
result: {
delivered: boolean;
deviceCount: number;
};
};
'user:sync': {
payload: {
userId: string;
externalSystems: string[];
};
result: {
syncedSystems: string[];
failedSystems: string[];
};
};
}
export type PayloadFor<T extends JobName> = JobDefinitions[T]['payload'];
export type ResultFor<T extends JobName> = JobDefinitions[T]['result'];

View File

@@ -25,23 +25,23 @@ services:
- /dev/shm/hls:/dev/shm/hls
image: flv-module
entrypoint:
- /bin/sh
- -c
- |
# Process the template file
mkdir -p /usr/local/nginx/conf
envsubst '$${API_AUTH}' < /etc/nginx/templates/nginx.conf.template > /usr/local/nginx/conf/nginx.conf
echo "Setting UID to $${UID} and GID to $${GID}"
usermod -u $${UID} nginx || echo "failed to change uid"
groupmod -g $${GID} nginx || echo "failed to change gid"
- /bin/sh
- -c
- |
# Process the template file
mkdir -p /usr/local/nginx/conf
envsubst '$${API_AUTH}' < /etc/nginx/templates/nginx.conf.template > /usr/local/nginx/conf/nginx.conf
mkdir -p /usr/local/nginx/proxy_temp /usr/local/nginx/client_body_temp
chown -R nginx:nginx /usr/local/nginx
mkdir -p /var/www/html
chown -R nginx:nginx /var/www/html
echo "Setting UID to $${UID} and GID to $${GID}"
usermod -u $${UID} nginx || echo "failed to change uid"
groupmod -g $${GID} nginx || echo "failed to change gid"
echo "testing nginx config..."
/usr/local/nginx/sbin/nginx -t
mkdir -p /usr/local/nginx/proxy_temp /usr/local/nginx/client_body_temp
chown -R nginx:nginx /usr/local/nginx
mkdir -p /var/www/html
chown -R nginx:nginx /var/www/html
/usr/local/nginx/sbin/nginx -g 'daemon off;'
echo "testing nginx config..."
/usr/local/nginx/sbin/nginx -t
/usr/local/nginx/sbin/nginx -g 'daemon off;'

116
yarn.lock
View File

@@ -2370,6 +2370,13 @@ cosmiconfig@^8.1.3:
parse-json "^5.2.0"
path-type "^4.0.0"
cron-parser@^4.9.0:
version "4.9.0"
resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5"
integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==
dependencies:
luxon "^3.2.1"
cross-spawn@^7.0.2, cross-spawn@^7.0.3, cross-spawn@^7.0.6:
version "7.0.6"
resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f"
@@ -3994,6 +4001,11 @@ lucide-react@^0.473.0:
resolved "https://registry.yarnpkg.com/lucide-react/-/lucide-react-0.473.0.tgz#f01dcde458e55bce766a282f4f5894970a9f24ac"
integrity sha512-KW6u5AKeIjkvrxXZ6WuCu9zHE/gEYSXCay+Gre2ZoInD0Je/e3RBtP4OHpJVJ40nDklSvjVKjgH7VU8/e2dzRw==
luxon@^3.2.1:
version "3.6.0"
resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.6.0.tgz#e84453dbdbd716b5eac95bee702b379863059f83"
integrity sha512-WE7p0p7W1xji9qxkLYsvcIxZyfP48GuFrWIBQZIsbjCyf65dG1rv4n83HcOyEyhvzxJCrUoObCRNFgRNIQ5KNA==
map-obj@5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/map-obj/-/map-obj-5.0.0.tgz#126c98596b63927d7360f287cccc67177aa1938b"
@@ -4439,6 +4451,71 @@ path-type@^4.0.0:
resolved "https://registry.yarnpkg.com/path-type/-/path-type-4.0.0.tgz#84ed01c0a7ba380afe09d90a8c180dcd9d03043b"
integrity sha512-gDKb8aZMDeD/tZWs9P6+q0J9Mwkdl6xMV8TjnGP3qJVJ06bdMgkbBlLU8IdfOsIsFz2BW1rNVT3XuNEl8zPAvw==
pg-boss@^10.1.6:
version "10.1.6"
resolved "https://registry.yarnpkg.com/pg-boss/-/pg-boss-10.1.6.tgz#6dea4742fb953b7985ae4b09a6c4d79047801715"
integrity sha512-O3ujMF4aL/EDucyTlCpJ2bqNJendWnMKwuLaWDqVVt3CDSJ6/iBRvj3PMiO5exwlqHYLQkZnHCutzdc32nPpOA==
dependencies:
cron-parser "^4.9.0"
pg "^8.13.0"
serialize-error "^8.1.0"
pg-cloudflare@^1.1.1:
version "1.1.1"
resolved "https://registry.yarnpkg.com/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz#e6d5833015b170e23ae819e8c5d7eaedb472ca98"
integrity sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q==
pg-connection-string@^2.7.0:
version "2.7.0"
resolved "https://registry.yarnpkg.com/pg-connection-string/-/pg-connection-string-2.7.0.tgz#f1d3489e427c62ece022dba98d5262efcb168b37"
integrity sha512-PI2W9mv53rXJQEOb8xNR8lH7Hr+EKa6oJa38zsK0S/ky2er16ios1wLKhZyxzD7jUReiWokc9WK5nxSnC7W1TA==
pg-int8@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/pg-int8/-/pg-int8-1.0.1.tgz#943bd463bf5b71b4170115f80f8efc9a0c0eb78c"
integrity sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==
pg-pool@^3.8.0:
version "3.8.0"
resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.8.0.tgz#e6bce7fc4506a8d6106551363fc5283e5445b776"
integrity sha512-VBw3jiVm6ZOdLBTIcXLNdSotb6Iy3uOCwDGFAksZCXmi10nyRvnP2v3jl4d+IsLYRyXf6o9hIm/ZtUzlByNUdw==
pg-protocol@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.8.0.tgz#c707101dd07813868035a44571488e4b98639d48"
integrity sha512-jvuYlEkL03NRvOoyoRktBK7+qU5kOvlAwvmrH8sr3wbLrOdVWsRxQfz8mMy9sZFsqJ1hEWNfdWKI4SAmoL+j7g==
pg-types@^2.1.0:
version "2.2.0"
resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3"
integrity sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==
dependencies:
pg-int8 "1.0.1"
postgres-array "~2.0.0"
postgres-bytea "~1.0.0"
postgres-date "~1.0.4"
postgres-interval "^1.1.0"
pg@^8.13.0, pg@^8.14.1:
version "8.14.1"
resolved "https://registry.yarnpkg.com/pg/-/pg-8.14.1.tgz#2e3d1f287b64797cdfc8d1ba000f61a7ff8d66ed"
integrity sha512-0TdbqfjwIun9Fm/r89oB7RFQ0bLgduAhiIqIXOsyKoiC/L54DbuAAzIEN/9Op0f1Po9X7iCPXGoa/Ah+2aI8Xw==
dependencies:
pg-connection-string "^2.7.0"
pg-pool "^3.8.0"
pg-protocol "^1.8.0"
pg-types "^2.1.0"
pgpass "1.x"
optionalDependencies:
pg-cloudflare "^1.1.1"
pgpass@1.x:
version "1.0.5"
resolved "https://registry.yarnpkg.com/pgpass/-/pgpass-1.0.5.tgz#9b873e4a564bb10fa7a7dbd55312728d422a223d"
integrity sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==
dependencies:
split2 "^4.1.0"
picocolors@^1.0.0, picocolors@^1.1.1:
version "1.1.1"
resolved "https://registry.yarnpkg.com/picocolors/-/picocolors-1.1.1.tgz#3d321af3eab939b083c8f929a1d12cda81c26b6b"
@@ -4538,6 +4615,28 @@ postcss@^8, postcss@^8.4.24, postcss@^8.4.47:
picocolors "^1.1.1"
source-map-js "^1.2.1"
postgres-array@~2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/postgres-array/-/postgres-array-2.0.0.tgz#48f8fce054fbc69671999329b8834b772652d82e"
integrity sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==
postgres-bytea@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/postgres-bytea/-/postgres-bytea-1.0.0.tgz#027b533c0aa890e26d172d47cf9ccecc521acd35"
integrity sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==
postgres-date@~1.0.4:
version "1.0.7"
resolved "https://registry.yarnpkg.com/postgres-date/-/postgres-date-1.0.7.tgz#51bc086006005e5061c591cee727f2531bf641a8"
integrity sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==
postgres-interval@^1.1.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/postgres-interval/-/postgres-interval-1.2.0.tgz#b460c82cb1587507788819a06aa0fffdb3544695"
integrity sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==
dependencies:
xtend "^4.0.0"
prelude-ls@^1.2.1:
version "1.2.1"
resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.2.1.tgz#debc6489d7a6e6b0e7611888cec880337d316396"
@@ -4912,6 +5011,13 @@ semver@^7.6.0, semver@^7.6.3:
resolved "https://registry.yarnpkg.com/semver/-/semver-7.7.1.tgz#abd5098d82b18c6c81f6074ff2647fd3e7220c9f"
integrity sha512-hlq8tAfn0m/61p4BVRcPzIGr6LKiMwo4VM6dGi6pt4qcRkmNzTcWq6eCEjEh+qXjkMDvPlOFFSGwQjoEa6gyMA==
serialize-error@^8.1.0:
version "8.1.0"
resolved "https://registry.yarnpkg.com/serialize-error/-/serialize-error-8.1.0.tgz#3a069970c712f78634942ddd50fbbc0eaebe2f67"
integrity sha512-3NnuWfM6vBYoy5gZFvHiYsVbafvI9vZv/+jlIigFn4oP4zjNPK3LhcY0xSCgeb1a5L8jO71Mit9LlNoi2UfDDQ==
dependencies:
type-fest "^0.20.2"
set-function-length@^1.2.2:
version "1.2.2"
resolved "https://registry.yarnpkg.com/set-function-length/-/set-function-length-1.2.2.tgz#aac72314198eaed975cf77b2c3b6b880695e5449"
@@ -5096,6 +5202,11 @@ source-map@~0.6.1:
resolved "https://registry.yarnpkg.com/source-map/-/source-map-0.6.1.tgz#74722af32e9614e9c287a8d0bbde48b5e2f1a263"
integrity sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==
split2@^4.1.0:
version "4.2.0"
resolved "https://registry.yarnpkg.com/split2/-/split2-4.2.0.tgz#c9c5920904d148bab0b9f67145f245a86aadbfa4"
integrity sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==
stable-hash@^0.0.5:
version "0.0.5"
resolved "https://registry.yarnpkg.com/stable-hash/-/stable-hash-0.0.5.tgz#94e8837aaeac5b4d0f631d2972adef2924b40269"
@@ -5816,6 +5927,11 @@ ws@^8.17.0, ws@^8.18.1:
resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.1.tgz#ea131d3784e1dfdff91adb0a4a116b127515e3cb"
integrity sha512-RKW2aJZMXeMxVpnZ6bck+RswznaxmzdULiBr6KY7XkTnW8uvt0iT9H5DkHUChXrc+uurzwa0rVI16n/Xzjdz1w==
xtend@^4.0.0:
version "4.0.2"
resolved "https://registry.yarnpkg.com/xtend/-/xtend-4.0.2.tgz#bb72779f5fa465186b1f438f674fa347fdb5db54"
integrity sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==
yallist@^3.0.2:
version "3.1.1"
resolved "https://registry.yarnpkg.com/yallist/-/yallist-3.1.1.tgz#dbb7daf9bfd8bac9ab45ebf602b8cbad0d5d08fd"