feat: initial and probably final rtmp server backend

This commit is contained in:
2025-03-13 17:47:36 +01:00
parent 8d33c7eb62
commit 0c6c7a910d
12 changed files with 373 additions and 6 deletions

3
.gitignore vendored
View File

@@ -37,5 +37,4 @@ yarn-error.log*
next-env.d.ts
certificates
dev/
!dev/docker-compose.yml
dev/psql

85
benchmark.py Normal file
View File

@@ -0,0 +1,85 @@
#!/usr/bin/env python3
import asyncio
import aiohttp
import argparse
import time
import random
from tqdm import tqdm
async def simulate_viewer(session, base_url, stream_name, viewer_id, duration):
"""Simulate a viewer watching an HLS stream"""
hls_url = f"{base_url}/hls/{stream_name}.m3u8"
# First request the playlist
try:
start_time = time.time()
end_time = start_time + duration
while time.time() < end_time:
# Get the master playlist
async with session.get(hls_url) as response:
if response.status != 200:
print(f"Viewer {viewer_id}: Failed to get playlist: {response.status}")
return
playlist = await response.text()
# Parse the playlist to find segments
segments = [line for line in playlist.splitlines() if line.endswith('.ts')]
if segments:
# Request a random segment to simulate viewing
segment = random.choice(segments)
segment_url = f"{base_url}/hls/{segment}"
async with session.get(segment_url) as seg_response:
if seg_response.status != 200:
print(f"Viewer {viewer_id}: Failed to get segment: {seg_response.status}")
# Wait a bit before requesting again (simulating segment download)
await asyncio.sleep(2)
except Exception as e:
print(f"Viewer {viewer_id} error: {str(e)}")
async def run_benchmark(base_url, stream_name, num_viewers, duration):
"""Run the benchmark with the specified number of viewers"""
print(f"Starting benchmark with {num_viewers} viewers for {duration} seconds")
all_tasks = [] # Keep track of all tasks
async with aiohttp.ClientSession() as session:
with tqdm(total=num_viewers, desc="Connecting viewers") as pbar:
# Start viewers gradually to avoid overwhelming the server
for i in range(0, num_viewers, 10):
batch = []
for j in range(i, min(i+10, num_viewers)):
task = asyncio.create_task(simulate_viewer(session, base_url, stream_name, j, duration))
batch.append(task)
all_tasks.append(task)
pbar.update(len(batch))
await asyncio.sleep(0.5) # Small delay between batches
print(f"All {num_viewers} viewers connected. Running for {duration} seconds...")
# Wait for all tasks to complete
await asyncio.gather(*all_tasks)
print(f"Benchmark completed after {duration} seconds.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Benchmark NGINX-RTMP HLS streaming with simulated viewers')
parser.add_argument('--url', default='http://localhost:8888', help='Base URL of the NGINX server')
parser.add_argument('--stream', required=True, help='Stream name to connect to')
parser.add_argument('--viewers', type=int, default=100, help='Number of simulated viewers')
parser.add_argument('--duration', type=int, default=60, help='Duration in seconds to run the test')
args = parser.parse_args()
print(f"Benchmarking stream: {args.stream}")
print(f"Server: {args.url}")
print(f"Viewers: {args.viewers}")
print(f"Duration: {args.duration} seconds")
asyncio.run(run_benchmark(args.url, args.stream, args.viewers, args.duration))

View File

@@ -8,4 +8,33 @@ services:
volumes:
- ./psql:/var/lib/postgresql/data
ports:
- 5555:5432
- 5555:5432
nginx-rtmp:
# ports:
# - 1935:1935
# - 8888:8888
network_mode: host
environment:
UID: 1000
GID: 1000
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./html:/var/www/html
- /dev/shm:/dev/shm
image: tiangolo/nginx-rtmp
entrypoint:
- /bin/sh
- -c
- |
usermod -u $${UID} www-data
groupmod -g $${GID} www-data
mkdir -p /usr/local/nginx/proxy_temp /usr/local/nginx/client_body_temp
chown -R www-data:www-data /usr/local/nginx
chown -R www-data:www-data /var/www/html
mkdir -p /dev/shm/hls
chown -R www-data:www-data /dev/shm/hls
nginx -g 'daemon off;'

62
dev/html/stat.xsl Normal file
View File

@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:output method="text" encoding="UTF-8" media-type="application/json"/>
<xsl:template match="/">
<xsl:text>{</xsl:text>
<xsl:text>"server": {</xsl:text>
<xsl:text>"version": "</xsl:text><xsl:value-of select="rtmp/nginx_version"/><xsl:text>",</xsl:text>
<xsl:text>"uptime": "</xsl:text><xsl:value-of select="rtmp/server/uptime"/><xsl:text>",</xsl:text>
<xsl:text>"applications": [</xsl:text>
<xsl:for-each select="rtmp/server/application">
<xsl:if test="position() > 1">,</xsl:if>
<xsl:text>{</xsl:text>
<xsl:text>"name": "</xsl:text><xsl:value-of select="name"/><xsl:text>",</xsl:text>
<xsl:text>"streams": [</xsl:text>
<xsl:for-each select="live/stream">
<xsl:if test="position() > 1">,</xsl:if>
<xsl:text>{</xsl:text>
<xsl:text>"name": "</xsl:text><xsl:value-of select="name"/><xsl:text>",</xsl:text>
<xsl:text>"time": "</xsl:text><xsl:value-of select="time"/><xsl:text>",</xsl:text>
<xsl:text>"bw_in": "</xsl:text><xsl:value-of select="bw_in"/><xsl:text>",</xsl:text>
<xsl:text>"bw_out": "</xsl:text><xsl:value-of select="bw_out"/><xsl:text>",</xsl:text>
<xsl:text>"bytes_in": "</xsl:text><xsl:value-of select="bytes_in"/><xsl:text>",</xsl:text>
<xsl:text>"bytes_out": "</xsl:text><xsl:value-of select="bytes_out"/><xsl:text>",</xsl:text>
<xsl:text>"nclients": "</xsl:text><xsl:value-of select="nclients"/><xsl:text>",</xsl:text>
<xsl:text>"publishing": </xsl:text>
<xsl:choose>
<xsl:when test="count(client[publishing]) > 0">true</xsl:when>
<xsl:otherwise>false</xsl:otherwise>
</xsl:choose>
<xsl:text>,</xsl:text>
<xsl:text>"active": </xsl:text>
<xsl:choose>
<xsl:when test="active = 1">true</xsl:when>
<xsl:otherwise>false</xsl:otherwise>
</xsl:choose>
<xsl:text>,</xsl:text>
<xsl:text>"clients": [</xsl:text>
<xsl:for-each select="client">
<xsl:if test="position() > 1">,</xsl:if>
<xsl:text>{</xsl:text>
<xsl:text>"id": "</xsl:text><xsl:value-of select="id"/><xsl:text>",</xsl:text>
<xsl:text>"address": "</xsl:text><xsl:value-of select="address"/><xsl:text>",</xsl:text>
<xsl:text>"time": "</xsl:text><xsl:value-of select="time"/><xsl:text>",</xsl:text>
<xsl:text>"flashver": "</xsl:text><xsl:value-of select="flashver"/><xsl:text>",</xsl:text>
<xsl:text>"publishing": </xsl:text>
<xsl:choose>
<xsl:when test="publishing">true</xsl:when>
<xsl:otherwise>false</xsl:otherwise>
</xsl:choose>
<xsl:text>}</xsl:text>
</xsl:for-each>
<xsl:text>]</xsl:text>
<xsl:text>}</xsl:text>
</xsl:for-each>
<xsl:text>]</xsl:text>
<xsl:text>}</xsl:text>
</xsl:for-each>
<xsl:text>]</xsl:text>
<xsl:text>}</xsl:text>
<xsl:text>}</xsl:text>
</xsl:template>
</xsl:stylesheet>

81
dev/nginx.conf Normal file
View File

@@ -0,0 +1,81 @@
events {
worker_connections 1024; # Define the maximum number of simultaneous connections
}
rtmp {
server {
listen 1935;
application live {
live on;
record off;
on_publish http://localhost:3000/api/rtmp/publish;
}
application channel-live {
live on;
record off;
allow publish 127.0.0.1;
deny publish all;
hls on;
hls_path /dev/shm/hls;
hls_fragment 3;
hls_playlist_length 5;
hls_cleanup on;
hls_variant _low BANDWIDTH=500000;
hls_variant _mid BANDWIDTH=1000000;
hls_variant _hi BANDWIDTH=1500000;
}
}
}
http {
include mime.types;
default_type application/octet-stream;
# performance optimizations
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
server {
listen 8888;
location /stat {
if ($request_method = "GET") {
add_header "Access-Control-Allow-Origin" *;
}
rtmp_stat all;
rtmp_stat_stylesheet stat.xsl;
}
location /json {
if ($request_method = "GET") {
add_header "Access-Control-Allow-Origin" *;
}
add_header Content-Type application/json;
rtmp_stat all;
rtmp_stat_stylesheet stat.xsl;
}
location /stat.xsl {
alias /var/www/html/stat.xsl;
}
location /hls {
alias /dev/shm/hls;
add_header Cache-Control no-cache;
add_header Access-Control-Allow-Origin *;
types {
application/vnd.apple.mpegurl m3u8;
video/mp2t ts;
}
}
}
}

View File

@@ -3,7 +3,8 @@
"version": "0.1.0",
"private": true,
"scripts": {
"dev": "docker compose --file dev/docker-compose.yml up -d && next dev --experimental-https --turbo",
"dev": "docker compose --file dev/docker-compose.yml up -d && next dev --turbo",
"donly": "docker compose --file dev/docker-compose.yml up",
"setup": "docker compose --file dev/docker-compose.yml up -d && prisma migrate deploy",
"build": "prisma generate && next build",
"start": "prisma migrate deploy && next start",

View File

@@ -0,0 +1,17 @@
-- CreateTable
CREATE TABLE "StreamKey" (
"id" TEXT NOT NULL,
"key" TEXT NOT NULL,
"channelId" TEXT NOT NULL,
CONSTRAINT "StreamKey_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "StreamKey_key_key" ON "StreamKey"("key");
-- CreateIndex
CREATE UNIQUE INDEX "StreamKey_channelId_key" ON "StreamKey"("channelId");
-- AddForeignKey
ALTER TABLE "StreamKey" ADD CONSTRAINT "StreamKey_channelId_fkey" FOREIGN KEY ("channelId") REFERENCES "Channel"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

View File

@@ -46,6 +46,7 @@ model Channel {
managers User[] @relation("ChannelManagers")
streamInfo StreamInfo[]
followers Follow[] @relation("ChannelFollowers")
streamKey StreamKey?
@@index([ownerId])
}
@@ -89,4 +90,12 @@ model Follow {
@@unique([userId, channelId])
@@index([userId])
@@index([channelId])
}
model StreamKey {
id String @id @default(cuid())
key String @unique
channelId String @unique
channel Channel @relation(fields: [channelId], references: [id])
}

View File

@@ -0,0 +1,32 @@
import prisma from '@/lib/db';
import { NextRequest } from 'next/server';
import { redirect } from 'next/navigation';
export async function POST(request: NextRequest) {
const formData = await request.formData();
const streamKey = formData.get('name')?.toString() || '';
console.log('streamKey:', streamKey);
const key = await prisma.streamKey.findFirst({
where: {
key: streamKey,
},
include: {
channel: true,
},
});
if (!key) {
return new Response('nay', {
status: 403,
});
}
const headers = new Headers();
headers.append('Location', `rtmp://127.0.0.1/channel-live/${key.channel.name}`);
return new Response(null, {
status: 302,
headers: headers,
});
}

View File

@@ -0,0 +1,52 @@
import { validateRequest } from "@/lib/auth";
import prisma from "@/lib/db";
import { NextRequest } from "next/server";
export async function POST(request: NextRequest) {
const { user } = await validateRequest();
const body = await request.json();
const { channel } = body;
if (!user) {
return new Response('Unauthorized', { status: 401 });
}
const channelInfo = await prisma.channel.findUnique({
where: { name: channel },
include: {
owner: true,
managers: true
}
});
if (!channelInfo) {
return new Response('Channel not found', { status: 404 });
}
const isBroadcaster =
channelInfo.ownerId === user.id ||
channelInfo.managers.some(m => m.id === user.id);
if (!isBroadcaster) {
return new Response('Unauthorized', { status: 401 });
}
const dbUpdate = await prisma.streamKey.upsert({
create: {
key: crypto.randomUUID(),
channelId: channelInfo.id
},
update: {
key: crypto.randomUUID()
},
where: {
channelId: channelInfo.id
}
})
return new Response(JSON.stringify({ key: dbUpdate.key }), {
status: 200,
headers: {
'Content-Type': 'application/json'
}
});
}

View File

@@ -7,7 +7,7 @@ import { toast } from 'sonner';
import useSWR from 'swr/mutation';
export default function RegenerateKey(props: Props) {
const { error, isMutating, trigger } = useSWR('/api/livekit/broadcasterToken', async (url) =>
const { error, isMutating, trigger } = useSWR('/api/rtmp/streamKey', async (url) =>
defaultFetcher(url, { body: JSON.stringify({ channel: props.channel }), method: 'POST' })
);

View File

@@ -1,5 +1,5 @@
export async function register() {
if (process.env.NEXT_RUNTIME === 'nodejs') {
await (await import('@/lib/instrumentation/streamInfo')).default();
// await (await import('@/lib/instrumentation/streamInfo')).default();
}
}