feat: user notifications working, moving to bullmq

This commit is contained in:
2025-03-31 20:40:15 +02:00
parent 836b5b6951
commit 648d3296e2
6 changed files with 54 additions and 23 deletions

View File

@@ -1,4 +1,5 @@
import { validateRequest } from '@/lib/auth/validate';
import { getPgBoss } from '@/lib/workers';
import { prisma } from '@hctv/db';
import { NextRequest } from 'next/server';
@@ -41,6 +42,7 @@ export async function GET(request: NextRequest) {
export async function POST(request: NextRequest) {
const { user } = await validateRequest();
const boss = await getPgBoss();
const searchParams = new URL(request.url).searchParams;
const username = searchParams.get('username');
if (!user) {
@@ -99,6 +101,13 @@ export async function POST(request: NextRequest) {
},
},
});
const jobId = await boss.send('notifier:sendMsg', {
text: `You started following \`${username}\`!\n_Stream notifications are enabled by default. If you want to disable them, you can do so in \`Profile > Notifications\`._`,
channel: user.slack_id,
});
console.log(`Job sent with ID: ${jobId} for user ${user.id} following ${username}`);
}
return new Response(JSON.stringify({ following: !isFollowing }), { status: 200 });

View File

@@ -1,3 +1,4 @@
import type { ChatPostMessageArguments } from '@slack/web-api';
import PgBoss from 'pg-boss';
export type JobName =
@@ -5,10 +6,7 @@ export type JobName =
export interface JobDefinitions {
'notifier:sendMsg': {
payload: {
msg: string;
channelId: string;
};
payload: ChatPostMessageArguments;
result: {
success: boolean;
error?: string;
@@ -39,7 +37,8 @@ export class TypedPgBoss {
payload: PayloadFor<T>,
options?: PgBoss.SendOptions
): Promise<string | null> {
return options ? this.instance.send(name, payload, options) : this.instance.send(name, payload);
await this.instance.createQueue(name);
return this.instance.send(name, payload, options!);
}
async schedule<T extends JobName>(
@@ -56,8 +55,14 @@ export class TypedPgBoss {
handler: (job: PgBoss.Job<PayloadFor<T>>) => Promise<ResultFor<T>> | void,
options?: PgBoss.WorkOptions
): Promise<string> {
const wrappedHandler: PgBoss.WorkHandler<unknown> = async (job) => {
return await handler(job as unknown as PgBoss.Job<PayloadFor<T>>);
const wrappedHandler: PgBoss.WorkHandler<unknown> = async (job: PgBoss.Job<unknown> | PgBoss.Job<unknown>[]) => {
const singleJob = Array.isArray(job) ? job[0] : job;
const processedJob = {...singleJob};
if (Array.isArray(singleJob.data) && singleJob.data.length === 1) {
processedJob.data = singleJob.data[0];
}
return await handler(processedJob as PgBoss.Job<PayloadFor<T>>);
};
return this.instance.work(name, options || {}, wrappedHandler);
@@ -68,24 +73,40 @@ export class TypedPgBoss {
}
}
let pgBossInstance: TypedPgBoss | null = null;
const globalForPgBoss = global as unknown as { pgBoss: TypedPgBoss | null };
// Initialize if it doesn't exist yet
if (!globalForPgBoss.pgBoss) {
globalForPgBoss.pgBoss = null;
}
// Get or create the singleton instance
export async function getPgBoss(): Promise<TypedPgBoss> {
if (!pgBossInstance) {
if (!globalForPgBoss.pgBoss) {
if (!process.env.DATABASE_URL) {
throw new Error('DATABASE_URL environment variable is not set');
}
pgBossInstance = new TypedPgBoss(process.env.DATABASE_URL);
await pgBossInstance.start();
console.log('PgBoss started successfully');
console.log('Creating new PgBoss instance...');
const newBoss = new TypedPgBoss(process.env.DATABASE_URL);
try {
await newBoss.start();
console.log('PgBoss started successfully');
globalForPgBoss.pgBoss = newBoss;
} catch (error) {
console.error('Failed to start PgBoss:', error);
throw error;
}
}
return pgBossInstance;
return globalForPgBoss.pgBoss;
}
export async function closePgBoss(): Promise<void> {
if (pgBossInstance) {
await pgBossInstance.getInstance().stop();
pgBossInstance = null;
if (globalForPgBoss.pgBoss) {
await globalForPgBoss.pgBoss.stop();
globalForPgBoss.pgBoss = null;
console.log('PgBoss stopped successfully');
}
}
}

View File

@@ -6,12 +6,8 @@ export async function registerWorkers() {
await boss.work('notifier:sendMsg', async (job) => {
console.log('Processing job:', job.id);
const { data } = job;
await snClient.chat.postMessage({
channel: data.channelId,
text: data.msg,
}).catch(e => {
await snClient.chat.postMessage(job.data).catch(e => {
return { success: false, error: e.message };
});
return { success: true };

View File

@@ -13,7 +13,8 @@
"lint": "turbo run lint",
"docker:web": "dotenvx run -f .env.docker -- docker buildx build --platform linux/amd64 -f apps/web/Dockerfile . --secret id=TURBO_TOKEN,env=TURBO_TOKEN --secret id=TURBO_TEAM,env=TURBO_TEAM --no-cache",
"docker:chat": "dotenvx run -f .env.docker -- docker buildx build --platform linux/amd64 -f apps/chat/Dockerfile . --secret id=TURBO_TOKEN,env=TURBO_TOKEN --secret id=TURBO_TEAM,env=TURBO_TEAM --no-cache",
"act": "act --secret-file .env.ci"
"act": "act --secret-file .env.ci",
"db:migrate": "yarn workspace @hctv/db db:migrate"
},
"devDependencies": {
"turbo": "^2.4.4"

View File

@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "Follow" ADD COLUMN "notifyStream" BOOLEAN NOT NULL DEFAULT true;

View File

@@ -88,6 +88,8 @@ model Follow {
channel Channel @relation("ChannelFollowers", fields: [channelId], references: [id], onDelete: Cascade)
channelId String
notifyStream Boolean @default(true)
@@unique([userId, channelId])
@@index([userId])