diff --git a/.gitignore b/.gitignore index ccf2abe..dd4c9a5 100644 --- a/.gitignore +++ b/.gitignore @@ -37,5 +37,4 @@ yarn-error.log* next-env.d.ts certificates -dev/ -!dev/docker-compose.yml \ No newline at end of file +dev/psql \ No newline at end of file diff --git a/benchmark.py b/benchmark.py new file mode 100644 index 0000000..30c86fa --- /dev/null +++ b/benchmark.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +import asyncio +import aiohttp +import argparse +import time +import random +from tqdm import tqdm + +async def simulate_viewer(session, base_url, stream_name, viewer_id, duration): + """Simulate a viewer watching an HLS stream""" + hls_url = f"{base_url}/hls/{stream_name}.m3u8" + + # First request the playlist + try: + start_time = time.time() + end_time = start_time + duration + + while time.time() < end_time: + # Get the master playlist + async with session.get(hls_url) as response: + if response.status != 200: + print(f"Viewer {viewer_id}: Failed to get playlist: {response.status}") + return + + playlist = await response.text() + + # Parse the playlist to find segments + segments = [line for line in playlist.splitlines() if line.endswith('.ts')] + + if segments: + # Request a random segment to simulate viewing + segment = random.choice(segments) + segment_url = f"{base_url}/hls/{segment}" + + async with session.get(segment_url) as seg_response: + if seg_response.status != 200: + print(f"Viewer {viewer_id}: Failed to get segment: {seg_response.status}") + + # Wait a bit before requesting again (simulating segment download) + await asyncio.sleep(2) + + except Exception as e: + print(f"Viewer {viewer_id} error: {str(e)}") + +async def run_benchmark(base_url, stream_name, num_viewers, duration): + """Run the benchmark with the specified number of viewers""" + print(f"Starting benchmark with {num_viewers} viewers for {duration} seconds") + + all_tasks = [] # Keep track of all tasks + + async with aiohttp.ClientSession() as session: + with tqdm(total=num_viewers, desc="Connecting viewers") as pbar: + # Start viewers gradually to avoid overwhelming the server + for i in range(0, num_viewers, 10): + batch = [] + for j in range(i, min(i+10, num_viewers)): + task = asyncio.create_task(simulate_viewer(session, base_url, stream_name, j, duration)) + batch.append(task) + all_tasks.append(task) + + pbar.update(len(batch)) + await asyncio.sleep(0.5) # Small delay between batches + + print(f"All {num_viewers} viewers connected. Running for {duration} seconds...") + + # Wait for all tasks to complete + await asyncio.gather(*all_tasks) + + print(f"Benchmark completed after {duration} seconds.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Benchmark NGINX-RTMP HLS streaming with simulated viewers') + parser.add_argument('--url', default='http://localhost:8888', help='Base URL of the NGINX server') + parser.add_argument('--stream', required=True, help='Stream name to connect to') + parser.add_argument('--viewers', type=int, default=100, help='Number of simulated viewers') + parser.add_argument('--duration', type=int, default=60, help='Duration in seconds to run the test') + + args = parser.parse_args() + + print(f"Benchmarking stream: {args.stream}") + print(f"Server: {args.url}") + print(f"Viewers: {args.viewers}") + print(f"Duration: {args.duration} seconds") + + asyncio.run(run_benchmark(args.url, args.stream, args.viewers, args.duration)) \ No newline at end of file diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 5adb6af..8e33b1c 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -8,4 +8,33 @@ services: volumes: - ./psql:/var/lib/postgresql/data ports: - - 5555:5432 \ No newline at end of file + - 5555:5432 + nginx-rtmp: + # ports: + # - 1935:1935 + # - 8888:8888 + network_mode: host + environment: + UID: 1000 + GID: 1000 + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf + - ./html:/var/www/html + - /dev/shm:/dev/shm + image: tiangolo/nginx-rtmp + entrypoint: + - /bin/sh + - -c + - | + usermod -u $${UID} www-data + groupmod -g $${GID} www-data + + mkdir -p /usr/local/nginx/proxy_temp /usr/local/nginx/client_body_temp + chown -R www-data:www-data /usr/local/nginx + + chown -R www-data:www-data /var/www/html + + mkdir -p /dev/shm/hls + chown -R www-data:www-data /dev/shm/hls + + nginx -g 'daemon off;' \ No newline at end of file diff --git a/dev/html/stat.xsl b/dev/html/stat.xsl new file mode 100644 index 0000000..207a6dd --- /dev/null +++ b/dev/html/stat.xsl @@ -0,0 +1,62 @@ + + + + + { + "server": { + "version": "", + "uptime": "", + "applications": [ + + , + { + "name": "", + "streams": [ + + , + { + "name": "", + "time": "", + "bw_in": "", + "bw_out": "", + "bytes_in": "", + "bytes_out": "", + "nclients": "", + "publishing": + + true + false + + , + "active": + + true + false + + , + "clients": [ + + , + { + "id": "", + "address": "", + "time": "", + "flashver": "", + "publishing": + + true + false + + } + + ] + } + + ] + } + + ] + } + } + + \ No newline at end of file diff --git a/dev/nginx.conf b/dev/nginx.conf new file mode 100644 index 0000000..8df17ab --- /dev/null +++ b/dev/nginx.conf @@ -0,0 +1,81 @@ +events { + worker_connections 1024; # Define the maximum number of simultaneous connections +} + +rtmp { + server { + listen 1935; + + application live { + live on; + record off; + + on_publish http://localhost:3000/api/rtmp/publish; + } + + application channel-live { + live on; + record off; + + allow publish 127.0.0.1; + deny publish all; + + hls on; + hls_path /dev/shm/hls; + hls_fragment 3; + hls_playlist_length 5; + hls_cleanup on; + + hls_variant _low BANDWIDTH=500000; + hls_variant _mid BANDWIDTH=1000000; + hls_variant _hi BANDWIDTH=1500000; + } + } +} + +http { + include mime.types; + default_type application/octet-stream; + + # performance optimizations + sendfile on; + tcp_nopush on; + tcp_nodelay on; + keepalive_timeout 65; + + server { + listen 8888; + + location /stat { + if ($request_method = "GET") { + add_header "Access-Control-Allow-Origin" *; + } + + rtmp_stat all; + rtmp_stat_stylesheet stat.xsl; + } + location /json { + if ($request_method = "GET") { + add_header "Access-Control-Allow-Origin" *; + } + + add_header Content-Type application/json; + rtmp_stat all; + rtmp_stat_stylesheet stat.xsl; + } + location /stat.xsl { + alias /var/www/html/stat.xsl; + } + + location /hls { + alias /dev/shm/hls; + add_header Cache-Control no-cache; + add_header Access-Control-Allow-Origin *; + + types { + application/vnd.apple.mpegurl m3u8; + video/mp2t ts; + } + } + } +} \ No newline at end of file diff --git a/package.json b/package.json index 7c5972d..ffb21ce 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,8 @@ "version": "0.1.0", "private": true, "scripts": { - "dev": "docker compose --file dev/docker-compose.yml up -d && next dev --experimental-https --turbo", + "dev": "docker compose --file dev/docker-compose.yml up -d && next dev --turbo", + "donly": "docker compose --file dev/docker-compose.yml up", "setup": "docker compose --file dev/docker-compose.yml up -d && prisma migrate deploy", "build": "prisma generate && next build", "start": "prisma migrate deploy && next start", diff --git a/prisma/migrations/20250312160445_stream_key_proto/migration.sql b/prisma/migrations/20250312160445_stream_key_proto/migration.sql new file mode 100644 index 0000000..2d26c32 --- /dev/null +++ b/prisma/migrations/20250312160445_stream_key_proto/migration.sql @@ -0,0 +1,17 @@ +-- CreateTable +CREATE TABLE "StreamKey" ( + "id" TEXT NOT NULL, + "key" TEXT NOT NULL, + "channelId" TEXT NOT NULL, + + CONSTRAINT "StreamKey_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "StreamKey_key_key" ON "StreamKey"("key"); + +-- CreateIndex +CREATE UNIQUE INDEX "StreamKey_channelId_key" ON "StreamKey"("channelId"); + +-- AddForeignKey +ALTER TABLE "StreamKey" ADD CONSTRAINT "StreamKey_channelId_fkey" FOREIGN KEY ("channelId") REFERENCES "Channel"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index d029db7..1cfc4d5 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -46,6 +46,7 @@ model Channel { managers User[] @relation("ChannelManagers") streamInfo StreamInfo[] followers Follow[] @relation("ChannelFollowers") + streamKey StreamKey? @@index([ownerId]) } @@ -89,4 +90,12 @@ model Follow { @@unique([userId, channelId]) @@index([userId]) @@index([channelId]) +} + +model StreamKey { + id String @id @default(cuid()) + key String @unique + + channelId String @unique + channel Channel @relation(fields: [channelId], references: [id]) } \ No newline at end of file diff --git a/src/app/(protected)/api/rtmp/publish/route.ts b/src/app/(protected)/api/rtmp/publish/route.ts new file mode 100644 index 0000000..7dc31d7 --- /dev/null +++ b/src/app/(protected)/api/rtmp/publish/route.ts @@ -0,0 +1,32 @@ +import prisma from '@/lib/db'; +import { NextRequest } from 'next/server'; +import { redirect } from 'next/navigation'; + +export async function POST(request: NextRequest) { + const formData = await request.formData(); + const streamKey = formData.get('name')?.toString() || ''; + console.log('streamKey:', streamKey); + + const key = await prisma.streamKey.findFirst({ + where: { + key: streamKey, + }, + include: { + channel: true, + }, + }); + + if (!key) { + return new Response('nay', { + status: 403, + }); + } + + const headers = new Headers(); + headers.append('Location', `rtmp://127.0.0.1/channel-live/${key.channel.name}`); + + return new Response(null, { + status: 302, + headers: headers, + }); +} \ No newline at end of file diff --git a/src/app/(protected)/api/rtmp/streamKey/route.ts b/src/app/(protected)/api/rtmp/streamKey/route.ts new file mode 100644 index 0000000..62de6dc --- /dev/null +++ b/src/app/(protected)/api/rtmp/streamKey/route.ts @@ -0,0 +1,52 @@ +import { validateRequest } from "@/lib/auth"; +import prisma from "@/lib/db"; +import { NextRequest } from "next/server"; + +export async function POST(request: NextRequest) { + const { user } = await validateRequest(); + const body = await request.json(); + const { channel } = body; + + if (!user) { + return new Response('Unauthorized', { status: 401 }); + } + + const channelInfo = await prisma.channel.findUnique({ + where: { name: channel }, + include: { + owner: true, + managers: true + } + }); + + if (!channelInfo) { + return new Response('Channel not found', { status: 404 }); + } + + const isBroadcaster = + channelInfo.ownerId === user.id || + channelInfo.managers.some(m => m.id === user.id); + if (!isBroadcaster) { + return new Response('Unauthorized', { status: 401 }); + } + + const dbUpdate = await prisma.streamKey.upsert({ + create: { + key: crypto.randomUUID(), + channelId: channelInfo.id + }, + update: { + key: crypto.randomUUID() + }, + where: { + channelId: channelInfo.id + } + }) + + return new Response(JSON.stringify({ key: dbUpdate.key }), { + status: 200, + headers: { + 'Content-Type': 'application/json' + } + }); +} \ No newline at end of file diff --git a/src/components/app/RegenerateKey/RegenerateKey.tsx b/src/components/app/RegenerateKey/RegenerateKey.tsx index e3e3b4b..a2cae3d 100644 --- a/src/components/app/RegenerateKey/RegenerateKey.tsx +++ b/src/components/app/RegenerateKey/RegenerateKey.tsx @@ -7,7 +7,7 @@ import { toast } from 'sonner'; import useSWR from 'swr/mutation'; export default function RegenerateKey(props: Props) { - const { error, isMutating, trigger } = useSWR('/api/livekit/broadcasterToken', async (url) => + const { error, isMutating, trigger } = useSWR('/api/rtmp/streamKey', async (url) => defaultFetcher(url, { body: JSON.stringify({ channel: props.channel }), method: 'POST' }) ); diff --git a/src/instrumentation.ts b/src/instrumentation.ts index 8290c76..19baeff 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -1,5 +1,5 @@ export async function register() { if (process.env.NEXT_RUNTIME === 'nodejs') { - await (await import('@/lib/instrumentation/streamInfo')).default(); + // await (await import('@/lib/instrumentation/streamInfo')).default(); } } \ No newline at end of file