diff --git a/apps/web/package.json b/apps/web/package.json index 2502c66..5ecb3d4 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -44,6 +44,8 @@ "media-chrome": "^4.8.0", "next": "^15.2.3", "next-themes": "^0.4.4", + "pg": "^8.14.1", + "pg-boss": "^10.1.6", "react": "19", "react-dom": "19", "react-hook-form": "^7.54.2", diff --git a/apps/web/src/app/(protected)/api/stream/follow/route.ts b/apps/web/src/app/(protected)/api/stream/follow/route.ts index 130629f..f9a1ccd 100644 --- a/apps/web/src/app/(protected)/api/stream/follow/route.ts +++ b/apps/web/src/app/(protected)/api/stream/follow/route.ts @@ -1,5 +1,3 @@ -import { getPersonalChannel } from '@/lib/auth/personalChannel'; -import { resolveOwnedChannels } from '@/lib/auth/resolve'; import { validateRequest } from '@/lib/auth/validate'; import { prisma } from '@hctv/db'; import { NextRequest } from 'next/server'; diff --git a/apps/web/src/instrumentation.ts b/apps/web/src/instrumentation.ts index 8290c76..7f49062 100644 --- a/apps/web/src/instrumentation.ts +++ b/apps/web/src/instrumentation.ts @@ -2,4 +2,14 @@ export async function register() { if (process.env.NEXT_RUNTIME === 'nodejs') { await (await import('@/lib/instrumentation/streamInfo')).default(); } -} \ No newline at end of file + if (process.env.NEXT_RUNTIME === 'nodejs') { + const { getPgBoss } = await import('@/lib/workers'); + const { registerWorkers } = await import('@/lib/workers/register'); + + await getPgBoss(); + + await registerWorkers(); + + console.log('pgboss workers registered'); + } +} diff --git a/apps/web/src/lib/services/livekit.ts b/apps/web/src/lib/services/livekit.ts deleted file mode 100644 index 066ed61..0000000 --- a/apps/web/src/lib/services/livekit.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { IngressClient, RoomServiceClient } from 'livekit-server-sdk'; - -export const roomService = new RoomServiceClient( - process.env.NEXT_PUBLIC_LIVEKIT_URL!, - process.env.LIVEKIT_API_KEY, - process.env.LIVEKIT_SECRET -); - -export const ingressClient = new IngressClient( - process.env.NEXT_PUBLIC_LIVEKIT_URL!, - process.env.LIVEKIT_API_KEY, - process.env.LIVEKIT_SECRET -); diff --git a/apps/web/src/lib/services/slackNotifier.ts b/apps/web/src/lib/services/slackNotifier.ts new file mode 100644 index 0000000..c394ab1 --- /dev/null +++ b/apps/web/src/lib/services/slackNotifier.ts @@ -0,0 +1,4 @@ +import { WebClient } from '@slack/web-api'; + +const client = new WebClient(process.env.SLACK_NOTIFIER_TOKEN); +export default client; \ No newline at end of file diff --git a/apps/web/src/lib/workers/index.ts b/apps/web/src/lib/workers/index.ts new file mode 100644 index 0000000..a7b43c6 --- /dev/null +++ b/apps/web/src/lib/workers/index.ts @@ -0,0 +1,91 @@ +import PgBoss from 'pg-boss'; + +export type JobName = + | 'notifier:sendMsg'; + +export interface JobDefinitions { + 'notifier:sendMsg': { + payload: { + msg: string; + channelId: string; + }; + result: { + success: boolean; + error?: string; + }; + }; +} + +export type PayloadFor = JobDefinitions[T]['payload']; +export type ResultFor = JobDefinitions[T]['result']; + +export class TypedPgBoss { + private instance: PgBoss; + + constructor(connectionString: string) { + this.instance = new PgBoss(connectionString); + } + + async start(): Promise { + return this.instance.start(); + } + + async stop(): Promise { + return this.instance.stop(); + } + + async send( + name: T, + payload: PayloadFor, + options?: PgBoss.SendOptions + ): Promise { + return options ? this.instance.send(name, payload, options) : this.instance.send(name, payload); + } + + async schedule( + name: T, + payload: PayloadFor, + cron: string, + options?: PgBoss.ScheduleOptions + ): Promise { + return this.instance.schedule(name, cron, payload, options); + } + + async work( + name: T, + handler: (job: PgBoss.Job>) => Promise> | void, + options?: PgBoss.WorkOptions + ): Promise { + const wrappedHandler: PgBoss.WorkHandler = async (job) => { + return await handler(job as unknown as PgBoss.Job>); + }; + + return this.instance.work(name, options || {}, wrappedHandler); + } + + getInstance(): PgBoss { + return this.instance; + } +} + +let pgBossInstance: TypedPgBoss | null = null; + +export async function getPgBoss(): Promise { + if (!pgBossInstance) { + if (!process.env.DATABASE_URL) { + throw new Error('DATABASE_URL environment variable is not set'); + } + pgBossInstance = new TypedPgBoss(process.env.DATABASE_URL); + await pgBossInstance.start(); + console.log('PgBoss started successfully'); + } + return pgBossInstance; +} + +export async function closePgBoss(): Promise { + if (pgBossInstance) { + await pgBossInstance.getInstance().stop(); + pgBossInstance = null; + console.log('PgBoss stopped successfully'); + } +} \ No newline at end of file diff --git a/apps/web/src/lib/workers/register.ts b/apps/web/src/lib/workers/register.ts new file mode 100644 index 0000000..c6260b8 --- /dev/null +++ b/apps/web/src/lib/workers/register.ts @@ -0,0 +1,21 @@ +import { getPgBoss } from '@/lib/workers'; +import snClient from '../services/slackNotifier'; + +export async function registerWorkers() { + const boss = await getPgBoss(); + + await boss.work('notifier:sendMsg', async (job) => { + console.log('Processing job:', job.id); + const { data } = job; + + await snClient.chat.postMessage({ + channel: data.channelId, + text: data.msg, + }).catch(e => { + return { success: false, error: e.message }; + }); + return { success: true }; + }); + + console.log('All workers registered successfully'); +} \ No newline at end of file diff --git a/apps/web/src/lib/workers/types.ts b/apps/web/src/lib/workers/types.ts new file mode 100644 index 0000000..76b69ab --- /dev/null +++ b/apps/web/src/lib/workers/types.ts @@ -0,0 +1,64 @@ +// Define a union type of all possible job names +export type JobName = + | 'email:send' + | 'video:process' + | 'notification:push' + | 'user:sync' + // Add more job names as needed + +// Define payload and result types for each job +export interface JobDefinitions { + 'email:send': { + payload: { + to: string; + subject: string; + body: string; + attachments?: Array<{name: string, content: string}>; + }; + result: { + sent: boolean; + messageId?: string; + error?: string; + }; + }; + + 'video:process': { + payload: { + videoId: string; + formats: string[]; + resolution?: string; + }; + result: { + success: boolean; + processedFormats: string[]; + duration: number; + }; + }; + + 'notification:push': { + payload: { + userId: string; + message: string; + data?: Record; + }; + result: { + delivered: boolean; + deviceCount: number; + }; + }; + + 'user:sync': { + payload: { + userId: string; + externalSystems: string[]; + }; + result: { + syncedSystems: string[]; + failedSystems: string[]; + }; + }; +} + +export type PayloadFor = JobDefinitions[T]['payload']; + +export type ResultFor = JobDefinitions[T]['result']; \ No newline at end of file diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 6905307..0ae1791 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -25,23 +25,23 @@ services: - /dev/shm/hls:/dev/shm/hls image: flv-module entrypoint: - - /bin/sh - - -c - - | - # Process the template file - mkdir -p /usr/local/nginx/conf - envsubst '$${API_AUTH}' < /etc/nginx/templates/nginx.conf.template > /usr/local/nginx/conf/nginx.conf - - echo "Setting UID to $${UID} and GID to $${GID}" - usermod -u $${UID} nginx || echo "failed to change uid" - groupmod -g $${GID} nginx || echo "failed to change gid" + - /bin/sh + - -c + - | + # Process the template file + mkdir -p /usr/local/nginx/conf + envsubst '$${API_AUTH}' < /etc/nginx/templates/nginx.conf.template > /usr/local/nginx/conf/nginx.conf - mkdir -p /usr/local/nginx/proxy_temp /usr/local/nginx/client_body_temp - chown -R nginx:nginx /usr/local/nginx - mkdir -p /var/www/html - chown -R nginx:nginx /var/www/html + echo "Setting UID to $${UID} and GID to $${GID}" + usermod -u $${UID} nginx || echo "failed to change uid" + groupmod -g $${GID} nginx || echo "failed to change gid" - echo "testing nginx config..." - /usr/local/nginx/sbin/nginx -t + mkdir -p /usr/local/nginx/proxy_temp /usr/local/nginx/client_body_temp + chown -R nginx:nginx /usr/local/nginx + mkdir -p /var/www/html + chown -R nginx:nginx /var/www/html - /usr/local/nginx/sbin/nginx -g 'daemon off;' \ No newline at end of file + echo "testing nginx config..." + /usr/local/nginx/sbin/nginx -t + + /usr/local/nginx/sbin/nginx -g 'daemon off;' diff --git a/yarn.lock b/yarn.lock index 3ef389a..17772aa 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2370,6 +2370,13 @@ cosmiconfig@^8.1.3: parse-json "^5.2.0" path-type "^4.0.0" +cron-parser@^4.9.0: + version "4.9.0" + resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5" + integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q== + dependencies: + luxon "^3.2.1" + cross-spawn@^7.0.2, cross-spawn@^7.0.3, cross-spawn@^7.0.6: version "7.0.6" resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f" @@ -3994,6 +4001,11 @@ lucide-react@^0.473.0: resolved "https://registry.yarnpkg.com/lucide-react/-/lucide-react-0.473.0.tgz#f01dcde458e55bce766a282f4f5894970a9f24ac" integrity sha512-KW6u5AKeIjkvrxXZ6WuCu9zHE/gEYSXCay+Gre2ZoInD0Je/e3RBtP4OHpJVJ40nDklSvjVKjgH7VU8/e2dzRw== +luxon@^3.2.1: + version "3.6.0" + resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.6.0.tgz#e84453dbdbd716b5eac95bee702b379863059f83" + integrity sha512-WE7p0p7W1xji9qxkLYsvcIxZyfP48GuFrWIBQZIsbjCyf65dG1rv4n83HcOyEyhvzxJCrUoObCRNFgRNIQ5KNA== + map-obj@5.0.0: version "5.0.0" resolved "https://registry.yarnpkg.com/map-obj/-/map-obj-5.0.0.tgz#126c98596b63927d7360f287cccc67177aa1938b" @@ -4439,6 +4451,71 @@ path-type@^4.0.0: resolved "https://registry.yarnpkg.com/path-type/-/path-type-4.0.0.tgz#84ed01c0a7ba380afe09d90a8c180dcd9d03043b" integrity sha512-gDKb8aZMDeD/tZWs9P6+q0J9Mwkdl6xMV8TjnGP3qJVJ06bdMgkbBlLU8IdfOsIsFz2BW1rNVT3XuNEl8zPAvw== +pg-boss@^10.1.6: + version "10.1.6" + resolved "https://registry.yarnpkg.com/pg-boss/-/pg-boss-10.1.6.tgz#6dea4742fb953b7985ae4b09a6c4d79047801715" + integrity sha512-O3ujMF4aL/EDucyTlCpJ2bqNJendWnMKwuLaWDqVVt3CDSJ6/iBRvj3PMiO5exwlqHYLQkZnHCutzdc32nPpOA== + dependencies: + cron-parser "^4.9.0" + pg "^8.13.0" + serialize-error "^8.1.0" + +pg-cloudflare@^1.1.1: + version "1.1.1" + resolved "https://registry.yarnpkg.com/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz#e6d5833015b170e23ae819e8c5d7eaedb472ca98" + integrity sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q== + +pg-connection-string@^2.7.0: + version "2.7.0" + resolved "https://registry.yarnpkg.com/pg-connection-string/-/pg-connection-string-2.7.0.tgz#f1d3489e427c62ece022dba98d5262efcb168b37" + integrity sha512-PI2W9mv53rXJQEOb8xNR8lH7Hr+EKa6oJa38zsK0S/ky2er16ios1wLKhZyxzD7jUReiWokc9WK5nxSnC7W1TA== + +pg-int8@1.0.1: + version "1.0.1" + resolved "https://registry.yarnpkg.com/pg-int8/-/pg-int8-1.0.1.tgz#943bd463bf5b71b4170115f80f8efc9a0c0eb78c" + integrity sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw== + +pg-pool@^3.8.0: + version "3.8.0" + resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.8.0.tgz#e6bce7fc4506a8d6106551363fc5283e5445b776" + integrity sha512-VBw3jiVm6ZOdLBTIcXLNdSotb6Iy3uOCwDGFAksZCXmi10nyRvnP2v3jl4d+IsLYRyXf6o9hIm/ZtUzlByNUdw== + +pg-protocol@^1.8.0: + version "1.8.0" + resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.8.0.tgz#c707101dd07813868035a44571488e4b98639d48" + integrity sha512-jvuYlEkL03NRvOoyoRktBK7+qU5kOvlAwvmrH8sr3wbLrOdVWsRxQfz8mMy9sZFsqJ1hEWNfdWKI4SAmoL+j7g== + +pg-types@^2.1.0: + version "2.2.0" + resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3" + integrity sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA== + dependencies: + pg-int8 "1.0.1" + postgres-array "~2.0.0" + postgres-bytea "~1.0.0" + postgres-date "~1.0.4" + postgres-interval "^1.1.0" + +pg@^8.13.0, pg@^8.14.1: + version "8.14.1" + resolved "https://registry.yarnpkg.com/pg/-/pg-8.14.1.tgz#2e3d1f287b64797cdfc8d1ba000f61a7ff8d66ed" + integrity sha512-0TdbqfjwIun9Fm/r89oB7RFQ0bLgduAhiIqIXOsyKoiC/L54DbuAAzIEN/9Op0f1Po9X7iCPXGoa/Ah+2aI8Xw== + dependencies: + pg-connection-string "^2.7.0" + pg-pool "^3.8.0" + pg-protocol "^1.8.0" + pg-types "^2.1.0" + pgpass "1.x" + optionalDependencies: + pg-cloudflare "^1.1.1" + +pgpass@1.x: + version "1.0.5" + resolved "https://registry.yarnpkg.com/pgpass/-/pgpass-1.0.5.tgz#9b873e4a564bb10fa7a7dbd55312728d422a223d" + integrity sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug== + dependencies: + split2 "^4.1.0" + picocolors@^1.0.0, picocolors@^1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/picocolors/-/picocolors-1.1.1.tgz#3d321af3eab939b083c8f929a1d12cda81c26b6b" @@ -4538,6 +4615,28 @@ postcss@^8, postcss@^8.4.24, postcss@^8.4.47: picocolors "^1.1.1" source-map-js "^1.2.1" +postgres-array@~2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/postgres-array/-/postgres-array-2.0.0.tgz#48f8fce054fbc69671999329b8834b772652d82e" + integrity sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA== + +postgres-bytea@~1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/postgres-bytea/-/postgres-bytea-1.0.0.tgz#027b533c0aa890e26d172d47cf9ccecc521acd35" + integrity sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w== + +postgres-date@~1.0.4: + version "1.0.7" + resolved "https://registry.yarnpkg.com/postgres-date/-/postgres-date-1.0.7.tgz#51bc086006005e5061c591cee727f2531bf641a8" + integrity sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q== + +postgres-interval@^1.1.0: + version "1.2.0" + resolved "https://registry.yarnpkg.com/postgres-interval/-/postgres-interval-1.2.0.tgz#b460c82cb1587507788819a06aa0fffdb3544695" + integrity sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ== + dependencies: + xtend "^4.0.0" + prelude-ls@^1.2.1: version "1.2.1" resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.2.1.tgz#debc6489d7a6e6b0e7611888cec880337d316396" @@ -4912,6 +5011,13 @@ semver@^7.6.0, semver@^7.6.3: resolved "https://registry.yarnpkg.com/semver/-/semver-7.7.1.tgz#abd5098d82b18c6c81f6074ff2647fd3e7220c9f" integrity sha512-hlq8tAfn0m/61p4BVRcPzIGr6LKiMwo4VM6dGi6pt4qcRkmNzTcWq6eCEjEh+qXjkMDvPlOFFSGwQjoEa6gyMA== +serialize-error@^8.1.0: + version "8.1.0" + resolved "https://registry.yarnpkg.com/serialize-error/-/serialize-error-8.1.0.tgz#3a069970c712f78634942ddd50fbbc0eaebe2f67" + integrity sha512-3NnuWfM6vBYoy5gZFvHiYsVbafvI9vZv/+jlIigFn4oP4zjNPK3LhcY0xSCgeb1a5L8jO71Mit9LlNoi2UfDDQ== + dependencies: + type-fest "^0.20.2" + set-function-length@^1.2.2: version "1.2.2" resolved "https://registry.yarnpkg.com/set-function-length/-/set-function-length-1.2.2.tgz#aac72314198eaed975cf77b2c3b6b880695e5449" @@ -5096,6 +5202,11 @@ source-map@~0.6.1: resolved "https://registry.yarnpkg.com/source-map/-/source-map-0.6.1.tgz#74722af32e9614e9c287a8d0bbde48b5e2f1a263" integrity sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g== +split2@^4.1.0: + version "4.2.0" + resolved "https://registry.yarnpkg.com/split2/-/split2-4.2.0.tgz#c9c5920904d148bab0b9f67145f245a86aadbfa4" + integrity sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg== + stable-hash@^0.0.5: version "0.0.5" resolved "https://registry.yarnpkg.com/stable-hash/-/stable-hash-0.0.5.tgz#94e8837aaeac5b4d0f631d2972adef2924b40269" @@ -5816,6 +5927,11 @@ ws@^8.17.0, ws@^8.18.1: resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.1.tgz#ea131d3784e1dfdff91adb0a4a116b127515e3cb" integrity sha512-RKW2aJZMXeMxVpnZ6bck+RswznaxmzdULiBr6KY7XkTnW8uvt0iT9H5DkHUChXrc+uurzwa0rVI16n/Xzjdz1w== +xtend@^4.0.0: + version "4.0.2" + resolved "https://registry.yarnpkg.com/xtend/-/xtend-4.0.2.tgz#bb72779f5fa465186b1f438f674fa347fdb5db54" + integrity sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ== + yallist@^3.0.2: version "3.1.1" resolved "https://registry.yarnpkg.com/yallist/-/yallist-3.1.1.tgz#dbb7daf9bfd8bac9ab45ebf602b8cbad0d5d08fd"