From 648d3296e2f8e864a97aff766d9adced230404d4 Mon Sep 17 00:00:00 2001 From: Izan Gil <66965250+SrIzan10@users.noreply.github.com> Date: Mon, 31 Mar 2025 20:40:15 +0200 Subject: [PATCH] feat: user notifications working, moving to bullmq --- .../(protected)/api/stream/follow/route.ts | 9 +++ apps/web/src/lib/workers/index.ts | 55 +++++++++++++------ apps/web/src/lib/workers/register.ts | 6 +- package.json | 3 +- .../migration.sql | 2 + packages/db/prisma/schema.prisma | 2 + 6 files changed, 54 insertions(+), 23 deletions(-) create mode 100644 packages/db/prisma/migrations/20250331173121_notification_db1/migration.sql diff --git a/apps/web/src/app/(protected)/api/stream/follow/route.ts b/apps/web/src/app/(protected)/api/stream/follow/route.ts index f9a1ccd..7d55d70 100644 --- a/apps/web/src/app/(protected)/api/stream/follow/route.ts +++ b/apps/web/src/app/(protected)/api/stream/follow/route.ts @@ -1,4 +1,5 @@ import { validateRequest } from '@/lib/auth/validate'; +import { getPgBoss } from '@/lib/workers'; import { prisma } from '@hctv/db'; import { NextRequest } from 'next/server'; @@ -41,6 +42,7 @@ export async function GET(request: NextRequest) { export async function POST(request: NextRequest) { const { user } = await validateRequest(); + const boss = await getPgBoss(); const searchParams = new URL(request.url).searchParams; const username = searchParams.get('username'); if (!user) { @@ -99,6 +101,13 @@ export async function POST(request: NextRequest) { }, }, }); + + const jobId = await boss.send('notifier:sendMsg', { + text: `You started following \`${username}\`!\n_Stream notifications are enabled by default. If you want to disable them, you can do so in \`Profile > Notifications\`._`, + channel: user.slack_id, + }); + + console.log(`Job sent with ID: ${jobId} for user ${user.id} following ${username}`); } return new Response(JSON.stringify({ following: !isFollowing }), { status: 200 }); diff --git a/apps/web/src/lib/workers/index.ts b/apps/web/src/lib/workers/index.ts index a7b43c6..19b9ac0 100644 --- a/apps/web/src/lib/workers/index.ts +++ b/apps/web/src/lib/workers/index.ts @@ -1,3 +1,4 @@ +import type { ChatPostMessageArguments } from '@slack/web-api'; import PgBoss from 'pg-boss'; export type JobName = @@ -5,10 +6,7 @@ export type JobName = export interface JobDefinitions { 'notifier:sendMsg': { - payload: { - msg: string; - channelId: string; - }; + payload: ChatPostMessageArguments; result: { success: boolean; error?: string; @@ -39,7 +37,8 @@ export class TypedPgBoss { payload: PayloadFor, options?: PgBoss.SendOptions ): Promise { - return options ? this.instance.send(name, payload, options) : this.instance.send(name, payload); + await this.instance.createQueue(name); + return this.instance.send(name, payload, options!); } async schedule( @@ -56,8 +55,14 @@ export class TypedPgBoss { handler: (job: PgBoss.Job>) => Promise> | void, options?: PgBoss.WorkOptions ): Promise { - const wrappedHandler: PgBoss.WorkHandler = async (job) => { - return await handler(job as unknown as PgBoss.Job>); + const wrappedHandler: PgBoss.WorkHandler = async (job: PgBoss.Job | PgBoss.Job[]) => { + const singleJob = Array.isArray(job) ? job[0] : job; + const processedJob = {...singleJob}; + if (Array.isArray(singleJob.data) && singleJob.data.length === 1) { + processedJob.data = singleJob.data[0]; + } + + return await handler(processedJob as PgBoss.Job>); }; return this.instance.work(name, options || {}, wrappedHandler); @@ -68,24 +73,40 @@ export class TypedPgBoss { } } -let pgBossInstance: TypedPgBoss | null = null; +const globalForPgBoss = global as unknown as { pgBoss: TypedPgBoss | null }; +// Initialize if it doesn't exist yet +if (!globalForPgBoss.pgBoss) { + globalForPgBoss.pgBoss = null; +} + +// Get or create the singleton instance export async function getPgBoss(): Promise { - if (!pgBossInstance) { + if (!globalForPgBoss.pgBoss) { if (!process.env.DATABASE_URL) { throw new Error('DATABASE_URL environment variable is not set'); } - pgBossInstance = new TypedPgBoss(process.env.DATABASE_URL); - await pgBossInstance.start(); - console.log('PgBoss started successfully'); + + console.log('Creating new PgBoss instance...'); + const newBoss = new TypedPgBoss(process.env.DATABASE_URL); + + try { + await newBoss.start(); + console.log('PgBoss started successfully'); + globalForPgBoss.pgBoss = newBoss; + } catch (error) { + console.error('Failed to start PgBoss:', error); + throw error; + } } - return pgBossInstance; + + return globalForPgBoss.pgBoss; } export async function closePgBoss(): Promise { - if (pgBossInstance) { - await pgBossInstance.getInstance().stop(); - pgBossInstance = null; + if (globalForPgBoss.pgBoss) { + await globalForPgBoss.pgBoss.stop(); + globalForPgBoss.pgBoss = null; console.log('PgBoss stopped successfully'); } -} \ No newline at end of file +} diff --git a/apps/web/src/lib/workers/register.ts b/apps/web/src/lib/workers/register.ts index c6260b8..2e275ce 100644 --- a/apps/web/src/lib/workers/register.ts +++ b/apps/web/src/lib/workers/register.ts @@ -6,12 +6,8 @@ export async function registerWorkers() { await boss.work('notifier:sendMsg', async (job) => { console.log('Processing job:', job.id); - const { data } = job; - await snClient.chat.postMessage({ - channel: data.channelId, - text: data.msg, - }).catch(e => { + await snClient.chat.postMessage(job.data).catch(e => { return { success: false, error: e.message }; }); return { success: true }; diff --git a/package.json b/package.json index 3295cb4..daf9a02 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,8 @@ "lint": "turbo run lint", "docker:web": "dotenvx run -f .env.docker -- docker buildx build --platform linux/amd64 -f apps/web/Dockerfile . --secret id=TURBO_TOKEN,env=TURBO_TOKEN --secret id=TURBO_TEAM,env=TURBO_TEAM --no-cache", "docker:chat": "dotenvx run -f .env.docker -- docker buildx build --platform linux/amd64 -f apps/chat/Dockerfile . --secret id=TURBO_TOKEN,env=TURBO_TOKEN --secret id=TURBO_TEAM,env=TURBO_TEAM --no-cache", - "act": "act --secret-file .env.ci" + "act": "act --secret-file .env.ci", + "db:migrate": "yarn workspace @hctv/db db:migrate" }, "devDependencies": { "turbo": "^2.4.4" diff --git a/packages/db/prisma/migrations/20250331173121_notification_db1/migration.sql b/packages/db/prisma/migrations/20250331173121_notification_db1/migration.sql new file mode 100644 index 0000000..bdb5cd1 --- /dev/null +++ b/packages/db/prisma/migrations/20250331173121_notification_db1/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Follow" ADD COLUMN "notifyStream" BOOLEAN NOT NULL DEFAULT true; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index 256e8c0..4eb2678 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -88,6 +88,8 @@ model Follow { channel Channel @relation("ChannelFollowers", fields: [channelId], references: [id], onDelete: Cascade) channelId String + + notifyStream Boolean @default(true) @@unique([userId, channelId]) @@index([userId])