feat: multiple streaming servers

This commit is contained in:
2026-01-27 16:56:43 +01:00
parent b623de5bdd
commit 5add3b0e5d
16 changed files with 269 additions and 79 deletions

View File

@@ -5,7 +5,7 @@
"type": "module",
"scripts": {
"dd": "docker compose --file ../../dev/docker-compose.yml up -d",
"dev": "next dev --turbo",
"dev": "next dev --turbo -H 0.0.0.0",
"donly": "docker compose --file ../../dev/docker-compose.yml up",
"build": "next build",
"start": "next start",

View File

@@ -1,12 +1,17 @@
import { prisma, getRedisConnection } from '@hctv/db';
import { NextRequest } from 'next/server';
import { z } from 'zod';
import { lucia } from '@hctv/auth';
export async function POST(request: NextRequest) {
const redis = getRedisConnection();
const body = await request.json();
if (process.env.NODE_ENV !== 'production') {
console.log(
'Mediamtx publish auth request:',
JSON.stringify(body, null, 2)
)
}
const parsed = schema.safeParse(body);
if (!parsed.success) {
@@ -56,7 +61,7 @@ export async function POST(request: NextRequest) {
}
} else if (action === 'read' && protocol === 'hls') {
if (password === process.env.MEDIAMTX_PUBLISH_KEY) {
return new Response('authorized', { status: 200 });
return new Response('authorized (hls read key for thumbs)', { status: 200 });
}
const sessionExists = await redis.exists(`sessions:${password}`);
if (!sessionExists) {

View File

@@ -9,6 +9,7 @@ export async function GET(request: NextRequest) {
const shouldGetOwned = searchParams.get('owned') === 'true';
const allPersonalChannels = searchParams.get('personal') === 'true';
const isLive = searchParams.get('live') === 'true';
const username = searchParams.get('username');
const { user } = await validateRequest();
if ((shouldGetOwned || allPersonalChannels) && !user) {
@@ -18,6 +19,10 @@ export async function GET(request: NextRequest) {
const where: Prisma.StreamInfoWhereInput = {};
const channelConditions: Prisma.ChannelWhereInput[] = [];
if (username) {
where.username = username;
}
if (shouldGetOwned && user) {
channelConditions.push({ ownerId: user.id });
}

View File

@@ -53,7 +53,15 @@ import { ChannelSelect } from '@/components/app/ChannelSelect/ChannelSelect';
import { useRouter } from 'next/navigation';
import Link from 'next/link';
import { useConfirm } from '@omit/react-confirm-dialog';
import { MEDIAMTX_INGEST_ROUTE } from '@/lib/env';
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from '@/components/ui/select';
import { getMediamtxClientEnvs } from '@/lib/utils/mediamtx/client';
import type { MediaMTXRegion } from '@/lib/utils/mediamtx/regions';
interface ChannelSettingsClientProps {
channel: Channel & {
@@ -88,6 +96,7 @@ export default function ChannelSettingsClient({
const [selTab, setSelTab] = useQueryState('tab', parseAsString.withDefault('general'));
const [isUploading, setIsUploading] = useState(false);
const [uploadError, setUploadError] = useState<string | null>(null);
const [region, setRegion] = useState<MediaMTXRegion>('eu');
const channelList = useOwnedChannels();
const router = useRouter();
@@ -137,7 +146,8 @@ export default function ChannelSettingsClient({
toast.error('Stream key not available');
return '';
}
return `srt://${MEDIAMTX_INGEST_ROUTE}?streamid=publish:${channel.name}:thisusernameislongonpurposesoyoudontaccidentallyleakyourstreamkey:${streamKey}&pkt_size=1316`;
const { ingestRoute } = getMediamtxClientEnvs(region);
return `srt://${ingestRoute}?streamid=publish:${channel.name}:thisusernameislongonpurposesoyoudontaccidentallyleakyourstreamkey:${streamKey}&pkt_size=1316`;
};
const copyStreamUrl = async () => {
@@ -449,7 +459,18 @@ export default function ChannelSettingsClient({
</div>
<div className="space-y-2">
<label className="text-sm font-medium">Stream URL (for OBS)</label>
<div className="flex items-center justify-between">
<label className="text-sm font-medium">Stream URL (for OBS)</label>
<Select value={region} onValueChange={(v) => setRegion(v as MediaMTXRegion)}>
<SelectTrigger className="w-[180px] h-8">
<SelectValue placeholder="Select region" />
</SelectTrigger>
<SelectContent>
<SelectItem value="eu">Europe 🇪🇺</SelectItem>
<SelectItem value="asia">Singapore 🇸🇬</SelectItem>
</SelectContent>
</Select>
</div>
<div className="flex items-center gap-2">
<div className="relative flex-1">
<input

View File

@@ -14,10 +14,14 @@ import {
import HlsVideo from 'hls-video-element/react';
import { useSession } from '@/lib/providers/SessionProvider';
import { MEDIAMTX_URL } from '@/lib/env';
import { useUserStreamInfo } from '@/lib/hooks/useUserList';
import { getMediamtxClientEnvs } from '@/lib/utils/mediamtx/client';
export default function StreamPlayer() {
const { username } = useParams();
const { session } = useSession();
const { streamInfo: userInfo } = useUserStreamInfo(username!.toString());
const videoRef = useRef(null);
useEffect(() => {
@@ -41,7 +45,7 @@ export default function StreamPlayer() {
};
// @ts-ignore
video.src = `${MEDIAMTX_URL}/${username}/index.m3u8`;
video.src = `${getMediamtxClientEnvs(userInfo?.streamRegion!).publicUrl}/${username}/index.m3u8`;
}
return () => {

View File

@@ -27,6 +27,7 @@ function createCacheKey(options: UseUserListOptions): string {
if (options.owned) params.push('owned')
if (options.personal) params.push('personal')
if (options.live) params.push('live')
if (options.username) params.push(`user-${options.username}`)
return params.length > 0
? `stream-info:${params.join('-')}`
@@ -76,6 +77,8 @@ export interface UseUserListOptions {
personal?: boolean
/** Only fetch live channels */
live?: boolean
/** Search for a specific user's streaminfo */
username?: string
/** Refresh interval in milliseconds */
refreshInterval?: number
/** Cache time to live in milliseconds (default: 5 minutes) */
@@ -132,6 +135,7 @@ export function useUserList(options: UseUserListOptions = {}): UseUserListReturn
owned = false,
personal = false,
live = false,
username,
refreshInterval = 30000,
cacheTTL = 5 * 60 * 1000, // 5 minutes
revalidateOnFocus = false,
@@ -151,8 +155,9 @@ export function useUserList(options: UseUserListOptions = {}): UseUserListReturn
if (owned) searchParams.set('owned', 'true')
if (personal) searchParams.set('personal', 'true')
if (live) searchParams.set('live', 'true')
if (username) searchParams.set('username', username)
return searchParams
}, [owned, personal, live])
}, [owned, personal, live, username])
const queryString = params.toString()
const url = `/api/stream/info${queryString ? `?${queryString}` : ''}`
@@ -325,6 +330,67 @@ export function usePersonalChannels(refreshInterval?: number): UseUserListReturn
})
}
export interface UseUserStreamInfoReturn extends Omit<UseUserListReturn, 'channels'> {
/** The found stream info for the specific user */
streamInfo: StreamInfoResponse[0] | null
/** All matching channels (usually just one) */
channels: StreamInfoResponse
}
/**
* Hook to fetch stream info for a specific user
* Returns the first match if multiple channels exist for that user
*/
export function useUserStreamInfo(
username: string | undefined,
refresh = true,
refreshInterval?: number,
): UseUserStreamInfoReturn {
const result = useUserList({
username,
refreshInterval: refresh ? (refreshInterval ?? 15000) : undefined,
cacheTTL: 2 * 60 * 1000, // 2 minutes cache
revalidateOnFocus: true,
isPaused: !username, // Don't fetch if no username provided
errorRetryCount: 3,
})
return {
...result,
streamInfo: result.channels[0] || null,
}
}
/**
* Lazy version that doesn't automatically fetch - useful for on-demand lookups
*/
export function useUserStreamInfoLazy(refreshInterval?: number) {
const result = useUserList({
refreshInterval: refreshInterval ?? 15000,
cacheTTL: 2 * 60 * 1000,
revalidateOnFocus: true,
isPaused: true, // Start paused
errorRetryCount: 3,
})
const lookupUser = useCallback(async (username: string) => {
if (!username) return null
try {
const response = await enhancedFetcher(`/api/stream/info?username=${encodeURIComponent(username)}`)
return response[0] || null
} catch (error) {
console.error('[useUserStreamInfoLazy] Error looking up user:', error)
throw error
}
}, [])
return {
...result,
lookupUser,
}
}
// Cache management utilities with proper error handling
export const channelCacheUtils = {
/** Clear all channel caches */
@@ -379,6 +445,7 @@ export const channelCacheUtils = {
if (options.owned) params.set('owned', 'true')
if (options.personal) params.set('personal', 'true')
if (options.live) params.set('live', 'true')
if (options.username) params.set('username', options.username)
const queryString = params.toString()
const url = `/api/stream/info${queryString ? `?${queryString}` : ''}`

View File

@@ -14,8 +14,10 @@ export default async function getLiveThumb() {
const thumbQueue = getThumbnailQueue();
for (const channel of liveChannelNames) {
const lc = liveChannels.find(c => c.channel.name === channel)!;
await thumbQueue.add("getLiveThumb", {
name: channel,
server: lc.streamRegion,
});
}
}

View File

@@ -3,6 +3,7 @@ import { HttpFlv } from '../types/liveBackendJson';
import { getNotificationQueue } from '../workers';
import client from '../services/slackNotifier';
import type { paths } from '../types/mediamtx.d.ts';
import { MEDIAMTX_SERVER_REGIONS } from '../utils/mediamtx/server';
export default async function runner() {
// if there are no users it explodes so yeah
@@ -49,37 +50,43 @@ export async function initializeStreamInfo(channelId?: string) {
export async function syncStream() {
try {
const response = await fetch(`${process.env.MEDIAMTX_API}/v3/paths/list?itemsPerPage=1000`);
const regions = Object.keys(MEDIAMTX_SERVER_REGIONS) as Array<
keyof typeof MEDIAMTX_SERVER_REGIONS
>;
if (!response.ok) {
console.error(`Failed to fetch stream stats: ${response.status} ${response.statusText}`);
return;
const allActiveStreams = new Map<string, keyof typeof MEDIAMTX_SERVER_REGIONS>();
for (const r of regions) {
const region = MEDIAMTX_SERVER_REGIONS[r];
const response = await fetch(`${region.apiUrl}/v3/paths/list?itemsPerPage=1000`);
if (!response.ok) {
console.error(
`Failed to fetch ${r} stream stats: ${response.status} ${response.statusText}`
);
continue;
}
type ResponseType =
paths['/v3/paths/list']['get']['responses']['200']['content']['application/json'];
const data = (await response.json()) as ResponseType;
if (data?.items) {
for (const stream of data.items) {
if (stream.ready && stream.name) {
allActiveStreams.set(stream.name, r);
}
}
}
}
type ResponseType = paths['/v3/paths/list']['get']['responses']['200']['content']['application/json'];
const data = await response.json() as ResponseType;
if (!data) {
return;
}
const activeStreams = data.items!;
// handle streams going offline
const currentLiveStreams = await prisma.streamInfo.findMany({
where: { isLive: true },
});
const activeStreamMap = new Map();
for (const stream of activeStreams) {
activeStreamMap.set(stream.name, {
isLive: stream.ready,
});
}
for (const dbStream of currentLiveStreams) {
const streamStats = activeStreamMap.get(dbStream.username);
if (!streamStats || !streamStats.isLive) {
if (!allActiveStreams.has(dbStream.username)) {
await prisma.streamInfo.update({
where: { username: dbStream.username },
data: {
@@ -91,50 +98,52 @@ export async function syncStream() {
}
}
for (const stream of activeStreams) {
if (stream.ready) {
const existingStream = await prisma.streamInfo.findUnique({
where: { username: stream.name },
include: { channel: true },
// handle streams going online
for (const [username, regionKey] of allActiveStreams) {
const existingStream = await prisma.streamInfo.findUnique({
where: { username },
include: { channel: true },
});
if (existingStream && !existingStream.isLive) {
console.log(`Stream ${username} is now live in region ${regionKey}`);
await prisma.streamInfo.update({
where: { username },
data: {
isLive: true,
startedAt: new Date(),
streamRegion: regionKey,
},
});
if (existingStream && !existingStream.isLive) {
await prisma.streamInfo.update({
where: { username: stream.name },
data: {
isLive: true,
startedAt: new Date(),
},
});
const subscribedFollowers = await prisma.follow.findMany({
where: {
channelId: existingStream.channelId,
notifyStream: true,
},
include: {
user: true,
},
});
const subscribedFollowers = await prisma.follow.findMany({
where: {
channelId: existingStream.channelId,
notifyStream: true,
},
include: {
user: true,
},
});
const queue = getNotificationQueue();
const queue = getNotificationQueue();
if (!existingStream.channel.is247) {
queue.add(`streamStartChannel:${existingStream.username}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>`,
channel: process.env.NOTIFICATION_CHANNEL_ID!,
if (!existingStream.channel.is247) {
queue.add(`streamStartChannel:${existingStream.username}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>`,
channel: process.env.NOTIFICATION_CHANNEL_ID!,
unfurl_links: true,
});
}
if (existingStream.enableNotifications && !existingStream.channel.is247) {
for (const follower of subscribedFollowers) {
queue.add(`streamStartDm:${follower.user.id}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`,
channel: follower.user.slack_id,
unfurl_links: true,
});
}
if (existingStream.enableNotifications && !existingStream.channel.is247) {
for (const follower of subscribedFollowers) {
queue.add(`streamStartDm:${follower.user.id}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`,
channel: follower.user.slack_id,
unfurl_links: true,
});
}
}
}
}
}

View File

@@ -0,0 +1,35 @@
import { MediaMTXRegion } from './regions';
import { getEnv } from '@/lib/env';
export interface MediaMTXClientEnvs {
publicUrl: string;
ingestRoute: string;
emoji: string;
string: string;
}
export function getMediamtxClientEnvs(region: MediaMTXRegion = 'eu'): MediaMTXClientEnvs {
const envs: Record<MediaMTXRegion, MediaMTXClientEnvs> = {
eu: {
publicUrl: getEnv('NEXT_PUBLIC_MEDIAMTX_URL_EU')!,
ingestRoute: getEnv('NEXT_PUBLIC_MEDIAMTX_INGEST_ROUTE_EU')!,
emoji: '🇪🇺',
string: 'EU',
},
asia: {
publicUrl: getEnv('NEXT_PUBLIC_MEDIAMTX_URL_ASIA')!,
ingestRoute: getEnv('NEXT_PUBLIC_MEDIAMTX_INGEST_ROUTE_ASIA')!,
emoji: '🇸🇬',
string: 'Singapore'
},
};
const regionEnvs = envs[region];
if (!regionEnvs) {
throw new Error(`Invalid MediaMTX region: ${region}`);
}
return regionEnvs;
}

View File

@@ -0,0 +1 @@
export type MediaMTXRegion = 'eu' | 'asia';

View File

@@ -0,0 +1,24 @@
import { MediaMTXRegion } from './regions';
export interface MediaMTXEnvs {
apiUrl: string;
}
export const MEDIAMTX_SERVER_REGIONS: Record<MediaMTXRegion, MediaMTXEnvs> = {
eu: {
apiUrl: process.env.MEDIAMTX_API_EU!,
},
asia: {
apiUrl: process.env.MEDIAMTX_API_ASIA!,
},
};
export function getMediamtxEnvs(region: MediaMTXRegion = 'eu'): MediaMTXEnvs {
const envs = MEDIAMTX_SERVER_REGIONS[region];
if (!envs) {
throw new Error(`Invalid MediaMTX region: ${region}`);
}
return envs;
}

View File

@@ -4,6 +4,7 @@ import { promisify } from 'node:util';
import { existsSync } from 'node:fs';
import { exec as execCallback } from 'node:child_process';
import { MEDIAMTX_URL } from '@/lib/env';
import { getMediamtxClientEnvs } from '@/lib/utils/mediamtx/client';
const pExec = promisify(execCallback);
const globalForWorker = global as unknown as {
@@ -27,7 +28,10 @@ export async function registerThumbnailWorker(): Promise<void> {
try {
// this is totally unnecessary, but i'll keep it for security purposes.
const name = job.data.name.replace(/[^a-zA-Z0-9]/g, '_');
const m3u8location = `${MEDIAMTX_URL}/${name}/index.m3u8`;
const server = job.data.server || 'default';
const srvValues = getMediamtxClientEnvs(server);
const m3u8location = `${srvValues.publicUrl}/${name}/index.m3u8`;
const thumbDir = '/dev/shm/hctv-thumb';
if (!existsSync(thumbDir)) {
@@ -42,11 +46,11 @@ export async function registerThumbnailWorker(): Promise<void> {
);
return { success: true };
} catch (ffmpegError) {
console.error(`FFmpeg error for ${name}:`, ffmpegError);
console.error(`FFmpeg error for ${name} on server ${server}:`, ffmpegError);
return { success: false, error: ffmpegError instanceof Error ? ffmpegError.message : String(ffmpegError) };
}
} catch (e) {
console.error('Slack notification failed:', e);
console.error('Thumbnail generation failed:', e);
// @ts-ignore e is unknown
return { success: false, error: e.message };
}

View File

@@ -17,10 +17,21 @@ services:
- 6379:6379
mediamtx:
image: bluenviron/mediamtx:latest
network_mode: "host"
# ports:
# - 8890:8890/udp
# - 8891:8888
# - 9997:9997
ports:
- 8890:8890/udp
- 8891:8888
- 9997:9997
volumes:
- ./mediamtx.yml:/mediamtx.yml
- ./mediamtx.yml:/mediamtx.yml
extra_hosts:
- "host.docker.internal:host-gateway"
mediamtx2:
image: bluenviron/mediamtx:latest
ports:
- 8990:8890/udp
- 8991:8891
- 9999:9997
volumes:
- ./mediamtx.yml:/mediamtx.yml
extra_hosts:
- "host.docker.internal:host-gateway"

View File

@@ -5,7 +5,6 @@ paths:
srt: yes
srtAddress: :8890
# HLS Configuration - Low Latency HLS (LL-HLS)
hls: yes
hlsAddress: :8891
hlsSegmentCount: 7
@@ -14,6 +13,6 @@ hlsPartDuration: 200ms
hlsMuxerCloseAfter: 5s
authMethod: http
authHTTPAddress: http://localhost:3000/api/mediamtx/publish
authHTTPAddress: http://host.docker.internal:3000/api/mediamtx/publish
api: yes

View File

@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "StreamInfo" ADD COLUMN "streamRegion" TEXT NOT NULL DEFAULT 'eu';

View File

@@ -79,6 +79,7 @@ model StreamInfo {
category String
startedAt DateTime
isLive Boolean
streamRegion String @default("eu")
channelId String
channel Channel @relation(fields: [channelId], references: [id], onDelete: Cascade)