From 5add3b0e5d7cb3e9110cf7b02eb8cb8d8a34e4e7 Mon Sep 17 00:00:00 2001 From: Izan Gil <66965250+SrIzan10@users.noreply.github.com> Date: Tue, 27 Jan 2026 16:56:43 +0100 Subject: [PATCH] feat: multiple streaming servers --- apps/web/package.json | 2 +- .../(protected)/api/mediamtx/publish/route.ts | 9 +- .../(ui)/(protected)/api/stream/info/route.ts | 5 + .../channel/[channelName]/page.client.tsx | 27 +++- .../app/StreamPlayer/StreamPlayer.tsx | 6 +- apps/web/src/lib/hooks/useUserList.tsx | 69 +++++++++- .../src/lib/instrumentation/getLiveThumb.ts | 2 + .../web/src/lib/instrumentation/streamInfo.ts | 129 ++++++++++-------- apps/web/src/lib/utils/mediamtx/client.ts | 35 +++++ apps/web/src/lib/utils/mediamtx/regions.ts | 1 + apps/web/src/lib/utils/mediamtx/server.ts | 24 ++++ apps/web/src/lib/workers/worker/thumbnails.ts | 10 +- dev/docker-compose.yml | 23 +++- dev/mediamtx.yml | 3 +- .../migration.sql | 2 + packages/db/prisma/schema.prisma | 1 + 16 files changed, 269 insertions(+), 79 deletions(-) create mode 100644 apps/web/src/lib/utils/mediamtx/client.ts create mode 100644 apps/web/src/lib/utils/mediamtx/regions.ts create mode 100644 apps/web/src/lib/utils/mediamtx/server.ts create mode 100644 packages/db/prisma/migrations/20260126160053_ingest_region_ground_work/migration.sql diff --git a/apps/web/package.json b/apps/web/package.json index 769fa52..6b826d1 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -5,7 +5,7 @@ "type": "module", "scripts": { "dd": "docker compose --file ../../dev/docker-compose.yml up -d", - "dev": "next dev --turbo", + "dev": "next dev --turbo -H 0.0.0.0", "donly": "docker compose --file ../../dev/docker-compose.yml up", "build": "next build", "start": "next start", diff --git a/apps/web/src/app/(ui)/(protected)/api/mediamtx/publish/route.ts b/apps/web/src/app/(ui)/(protected)/api/mediamtx/publish/route.ts index 9c28684..334cbd1 100644 --- a/apps/web/src/app/(ui)/(protected)/api/mediamtx/publish/route.ts +++ b/apps/web/src/app/(ui)/(protected)/api/mediamtx/publish/route.ts @@ -1,12 +1,17 @@ import { prisma, getRedisConnection } from '@hctv/db'; import { NextRequest } from 'next/server'; import { z } from 'zod'; -import { lucia } from '@hctv/auth'; export async function POST(request: NextRequest) { const redis = getRedisConnection(); const body = await request.json(); + if (process.env.NODE_ENV !== 'production') { + console.log( + 'Mediamtx publish auth request:', + JSON.stringify(body, null, 2) + ) + } const parsed = schema.safeParse(body); if (!parsed.success) { @@ -56,7 +61,7 @@ export async function POST(request: NextRequest) { } } else if (action === 'read' && protocol === 'hls') { if (password === process.env.MEDIAMTX_PUBLISH_KEY) { - return new Response('authorized', { status: 200 }); + return new Response('authorized (hls read key for thumbs)', { status: 200 }); } const sessionExists = await redis.exists(`sessions:${password}`); if (!sessionExists) { diff --git a/apps/web/src/app/(ui)/(protected)/api/stream/info/route.ts b/apps/web/src/app/(ui)/(protected)/api/stream/info/route.ts index 9c42602..b44eaaa 100644 --- a/apps/web/src/app/(ui)/(protected)/api/stream/info/route.ts +++ b/apps/web/src/app/(ui)/(protected)/api/stream/info/route.ts @@ -9,6 +9,7 @@ export async function GET(request: NextRequest) { const shouldGetOwned = searchParams.get('owned') === 'true'; const allPersonalChannels = searchParams.get('personal') === 'true'; const isLive = searchParams.get('live') === 'true'; + const username = searchParams.get('username'); const { user } = await validateRequest(); if ((shouldGetOwned || allPersonalChannels) && !user) { @@ -18,6 +19,10 @@ export async function GET(request: NextRequest) { const where: Prisma.StreamInfoWhereInput = {}; const channelConditions: Prisma.ChannelWhereInput[] = []; + if (username) { + where.username = username; + } + if (shouldGetOwned && user) { channelConditions.push({ ownerId: user.id }); } diff --git a/apps/web/src/app/(ui)/(protected)/settings/channel/[channelName]/page.client.tsx b/apps/web/src/app/(ui)/(protected)/settings/channel/[channelName]/page.client.tsx index 91dce29..7483b7a 100644 --- a/apps/web/src/app/(ui)/(protected)/settings/channel/[channelName]/page.client.tsx +++ b/apps/web/src/app/(ui)/(protected)/settings/channel/[channelName]/page.client.tsx @@ -53,7 +53,15 @@ import { ChannelSelect } from '@/components/app/ChannelSelect/ChannelSelect'; import { useRouter } from 'next/navigation'; import Link from 'next/link'; import { useConfirm } from '@omit/react-confirm-dialog'; -import { MEDIAMTX_INGEST_ROUTE } from '@/lib/env'; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '@/components/ui/select'; +import { getMediamtxClientEnvs } from '@/lib/utils/mediamtx/client'; +import type { MediaMTXRegion } from '@/lib/utils/mediamtx/regions'; interface ChannelSettingsClientProps { channel: Channel & { @@ -88,6 +96,7 @@ export default function ChannelSettingsClient({ const [selTab, setSelTab] = useQueryState('tab', parseAsString.withDefault('general')); const [isUploading, setIsUploading] = useState(false); const [uploadError, setUploadError] = useState(null); + const [region, setRegion] = useState('eu'); const channelList = useOwnedChannels(); const router = useRouter(); @@ -137,7 +146,8 @@ export default function ChannelSettingsClient({ toast.error('Stream key not available'); return ''; } - return `srt://${MEDIAMTX_INGEST_ROUTE}?streamid=publish:${channel.name}:thisusernameislongonpurposesoyoudontaccidentallyleakyourstreamkey:${streamKey}&pkt_size=1316`; + const { ingestRoute } = getMediamtxClientEnvs(region); + return `srt://${ingestRoute}?streamid=publish:${channel.name}:thisusernameislongonpurposesoyoudontaccidentallyleakyourstreamkey:${streamKey}&pkt_size=1316`; }; const copyStreamUrl = async () => { @@ -449,7 +459,18 @@ export default function ChannelSettingsClient({
- +
+ + +
{ @@ -41,7 +45,7 @@ export default function StreamPlayer() { }; // @ts-ignore - video.src = `${MEDIAMTX_URL}/${username}/index.m3u8`; + video.src = `${getMediamtxClientEnvs(userInfo?.streamRegion!).publicUrl}/${username}/index.m3u8`; } return () => { diff --git a/apps/web/src/lib/hooks/useUserList.tsx b/apps/web/src/lib/hooks/useUserList.tsx index d72dbe9..2093a9d 100644 --- a/apps/web/src/lib/hooks/useUserList.tsx +++ b/apps/web/src/lib/hooks/useUserList.tsx @@ -27,6 +27,7 @@ function createCacheKey(options: UseUserListOptions): string { if (options.owned) params.push('owned') if (options.personal) params.push('personal') if (options.live) params.push('live') + if (options.username) params.push(`user-${options.username}`) return params.length > 0 ? `stream-info:${params.join('-')}` @@ -76,6 +77,8 @@ export interface UseUserListOptions { personal?: boolean /** Only fetch live channels */ live?: boolean + /** Search for a specific user's streaminfo */ + username?: string /** Refresh interval in milliseconds */ refreshInterval?: number /** Cache time to live in milliseconds (default: 5 minutes) */ @@ -132,6 +135,7 @@ export function useUserList(options: UseUserListOptions = {}): UseUserListReturn owned = false, personal = false, live = false, + username, refreshInterval = 30000, cacheTTL = 5 * 60 * 1000, // 5 minutes revalidateOnFocus = false, @@ -151,8 +155,9 @@ export function useUserList(options: UseUserListOptions = {}): UseUserListReturn if (owned) searchParams.set('owned', 'true') if (personal) searchParams.set('personal', 'true') if (live) searchParams.set('live', 'true') + if (username) searchParams.set('username', username) return searchParams - }, [owned, personal, live]) + }, [owned, personal, live, username]) const queryString = params.toString() const url = `/api/stream/info${queryString ? `?${queryString}` : ''}` @@ -325,6 +330,67 @@ export function usePersonalChannels(refreshInterval?: number): UseUserListReturn }) } +export interface UseUserStreamInfoReturn extends Omit { + /** The found stream info for the specific user */ + streamInfo: StreamInfoResponse[0] | null + /** All matching channels (usually just one) */ + channels: StreamInfoResponse +} + +/** + * Hook to fetch stream info for a specific user + * Returns the first match if multiple channels exist for that user + */ +export function useUserStreamInfo( + username: string | undefined, + refresh = true, + refreshInterval?: number, +): UseUserStreamInfoReturn { + const result = useUserList({ + username, + refreshInterval: refresh ? (refreshInterval ?? 15000) : undefined, + cacheTTL: 2 * 60 * 1000, // 2 minutes cache + revalidateOnFocus: true, + isPaused: !username, // Don't fetch if no username provided + errorRetryCount: 3, + }) + + return { + ...result, + streamInfo: result.channels[0] || null, + } +} + +/** + * Lazy version that doesn't automatically fetch - useful for on-demand lookups + */ +export function useUserStreamInfoLazy(refreshInterval?: number) { + const result = useUserList({ + refreshInterval: refreshInterval ?? 15000, + cacheTTL: 2 * 60 * 1000, + revalidateOnFocus: true, + isPaused: true, // Start paused + errorRetryCount: 3, + }) + + const lookupUser = useCallback(async (username: string) => { + if (!username) return null + + try { + const response = await enhancedFetcher(`/api/stream/info?username=${encodeURIComponent(username)}`) + return response[0] || null + } catch (error) { + console.error('[useUserStreamInfoLazy] Error looking up user:', error) + throw error + } + }, []) + + return { + ...result, + lookupUser, + } +} + // Cache management utilities with proper error handling export const channelCacheUtils = { /** Clear all channel caches */ @@ -379,6 +445,7 @@ export const channelCacheUtils = { if (options.owned) params.set('owned', 'true') if (options.personal) params.set('personal', 'true') if (options.live) params.set('live', 'true') + if (options.username) params.set('username', options.username) const queryString = params.toString() const url = `/api/stream/info${queryString ? `?${queryString}` : ''}` diff --git a/apps/web/src/lib/instrumentation/getLiveThumb.ts b/apps/web/src/lib/instrumentation/getLiveThumb.ts index 8677f02..47d9a8e 100644 --- a/apps/web/src/lib/instrumentation/getLiveThumb.ts +++ b/apps/web/src/lib/instrumentation/getLiveThumb.ts @@ -14,8 +14,10 @@ export default async function getLiveThumb() { const thumbQueue = getThumbnailQueue(); for (const channel of liveChannelNames) { + const lc = liveChannels.find(c => c.channel.name === channel)!; await thumbQueue.add("getLiveThumb", { name: channel, + server: lc.streamRegion, }); } } \ No newline at end of file diff --git a/apps/web/src/lib/instrumentation/streamInfo.ts b/apps/web/src/lib/instrumentation/streamInfo.ts index f1a6227..b5d76a0 100644 --- a/apps/web/src/lib/instrumentation/streamInfo.ts +++ b/apps/web/src/lib/instrumentation/streamInfo.ts @@ -3,6 +3,7 @@ import { HttpFlv } from '../types/liveBackendJson'; import { getNotificationQueue } from '../workers'; import client from '../services/slackNotifier'; import type { paths } from '../types/mediamtx.d.ts'; +import { MEDIAMTX_SERVER_REGIONS } from '../utils/mediamtx/server'; export default async function runner() { // if there are no users it explodes so yeah @@ -49,37 +50,43 @@ export async function initializeStreamInfo(channelId?: string) { export async function syncStream() { try { - const response = await fetch(`${process.env.MEDIAMTX_API}/v3/paths/list?itemsPerPage=1000`); + const regions = Object.keys(MEDIAMTX_SERVER_REGIONS) as Array< + keyof typeof MEDIAMTX_SERVER_REGIONS + >; - if (!response.ok) { - console.error(`Failed to fetch stream stats: ${response.status} ${response.statusText}`); - return; + const allActiveStreams = new Map(); + + for (const r of regions) { + const region = MEDIAMTX_SERVER_REGIONS[r]; + const response = await fetch(`${region.apiUrl}/v3/paths/list?itemsPerPage=1000`); + + if (!response.ok) { + console.error( + `Failed to fetch ${r} stream stats: ${response.status} ${response.statusText}` + ); + continue; + } + + type ResponseType = + paths['/v3/paths/list']['get']['responses']['200']['content']['application/json']; + const data = (await response.json()) as ResponseType; + + if (data?.items) { + for (const stream of data.items) { + if (stream.ready && stream.name) { + allActiveStreams.set(stream.name, r); + } + } + } } - type ResponseType = paths['/v3/paths/list']['get']['responses']['200']['content']['application/json']; - const data = await response.json() as ResponseType; - - if (!data) { - return; - } - - const activeStreams = data.items!; - + // handle streams going offline const currentLiveStreams = await prisma.streamInfo.findMany({ where: { isLive: true }, }); - const activeStreamMap = new Map(); - for (const stream of activeStreams) { - activeStreamMap.set(stream.name, { - isLive: stream.ready, - }); - } - for (const dbStream of currentLiveStreams) { - const streamStats = activeStreamMap.get(dbStream.username); - - if (!streamStats || !streamStats.isLive) { + if (!allActiveStreams.has(dbStream.username)) { await prisma.streamInfo.update({ where: { username: dbStream.username }, data: { @@ -91,50 +98,52 @@ export async function syncStream() { } } - for (const stream of activeStreams) { - if (stream.ready) { - const existingStream = await prisma.streamInfo.findUnique({ - where: { username: stream.name }, - include: { channel: true }, + // handle streams going online + for (const [username, regionKey] of allActiveStreams) { + const existingStream = await prisma.streamInfo.findUnique({ + where: { username }, + include: { channel: true }, + }); + + if (existingStream && !existingStream.isLive) { + console.log(`Stream ${username} is now live in region ${regionKey}`); + await prisma.streamInfo.update({ + where: { username }, + data: { + isLive: true, + startedAt: new Date(), + streamRegion: regionKey, + }, }); - if (existingStream && !existingStream.isLive) { - await prisma.streamInfo.update({ - where: { username: stream.name }, - data: { - isLive: true, - startedAt: new Date(), - }, - }); + const subscribedFollowers = await prisma.follow.findMany({ + where: { + channelId: existingStream.channelId, + notifyStream: true, + }, + include: { + user: true, + }, + }); - const subscribedFollowers = await prisma.follow.findMany({ - where: { - channelId: existingStream.channelId, - notifyStream: true, - }, - include: { - user: true, - }, - }); - - const queue = getNotificationQueue(); + const queue = getNotificationQueue(); - if (!existingStream.channel.is247) { - queue.add(`streamStartChannel:${existingStream.username}`, { - text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n`, - channel: process.env.NOTIFICATION_CHANNEL_ID!, + if (!existingStream.channel.is247) { + queue.add(`streamStartChannel:${existingStream.username}`, { + text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n`, + channel: process.env.NOTIFICATION_CHANNEL_ID!, + unfurl_links: true, + }); + } + + if (existingStream.enableNotifications && !existingStream.channel.is247) { + for (const follower of subscribedFollowers) { + queue.add(`streamStartDm:${follower.user.id}`, { + text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`, + channel: follower.user.slack_id, unfurl_links: true, }); } - if (existingStream.enableNotifications && !existingStream.channel.is247) { - for (const follower of subscribedFollowers) { - queue.add(`streamStartDm:${follower.user.id}`, { - text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`, - channel: follower.user.slack_id, - unfurl_links: true, - }); - } - } } } } diff --git a/apps/web/src/lib/utils/mediamtx/client.ts b/apps/web/src/lib/utils/mediamtx/client.ts new file mode 100644 index 0000000..49f77f0 --- /dev/null +++ b/apps/web/src/lib/utils/mediamtx/client.ts @@ -0,0 +1,35 @@ +import { MediaMTXRegion } from './regions'; +import { getEnv } from '@/lib/env'; + +export interface MediaMTXClientEnvs { + publicUrl: string; + ingestRoute: string; + emoji: string; + string: string; +} + +export function getMediamtxClientEnvs(region: MediaMTXRegion = 'eu'): MediaMTXClientEnvs { + const envs: Record = { + eu: { + publicUrl: getEnv('NEXT_PUBLIC_MEDIAMTX_URL_EU')!, + ingestRoute: getEnv('NEXT_PUBLIC_MEDIAMTX_INGEST_ROUTE_EU')!, + emoji: 'πŸ‡ͺπŸ‡Ί', + string: 'EU', + }, + asia: { + publicUrl: getEnv('NEXT_PUBLIC_MEDIAMTX_URL_ASIA')!, + ingestRoute: getEnv('NEXT_PUBLIC_MEDIAMTX_INGEST_ROUTE_ASIA')!, + emoji: 'πŸ‡ΈπŸ‡¬', + string: 'Singapore' + }, + }; + + const regionEnvs = envs[region]; + + if (!regionEnvs) { + throw new Error(`Invalid MediaMTX region: ${region}`); + } + + return regionEnvs; +} + diff --git a/apps/web/src/lib/utils/mediamtx/regions.ts b/apps/web/src/lib/utils/mediamtx/regions.ts new file mode 100644 index 0000000..84892be --- /dev/null +++ b/apps/web/src/lib/utils/mediamtx/regions.ts @@ -0,0 +1 @@ +export type MediaMTXRegion = 'eu' | 'asia'; diff --git a/apps/web/src/lib/utils/mediamtx/server.ts b/apps/web/src/lib/utils/mediamtx/server.ts new file mode 100644 index 0000000..ee1f9d2 --- /dev/null +++ b/apps/web/src/lib/utils/mediamtx/server.ts @@ -0,0 +1,24 @@ +import { MediaMTXRegion } from './regions'; + +export interface MediaMTXEnvs { + apiUrl: string; +} + +export const MEDIAMTX_SERVER_REGIONS: Record = { + eu: { + apiUrl: process.env.MEDIAMTX_API_EU!, + }, + asia: { + apiUrl: process.env.MEDIAMTX_API_ASIA!, + }, +}; + +export function getMediamtxEnvs(region: MediaMTXRegion = 'eu'): MediaMTXEnvs { + const envs = MEDIAMTX_SERVER_REGIONS[region]; + + if (!envs) { + throw new Error(`Invalid MediaMTX region: ${region}`); + } + + return envs; +} diff --git a/apps/web/src/lib/workers/worker/thumbnails.ts b/apps/web/src/lib/workers/worker/thumbnails.ts index cdea891..1eec026 100644 --- a/apps/web/src/lib/workers/worker/thumbnails.ts +++ b/apps/web/src/lib/workers/worker/thumbnails.ts @@ -4,6 +4,7 @@ import { promisify } from 'node:util'; import { existsSync } from 'node:fs'; import { exec as execCallback } from 'node:child_process'; import { MEDIAMTX_URL } from '@/lib/env'; +import { getMediamtxClientEnvs } from '@/lib/utils/mediamtx/client'; const pExec = promisify(execCallback); const globalForWorker = global as unknown as { @@ -27,7 +28,10 @@ export async function registerThumbnailWorker(): Promise { try { // this is totally unnecessary, but i'll keep it for security purposes. const name = job.data.name.replace(/[^a-zA-Z0-9]/g, '_'); - const m3u8location = `${MEDIAMTX_URL}/${name}/index.m3u8`; + const server = job.data.server || 'default'; + const srvValues = getMediamtxClientEnvs(server); + + const m3u8location = `${srvValues.publicUrl}/${name}/index.m3u8`; const thumbDir = '/dev/shm/hctv-thumb'; if (!existsSync(thumbDir)) { @@ -42,11 +46,11 @@ export async function registerThumbnailWorker(): Promise { ); return { success: true }; } catch (ffmpegError) { - console.error(`FFmpeg error for ${name}:`, ffmpegError); + console.error(`FFmpeg error for ${name} on server ${server}:`, ffmpegError); return { success: false, error: ffmpegError instanceof Error ? ffmpegError.message : String(ffmpegError) }; } } catch (e) { - console.error('Slack notification failed:', e); + console.error('Thumbnail generation failed:', e); // @ts-ignore e is unknown return { success: false, error: e.message }; } diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 01afc34..c7722ec 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -17,10 +17,21 @@ services: - 6379:6379 mediamtx: image: bluenviron/mediamtx:latest - network_mode: "host" - # ports: - # - 8890:8890/udp - # - 8891:8888 - # - 9997:9997 + ports: + - 8890:8890/udp + - 8891:8888 + - 9997:9997 volumes: - - ./mediamtx.yml:/mediamtx.yml \ No newline at end of file + - ./mediamtx.yml:/mediamtx.yml + extra_hosts: + - "host.docker.internal:host-gateway" + mediamtx2: + image: bluenviron/mediamtx:latest + ports: + - 8990:8890/udp + - 8991:8891 + - 9999:9997 + volumes: + - ./mediamtx.yml:/mediamtx.yml + extra_hosts: + - "host.docker.internal:host-gateway" \ No newline at end of file diff --git a/dev/mediamtx.yml b/dev/mediamtx.yml index 4d0948e..52b2e70 100644 --- a/dev/mediamtx.yml +++ b/dev/mediamtx.yml @@ -5,7 +5,6 @@ paths: srt: yes srtAddress: :8890 -# HLS Configuration - Low Latency HLS (LL-HLS) hls: yes hlsAddress: :8891 hlsSegmentCount: 7 @@ -14,6 +13,6 @@ hlsPartDuration: 200ms hlsMuxerCloseAfter: 5s authMethod: http -authHTTPAddress: http://localhost:3000/api/mediamtx/publish +authHTTPAddress: http://host.docker.internal:3000/api/mediamtx/publish api: yes diff --git a/packages/db/prisma/migrations/20260126160053_ingest_region_ground_work/migration.sql b/packages/db/prisma/migrations/20260126160053_ingest_region_ground_work/migration.sql new file mode 100644 index 0000000..98872de --- /dev/null +++ b/packages/db/prisma/migrations/20260126160053_ingest_region_ground_work/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "StreamInfo" ADD COLUMN "streamRegion" TEXT NOT NULL DEFAULT 'eu'; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index c30000d..5bec0d6 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -79,6 +79,7 @@ model StreamInfo { category String startedAt DateTime isLive Boolean + streamRegion String @default("eu") channelId String channel Channel @relation(fields: [channelId], references: [id], onDelete: Cascade)