Compare commits

...

39 Commits

Author SHA1 Message Date
626a9f0d5b chore: rename volumes for prom and grafana 2026-03-13 09:31:00 +01:00
88cb43204a fix: reflect real authentication result
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-13 09:04:30 +01:00
1e5416f4b6 fix: ensure scheme is correct
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-13 08:52:20 +01:00
f31f74eb1a fix: expand env
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-13 08:51:36 +01:00
dc02831482 revert: some stuff from prev commit 2026-03-12 22:15:04 +01:00
a77ed916c5 fix: address remaining metrics review feedback 2026-03-12 22:05:38 +01:00
96a68b46ae fix: add authentication to metrics routes 2026-03-12 21:53:41 +01:00
21e2e094d6 fix: do not expose metrics 2026-03-12 21:38:14 +01:00
fcdbc4e878 fix metrics cardinality and stream key cache counts 2026-03-12 20:16:01 +01:00
cdb0c01ffd fix(chat): possible DoS when logging to prometheus 2026-03-09 22:47:28 +01:00
3771baae8c Merge branch 'main' into feat/metrics 2026-03-09 22:36:00 +01:00
d719debf6a fix(ui): it was flexbox 2026-03-09 22:34:39 +01:00
e22a35484a fix(ui): last commit fixed arrows but this should fix all users from showing 2026-03-09 22:34:39 +01:00
2597aa8d86 fix(ui): carousel overflow 2026-03-09 22:34:39 +01:00
e0b6075900 fix(bot): do not inherit admin from owner status 2026-03-09 22:34:39 +01:00
c7cedbbfe0 chore(chat): exclude chat grants from adding numbers to viewer count 2026-03-09 22:34:39 +01:00
df4537bbe3 fix(ui): it was flexbox 2026-03-09 22:30:03 +01:00
7cd071b3b6 fix(ui): last commit fixed arrows but this should fix all users from showing 2026-03-09 16:58:30 +01:00
a8a64432a4 fix(ui): carousel overflow 2026-03-09 16:44:45 +01:00
10b77c673e fix(bot): do not inherit admin from owner status 2026-03-06 23:46:16 +01:00
960e3306e4 chore(chat): exclude chat grants from adding numbers to viewer count 2026-03-06 23:42:20 +01:00
fbfbe3ff6f chore(metrics): redesign with opus because of course 2026-03-06 21:57:52 +01:00
07eefcf9c7 feat(metrics): add more production ready metrics 2026-03-06 21:30:14 +01:00
527155a0c1 feat(metrics): initial ai impl 2026-03-06 19:32:12 +01:00
ba30d6e097 chore(ui): choose any bot as mod and fix landing page 2026-03-02 19:19:23 +01:00
70ae7ef3b3 chore(mediamtx): latency fix attempt 2026-03-02 19:11:54 +01:00
eddfebc311 chore(prod): WHY WHY WHY WHY WHY 2026-03-02 17:23:19 +01:00
460125972f chore(prod): add log level debug to mediamtx 2026-03-02 17:13:59 +01:00
91b08f00b2 fix(docker): please be over 2026-03-02 17:04:01 +01:00
eccf9e5791 fix(docker): build mediamtx image instead 2026-03-02 16:58:50 +01:00
01514931cb fix(docker): minor config issue 2026-03-02 16:22:27 +01:00
6d5f7b4fd5 feat(docker): add production docker image for mediatmx 2026-03-02 08:23:46 +01:00
2c95ddc6dd chore(docker): include mediamtx config file 2026-03-01 20:46:58 +01:00
a6fcaff5f3 fix(types): build issues 2026-02-24 22:56:36 +01:00
b4f66e01d9 fix(chat): code review and documentation update 2026-02-24 22:38:58 +01:00
ef5eab0d17 refactor(chat): platform admin fix cleanup 2026-02-24 22:27:38 +01:00
cf4cc77071 fix(chat): platform admin cannot moderate chats 2026-02-24 21:21:22 +01:00
1bbe4fdc0a chore(ui): remove zrl username block 2026-02-23 22:43:13 +01:00
67b9af57f9 fix(settings): user dropdown wasnt showing channel managers 2026-02-23 22:36:23 +01:00
38 changed files with 5154 additions and 641 deletions

View File

@@ -10,7 +10,7 @@ jobs:
name: Push frontend to Docker Hub
runs-on: ubuntu-latest
steps:
- name: Wait
- name: Wait
uses: NathanFirmo/wait-for-other-action@v1.0.4
with:
token: ${{ secrets.GITHUB_TOKEN }}
@@ -40,12 +40,12 @@ jobs:
RELEASE_URL=$(curl -s https://api.github.com/repos/srizan10/hctv/releases/latest | \
grep "browser_download_url.*slack-import-emojis-linux-x86_64" | \
cut -d '"' -f 4)
curl -L -o slack-import-emojis-bin $RELEASE_URL
chmod +x slack-import-emojis-bin
mkdir -p apps/web/src/lib/instrumentation/
./slack-import-emojis-bin default
cp emojis.json apps/web/
@@ -98,11 +98,43 @@ jobs:
secrets: |
TURBO_TOKEN=${{ secrets.TURBO_TOKEN }}
TURBO_TEAM=${{ secrets.TURBO_TEAM }}
mediamtx:
name: Push MediaMTX image to Docker Hub
runs-on: ubuntu-latest
steps:
- name: Check out the repo
uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to Docker Hub
uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: srizan10/hclive-mediamtx
tags: latest
- name: Build and push Docker image
uses: docker/build-push-action@v6
with:
context: .
file: ./docker/mediamtx/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/amd64
deploy:
name: Deploy to Coolify
runs-on: ubuntu-latest
needs: [frontend, chat]
needs: [frontend, chat, mediamtx]
steps:
- name: Send coolify redeploy webhook
run: |
curl -X POST -H "Authorization: Bearer ${{ secrets.COOLIFY_API_KEY }}" https://coolify.srizan.dev/api/v1/deploy?uuid=${{ secrets.COOLIFY_APP_UUID }}&force=true
curl -X POST -H "Authorization: Bearer ${{ secrets.COOLIFY_API_KEY }}" https://coolify.srizan.dev/api/v1/deploy?uuid=${{ secrets.COOLIFY_APP_UUID }}&force=true

View File

@@ -14,7 +14,8 @@
"@hono/node-ws": "^1.1.0",
"@leeoniya/ufuzzy": "^1.0.18",
"@oslojs/encoding": "^1.1.0",
"hono": "^4.7.5"
"hono": "^4.7.5",
"prom-client": "^15.1.3"
},
"devDependencies": {
"@types/node": "^20.11.17",

View File

@@ -1,13 +1,43 @@
import { serve } from '@hono/node-server';
import { createNodeWebSocket, type ModifiedWebSocket } from '@hctv/hono-ws';
import { createNodeWebSocket } from '@hctv/hono-ws';
import { Hono } from 'hono';
import { readFile } from 'node:fs/promises';
import { lucia } from '@hctv/auth';
import { getCookie } from 'hono/cookie';
import {
chatMetricsRegistry,
recordChatConnectionAccepted,
recordChatConnectionRejected,
recordChatDisconnect,
recordChatError,
recordChatModerationBlock,
recordDeliveredChatMessage,
recordDeliveredChatMessageBytes,
recordEmojiSearchResults,
recordHistoryMessagesLoaded,
recordIncomingChatMessage,
recordUniqueChatter,
setChannelHistorySize,
setChatModerationState,
startChatMessageTimer,
} from './metrics.js';
import { getPersonalChannel } from './utils/personalChannel.js';
import { ChatModerationAction, getRedisConnection, prisma } from '@hctv/db';
import uFuzzy from '@leeoniya/ufuzzy';
import {
handleDeleteMessageCommand,
handleUserRestrictionCommand,
sendModerationError,
} from './utils/moderation.js';
import { randomString } from './utils/randomString.js';
import type {
ChatModerationCommand,
ChatModerationSettingsShape,
ChatRestrictionState,
ChatSocket,
ChatUser,
} from './types/chat.js';
import { basicAuth } from 'hono/basic-auth';
const redis = getRedisConnection();
const MESSAGE_HISTORY_SIZE = 100;
@@ -16,6 +46,35 @@ const MODERATION_SETTINGS_CACHE_TTL_SECONDS = 30;
const threed = await readFile('./src/3d.txt', 'utf-8');
const uf = new uFuzzy();
type IncomingMessage = {
type?: string;
[key: string]: unknown;
};
const METRICS_MESSAGE_TYPES = [
'ping',
'message',
'emojiMsg',
'emojiSearch',
'mod:deleteMessage',
'mod:timeoutUser',
'mod:banUser',
'mod:unbanUser',
'mod:liftTimeout',
] as const;
type MetricsMessageType = (typeof METRICS_MESSAGE_TYPES)[number] | 'unknown';
function getMetricsMessageType(type: unknown): MetricsMessageType {
if (typeof type !== 'string') {
return 'unknown';
}
return (METRICS_MESSAGE_TYPES as readonly string[]).includes(type)
? (type as MetricsMessageType)
: 'unknown';
}
const DEFAULT_MODERATION_SETTINGS: ChatModerationSettingsShape = {
blockedTerms: [],
slowModeSeconds: 0,
@@ -182,6 +241,14 @@ async function broadcastRestrictionStateToUser(
});
}
const RATE_LIMIT_LUA = `
local current = redis.call('INCR', KEYS[1])
if current == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return current
`;
async function isRateLimited(
channelId: string,
userId: string,
@@ -189,12 +256,7 @@ async function isRateLimited(
windowSeconds: number
): Promise<boolean> {
const key = `chat:ratelimit:${channelId}:${userId}`;
const currentCount = await redis.incr(key);
if (currentCount === 1) {
await redis.expire(key, windowSeconds);
}
const currentCount = (await redis.eval(RATE_LIMIT_LUA, 1, key, String(windowSeconds))) as number;
return currentCount > count;
}
@@ -239,25 +301,42 @@ async function deleteMessageFromHistory(targetUsername: string, msgId: string):
const app = new Hono();
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });
if (process.env.NODE_ENV === 'production') {
app.use(
'/metrics',
basicAuth({ username: process.env.METRICS_USER!, password: process.env.METRICS_PASS! })
);
}
app.get('/', async (c) => {
return c.text(threed);
});
app.get('/up', async (c) => {
return c.text('it works');
return c.text('hello world');
});
app.get('/metrics', async () => {
return new Response(await chatMetricsRegistry.metrics(), {
headers: {
'Content-Type': chatMetricsRegistry.contentType,
'Cache-Control': 'no-store, no-cache, must-revalidate, proxy-revalidate',
},
});
});
app.get(
'/ws/:username',
upgradeWebSocket((c) => ({
async onOpen(evt, ws) {
let authMethod = 'unknown';
const token = getCookie(c, 'auth_session');
const grant = c.req.query('grant');
const authHeader = c.req.header('Authorization');
const botAuth = c.req.query('botAuth');
if (!token && (!grant || grant === 'null') && !authHeader && !botAuth) {
recordChatConnectionRejected(authMethod, 'missing_auth');
ws.close();
return;
}
@@ -284,6 +363,7 @@ app.get(
});
if (botAccount) {
authMethod = 'bot_api_key';
chatUser = {
id: botAccount.botAccount.id,
username: botAccount.botAccount.slug,
@@ -307,6 +387,7 @@ app.get(
if (session.user) {
const userChannel = await getPersonalChannel(session.user.id);
if (userChannel) {
authMethod = 'session';
chatUser = {
id: session.user.id,
username: userChannel.name,
@@ -329,16 +410,21 @@ app.get(
: null;
if (!chatUser && !dbGrant) {
recordChatConnectionRejected(authMethod, 'auth_failed');
ws.close();
return;
}
const { username } = c.req.param();
if (!chatUser && dbGrant) {
authMethod = 'obs_grant';
}
if (dbGrant && dbGrant.name !== username) {
recordChatConnectionRejected(authMethod, 'grant_mismatch');
ws.close();
return;
}
const channel = await prisma.channel.findUnique({
where: { name: username },
select: {
@@ -363,6 +449,7 @@ app.get(
});
if (!channel) {
recordChatConnectionRejected(authMethod, 'channel_not_found');
ws.close();
return;
}
@@ -391,14 +478,15 @@ app.get(
chatUser = {
...chatUser,
isPlatformAdmin: Boolean(moderatorUser?.isAdmin),
isPlatformAdmin: chatUser.isBot ? false : Boolean(moderatorUser?.isAdmin),
channelRole,
};
}
const isModerator = Boolean(
chatUser &&
(chatUser.channelRole === 'owner' ||
(chatUser.isPlatformAdmin ||
chatUser.channelRole === 'owner' ||
chatUser.channelRole === 'manager' ||
chatUser.channelRole === 'chatModerator' ||
chatUser.channelRole === 'botModerator')
@@ -415,6 +503,7 @@ app.get(
socket.personalChannel = personalChannel;
socket.viewerId = randomString(10);
socket.isModerator = isModerator;
socket.excludeFromViewerCount = Boolean(dbGrant);
socketState.targetUsername = username;
socketState.channelId = channel.id;
@@ -422,6 +511,21 @@ app.get(
socketState.personalChannel = personalChannel;
socketState.viewerId = socket.viewerId;
socketState.isModerator = isModerator;
socket.metricsTracked = true;
socketState.metricsTracked = true;
socket.metricsAuthMethod = authMethod;
socketState.metricsAuthMethod = authMethod;
recordChatConnectionAccepted(username, authMethod);
setChatModerationState(username, {
blockedTerms: moderationSettings.blockedTerms.length,
maxMessageLength: moderationSettings.maxMessageLength,
rateLimitCount: moderationSettings.rateLimitCount,
rateLimitWindowSeconds: moderationSettings.rateLimitWindowSeconds,
slowModeSeconds: moderationSettings.slowModeSeconds,
});
socketState.excludeFromViewerCount = Boolean(dbGrant);
socket.send(
JSON.stringify({
@@ -451,6 +555,7 @@ app.get(
const messages = await redis.zrange(channelKey, 0, MESSAGE_HISTORY_SIZE - 1);
if (messages.length > 0) {
recordHistoryMessagesLoaded(username, messages.length);
socket.send(
JSON.stringify({
type: 'history',
@@ -458,6 +563,7 @@ app.get(
})
);
}
setChannelHistorySize(username, messages.length);
},
async onClose(evt, ws) {
const socket = ws as unknown as ChatSocket;
@@ -465,6 +571,14 @@ app.get(
if (process.env.NODE_ENV !== 'production') console.log('client disconnected');
if (!socketState.targetUsername) return;
if (socketState.metricsTracked) {
recordChatDisconnect(
socketState.targetUsername,
socketState.metricsAuthMethod ?? 'unknown'
);
socketState.metricsTracked = false;
}
const streamInfo = await prisma.streamInfo.findUnique({
where: {
username: socketState.targetUsername,
@@ -476,68 +590,44 @@ app.get(
if (!streamInfo) return;
await redis.del(`viewer:${socketState.targetUsername}:${socketState.viewerId}`);
if (!socketState.excludeFromViewerCount) {
await redis.del(`viewer:${socketState.targetUsername}:${socketState.viewerId}`);
}
},
async onMessage(evt, ws) {
let outcome = 'ignored';
let messageType = 'unknown';
let stopTimer: ReturnType<typeof startChatMessageTimer> | null = null;
try {
const socket = ws as unknown as ChatSocket;
const socketState = resolveSocketState(socket);
const msg = JSON.parse(evt.data.toString());
const rawPayload = evt.data.toString();
const msg = JSON.parse(rawPayload) as IncomingMessage;
messageType = getMetricsMessageType(msg.type);
recordIncomingChatMessage(messageType, Buffer.byteLength(rawPayload));
stopTimer = startChatMessageTimer(messageType);
if (msg.type === 'ping') {
await redis.setex(
`viewer:${socketState.targetUsername}:${socketState.viewerId}`,
30,
'1'
);
if (!socketState.excludeFromViewerCount) {
await redis.setex(
`viewer:${socketState.targetUsername}:${socketState.viewerId}`,
30,
'1'
);
}
socket.send(JSON.stringify({ type: 'pong' }));
outcome = 'pong';
return;
}
if (msg.type === 'mod:deleteMessage') {
if (
!socketState.isModerator ||
!socketState.chatUser ||
!socketState.targetUsername ||
!socketState.channelId
) {
return;
}
const msgId = typeof msg.msgId === 'string' ? msg.msgId : '';
if (!msgId) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'INVALID_REQUEST',
message: 'Invalid message id.',
})
);
return;
}
const deleted = await deleteMessageFromHistory(socketState.targetUsername, msgId);
if (!deleted) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'NOT_FOUND',
message: 'Message not found.',
})
);
return;
}
await logModerationEvent({
action: ChatModerationAction.MESSAGE_DELETED,
channelId: socketState.channelId,
moderatorId: socketState.chatUser.moderatorUserId,
reason: 'Message deleted by moderator',
details: { msgId },
await handleDeleteMessageCommand(socket, socketState, msg as ChatModerationCommand, {
deleteMessageFromHistory,
logModerationEvent,
broadcastToChannel,
});
broadcastToChannel(socketState.targetUsername, socket, { type: 'messageDeleted', msgId });
outcome = 'moderation';
return;
}
@@ -547,159 +637,12 @@ app.get(
msg.type === 'mod:unbanUser' ||
msg.type === 'mod:liftTimeout'
) {
if (
!socketState.isModerator ||
!socketState.chatUser ||
!socketState.targetUsername ||
!socketState.channelId
) {
return;
}
const actingModeratorUserId = socketState.chatUser.moderatorUserId;
const targetUserId = typeof msg.targetUserId === 'string' ? msg.targetUserId : '';
if (!targetUserId || targetUserId === actingModeratorUserId) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'INVALID_TARGET',
message: 'Invalid moderation target.',
})
);
return;
}
const targetUserRecord = await prisma.user.findUnique({
where: { id: targetUserId },
select: {
isAdmin: true,
personalChannel: { select: { name: true } },
},
await handleUserRestrictionCommand(socket, socketState, msg as ChatModerationCommand, {
logModerationEvent,
broadcastRestrictionStateToUser,
broadcastToChannel,
});
if (!targetUserRecord) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'INVALID_TARGET',
message: 'Target user no longer exists.',
})
);
return;
}
const actingUserRecord = await prisma.user.findUnique({
where: { id: actingModeratorUserId },
select: { isAdmin: true },
});
if (
process.env.NODE_ENV === 'production' &&
targetUserRecord.isAdmin &&
!actingUserRecord?.isAdmin
) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'FORBIDDEN',
message: 'Platform admins cannot be moderated via chat commands.',
})
);
return;
}
const resolvedTargetUsername = targetUserRecord.personalChannel?.name ?? 'that user';
if (msg.type === 'mod:unbanUser' || msg.type === 'mod:liftTimeout') {
await prisma.chatUserBan.deleteMany({
where: {
channelId: socketState.channelId,
userId: targetUserId,
},
});
await logModerationEvent({
action: ChatModerationAction.USER_UNBANNED,
channelId: socketState.channelId,
moderatorId: actingModeratorUserId,
targetUserId,
reason: 'User unbanned in chat',
});
await broadcastRestrictionStateToUser(
socketState.targetUsername,
targetUserId,
socketState.channelId,
socket
);
broadcastToChannel(socketState.targetUsername, socket, {
type: 'systemMsg',
message: `${resolvedTargetUsername} can chat again.`,
});
return;
}
const reason =
typeof msg.reason === 'string' && msg.reason.trim().length > 0
? msg.reason.trim().slice(0, 250)
: msg.type === 'mod:timeoutUser'
? 'Timed out by moderator'
: 'Banned by moderator';
const durationSeconds =
msg.type === 'mod:timeoutUser'
? Math.min(Math.max(Number(msg.durationSeconds) || 300, 10), 60 * 60 * 24)
: null;
const expiresAt = durationSeconds ? new Date(Date.now() + durationSeconds * 1000) : null;
await prisma.chatUserBan.upsert({
where: {
channelId_userId: {
channelId: socketState.channelId,
userId: targetUserId,
},
},
create: {
channelId: socketState.channelId,
userId: targetUserId,
bannedById: actingModeratorUserId,
reason,
expiresAt,
},
update: {
bannedById: actingModeratorUserId,
reason,
expiresAt,
},
});
await logModerationEvent({
action:
msg.type === 'mod:timeoutUser'
? ChatModerationAction.USER_TIMEOUT
: ChatModerationAction.USER_BANNED,
channelId: socketState.channelId,
moderatorId: actingModeratorUserId,
targetUserId,
reason,
details: durationSeconds ? { durationSeconds } : undefined,
});
await broadcastRestrictionStateToUser(
socketState.targetUsername,
targetUserId,
socketState.channelId,
socket
);
broadcastToChannel(socketState.targetUsername, socket, {
type: 'systemMsg',
message:
msg.type === 'mod:timeoutUser'
? `${resolvedTargetUsername} was timed out for ${durationSeconds}s.`
: `${resolvedTargetUsername} was banned.`,
});
outcome = 'moderation';
return;
}
@@ -726,19 +669,17 @@ app.get(
const restriction = await getActiveRestriction(channelId, chatUser.id);
if (restriction) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: restriction.type === 'timeout' ? 'TIMED_OUT' : 'BANNED',
message:
restriction.type === 'timeout'
? 'You are currently timed out in this chat.'
: 'You are currently banned from this chat.',
restriction,
})
sendModerationError(
socket,
restriction.type === 'timeout' ? 'TIMED_OUT' : 'BANNED',
restriction.type === 'timeout'
? 'You are currently timed out in this chat.'
: 'You are currently banned from this chat.',
restriction
);
await sendChatAccessState(socket, channelId, chatUser.id);
outcome = 'blocked';
return;
}
@@ -751,13 +692,9 @@ app.get(
moderationSettings.rateLimitWindowSeconds
))
) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'RATE_LIMIT',
message: 'You are sending messages too fast.',
})
);
sendModerationError(socket, 'RATE_LIMIT', 'You are sending messages too fast.');
recordChatModerationBlock('rate_limit');
outcome = 'rate_limited';
return;
}
@@ -765,13 +702,9 @@ app.get(
const slowModeKey = `chat:slowmode:${channelId}:${chatUser.id}`;
const timeRemaining = await redis.ttl(slowModeKey);
if (timeRemaining > 0) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'SLOW_MODE',
message: `Slow mode is on. Wait ${timeRemaining}s.`,
})
);
sendModerationError(socket, 'SLOW_MODE', `Slow mode is on. Wait ${timeRemaining}s.`);
recordChatModerationBlock('slow_mode');
outcome = 'slow_mode';
return;
}
await redis.setex(slowModeKey, moderationSettings.slowModeSeconds, '1');
@@ -782,13 +715,13 @@ app.get(
return;
}
if (message.length > moderationSettings.maxMessageLength) {
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'MESSAGE_TOO_LONG',
message: `Message exceeds ${moderationSettings.maxMessageLength} characters.`,
})
sendModerationError(
socket,
'MESSAGE_TOO_LONG',
`Message exceeds ${moderationSettings.maxMessageLength} characters.`
);
recordChatModerationBlock('message_too_long');
outcome = 'message_too_long';
return;
}
@@ -804,13 +737,9 @@ app.get(
details: { blockedTerm },
});
}
socket.send(
JSON.stringify({
type: 'moderationError',
code: 'BLOCKED_TERM',
message: 'Message blocked by channel moderation.',
})
);
recordChatModerationBlock('blocked_term');
sendModerationError(socket, 'BLOCKED_TERM', 'Message blocked by channel moderation.');
outcome = 'blocked_term';
return;
}
@@ -831,16 +760,30 @@ app.get(
};
const redisStr = JSON.stringify(msgObj);
const msgStr = JSON.stringify(msgObj);
const channelKey = `chat:history:${targetUsername}`;
await redis.zadd(channelKey, Date.now(), redisStr);
await redis.zremrangebyrank(channelKey, 0, -MESSAGE_HISTORY_SIZE - 1);
await redis.expire(channelKey, MESSAGE_TTL);
const historySize = await redis.zcard(channelKey);
setChannelHistorySize(targetUsername, historySize);
broadcastToChannel(targetUsername, socket, msgObj as unknown as Record<string, unknown>);
recordDeliveredChatMessage(chatUser.isBot ? 'bot' : 'user');
recordDeliveredChatMessageBytes(
chatUser.isBot ? 'bot' : 'user',
Buffer.byteLength(message)
);
const isFirstMessageFromUser =
(await redis.sadd(`chat:unique-chatters:${targetUsername}`, chatUser.id)) === 1;
if (isFirstMessageFromUser) {
recordUniqueChatter(chatUser.isBot ? 'bot' : 'user');
}
outcome = 'broadcast';
}
if (msg.type === 'emojiMsg') {
if (!socketState.chatUser) return;
const emojis = msg.emojis as string[];
const emojiMap: Record<string, string> = {};
@@ -865,11 +808,15 @@ app.get(
emojis: emojiMap,
})
);
outcome = 'emoji_lookup';
}
if (msg.type === 'emojiSearch') {
if (!socketState.chatUser) return;
const rawSearchTerm = (msg.searchTerm as string)?.trim() ?? '';
if (!rawSearchTerm || rawSearchTerm.length > 50) {
ws.send(JSON.stringify({ type: 'emojiSearchResponse', results: [] }));
recordEmojiSearchResults('empty', 0);
outcome = 'emoji_search_empty';
return;
}
const searchTerm = rawSearchTerm;
@@ -899,6 +846,8 @@ app.get(
results: results,
})
);
recordEmojiSearchResults('matched', results.length);
outcome = 'emoji_search';
} else {
ws.send(
JSON.stringify({
@@ -906,10 +855,16 @@ app.get(
results: [],
})
);
recordEmojiSearchResults('no_match', 0);
outcome = 'emoji_search_empty';
}
}
} catch (e) {
outcome = 'error';
recordChatError('on_message');
console.error('Error processing message:', e);
} finally {
stopTimer?.({ type: messageType, outcome });
}
},
}))
@@ -925,53 +880,3 @@ const server = serve(
}
);
injectWebSocket(server);
interface ChatUser {
id: string;
username: string;
pfpUrl: string;
displayName?: string;
isBot: boolean;
moderatorUserId: string;
isPlatformAdmin: boolean;
channelRole: 'owner' | 'manager' | 'chatModerator' | 'botModerator' | null;
}
interface ChatModerationSettingsShape {
blockedTerms: string[];
slowModeSeconds: number;
maxMessageLength: number;
rateLimitCount: number;
rateLimitWindowSeconds: number;
}
interface ChatRestrictionState {
type: 'timeout' | 'ban';
reason: string;
expiresAt: Date | null;
}
interface ChatSocket {
readyState: number;
OPEN: number;
send: (data: string) => void;
close: () => void;
wss: {
clients: Set<unknown>;
};
targetUsername?: string;
channelId?: string;
chatUser?: ChatUser | null;
personalChannel?: any;
viewerId?: string;
isModerator?: boolean;
raw?:
| (ModifiedWebSocket & {
targetUsername?: string;
channelId?: string;
chatUser?: ChatUser | null;
personalChannel?: any;
isModerator?: boolean;
})
| null;
}

251
apps/chat/src/metrics.ts Normal file
View File

@@ -0,0 +1,251 @@
import { collectDefaultMetrics, Counter, Gauge, Histogram, Registry } from 'prom-client';
function createMetricsStore() {
const register = new Registry();
register.setDefaultLabels({ app: 'chat' });
collectDefaultMetrics({
prefix: 'hctv_chat_',
register,
});
const websocketConnections = new Gauge({
name: 'hctv_chat_websocket_connections',
help: 'Current number of active chat websocket connections.',
registers: [register],
});
const websocketConnectionsByChannel = new Gauge({
name: 'hctv_chat_websocket_connections_by_channel',
help: 'Current number of active chat websocket connections by target channel.',
labelNames: ['channel'],
registers: [register],
});
const websocketConnectionsByAuthMethod = new Gauge({
name: 'hctv_chat_websocket_connections_by_auth_method',
help: 'Current number of active chat websocket connections by auth method.',
labelNames: ['auth_method'],
registers: [register],
});
const websocketConnectionAttempts = new Counter({
name: 'hctv_chat_websocket_connection_attempts_total',
help: 'Total websocket connection attempts grouped by outcome, auth method, and rejection reason.',
labelNames: ['outcome', 'auth_method', 'reason'],
registers: [register],
});
const incomingMessages = new Counter({
name: 'hctv_chat_incoming_messages_total',
help: 'Total inbound websocket frames grouped by message type.',
labelNames: ['type'],
registers: [register],
});
const inboundPayloadBytes = new Counter({
name: 'hctv_chat_inbound_payload_bytes_total',
help: 'Total inbound websocket payload bytes grouped by message type.',
labelNames: ['type'],
registers: [register],
});
const messageDuration = new Histogram({
name: 'hctv_chat_message_duration_seconds',
help: 'Chat websocket message processing time in seconds.',
labelNames: ['type', 'outcome'],
buckets: [0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
registers: [register],
});
const deliveredMessages = new Counter({
name: 'hctv_chat_messages_delivered_total',
help: 'Total chat messages successfully broadcast, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const deliveredMessageBytes = new Counter({
name: 'hctv_chat_message_bytes_delivered_total',
help: 'Total message body bytes successfully broadcast, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const channelHistorySize = new Gauge({
name: 'hctv_chat_channel_history_size',
help: 'Current number of messages retained in Redis history for a channel.',
labelNames: ['channel'],
registers: [register],
});
const channelHistoryLoadedMessages = new Counter({
name: 'hctv_chat_history_messages_loaded_total',
help: 'Total history messages loaded from Redis during websocket joins.',
labelNames: ['channel'],
registers: [register],
});
const moderationState = new Gauge({
name: 'hctv_chat_moderation_state',
help: 'Current moderation settings by channel.',
labelNames: ['channel', 'setting'],
registers: [register],
});
const channelUniqueChatters = new Counter({
name: 'hctv_chat_unique_chatters_total',
help: 'Users who successfully sent at least one chat message, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const moderationActions = new Counter({
name: 'hctv_chat_moderation_actions_total',
help: 'Successful moderation actions performed in chat.',
labelNames: ['action'],
registers: [register],
});
const moderationBlocks = new Counter({
name: 'hctv_chat_moderation_blocks_total',
help: 'Message blocks and throttling decisions grouped by reason.',
labelNames: ['reason'],
registers: [register],
});
const emojiSearchResults = new Histogram({
name: 'hctv_chat_emoji_search_results',
help: 'Number of emoji search results returned per query.',
labelNames: ['outcome'],
buckets: [0, 1, 2, 5, 10, 25, 50, 100, 150],
registers: [register],
});
const errors = new Counter({
name: 'hctv_chat_errors_total',
help: 'Errors observed in the chat service grouped by phase.',
labelNames: ['phase'],
registers: [register],
});
return {
deliveredMessages,
deliveredMessageBytes,
channelHistoryLoadedMessages,
channelHistorySize,
emojiSearchResults,
errors,
inboundPayloadBytes,
incomingMessages,
messageDuration,
moderationActions,
moderationBlocks,
moderationState,
register,
channelUniqueChatters,
websocketConnectionAttempts,
websocketConnections,
websocketConnectionsByAuthMethod,
websocketConnectionsByChannel,
};
}
const globalForMetrics = globalThis as typeof globalThis & {
__hctvChatMetrics?: ReturnType<typeof createMetricsStore>;
};
const metrics = (globalForMetrics.__hctvChatMetrics ??= createMetricsStore());
export const chatMetricsRegistry = metrics.register;
export function recordChatConnectionAccepted(channel: string, authMethod: string): void {
metrics.websocketConnectionAttempts.inc({
auth_method: authMethod,
outcome: 'accepted',
reason: 'none',
});
metrics.websocketConnections.inc();
metrics.websocketConnectionsByChannel.inc({ channel });
metrics.websocketConnectionsByAuthMethod.inc({ auth_method: authMethod });
}
export function recordChatConnectionRejected(authMethod: string, reason: string): void {
metrics.websocketConnectionAttempts.inc({ auth_method: authMethod, outcome: 'rejected', reason });
}
export function recordChatDisconnect(channel: string, authMethod: string): void {
metrics.websocketConnections.dec();
metrics.websocketConnectionsByChannel.dec({ channel });
metrics.websocketConnectionsByAuthMethod.dec({ auth_method: authMethod });
}
export function recordIncomingChatMessage(type: string, payloadBytes: number): void {
metrics.incomingMessages.inc({ type });
metrics.inboundPayloadBytes.inc({ type }, payloadBytes);
}
export function startChatMessageTimer(type: string) {
return metrics.messageDuration.startTimer({ type });
}
export function recordDeliveredChatMessage(senderType: string): void {
metrics.deliveredMessages.inc({ sender_type: senderType });
}
export function recordDeliveredChatMessageBytes(senderType: string, bytes: number): void {
metrics.deliveredMessageBytes.inc({ sender_type: senderType }, bytes);
}
export function setChannelHistorySize(channel: string, size: number): void {
metrics.channelHistorySize.set({ channel }, size);
}
export function recordHistoryMessagesLoaded(channel: string, count: number): void {
if (count > 0) {
metrics.channelHistoryLoadedMessages.inc({ channel }, count);
}
}
export function setChatModerationState(
channel: string,
settings: {
blockedTerms: number;
maxMessageLength: number;
rateLimitCount: number;
rateLimitWindowSeconds: number;
slowModeSeconds: number;
}
): void {
metrics.moderationState.set({ channel, setting: 'blocked_terms' }, settings.blockedTerms);
metrics.moderationState.set({ channel, setting: 'slow_mode_seconds' }, settings.slowModeSeconds);
metrics.moderationState.set(
{ channel, setting: 'max_message_length' },
settings.maxMessageLength
);
metrics.moderationState.set({ channel, setting: 'rate_limit_count' }, settings.rateLimitCount);
metrics.moderationState.set(
{ channel, setting: 'rate_limit_window_seconds' },
settings.rateLimitWindowSeconds
);
}
export function recordUniqueChatter(senderType: string): void {
metrics.channelUniqueChatters.inc({ sender_type: senderType });
}
export function recordChatModerationAction(action: string): void {
metrics.moderationActions.inc({ action });
}
export function recordChatModerationBlock(reason: string): void {
metrics.moderationBlocks.inc({ reason });
}
export function recordChatError(phase: string): void {
metrics.errors.inc({ phase });
}
export function recordEmojiSearchResults(outcome: string, count: number): void {
metrics.emojiSearchResults.observe({ outcome }, count);
}

View File

@@ -0,0 +1,70 @@
import type { ModifiedWebSocket } from '@hctv/hono-ws';
export interface ChatUser {
id: string;
username: string;
pfpUrl: string;
displayName?: string;
isBot: boolean;
moderatorUserId: string;
isPlatformAdmin: boolean;
channelRole: 'owner' | 'manager' | 'chatModerator' | 'botModerator' | null;
}
export interface ChatModerationSettingsShape {
blockedTerms: string[];
slowModeSeconds: number;
maxMessageLength: number;
rateLimitCount: number;
rateLimitWindowSeconds: number;
}
export interface ChatRestrictionState {
type: 'timeout' | 'ban';
reason: string;
expiresAt: Date | null;
}
export interface ChatSocket {
readyState: number;
OPEN: number;
send: (data: string) => void;
close: () => void;
wss: {
clients: Set<unknown>;
};
targetUsername?: string;
channelId?: string;
chatUser?: ChatUser | null;
personalChannel?: any;
viewerId?: string;
isModerator?: boolean;
metricsTracked?: boolean;
metricsAuthMethod?: string;
excludeFromViewerCount?: boolean;
raw?:
| (ModifiedWebSocket & {
targetUsername?: string;
channelId?: string;
chatUser?: ChatUser | null;
personalChannel?: any;
isModerator?: boolean;
metricsTracked?: boolean;
metricsAuthMethod?: string;
excludeFromViewerCount?: boolean;
})
| null;
}
export type ChatModerationCommand = {
type:
| 'mod:deleteMessage'
| 'mod:timeoutUser'
| 'mod:banUser'
| 'mod:unbanUser'
| 'mod:liftTimeout';
msgId?: string;
targetUserId?: string;
durationSeconds?: number;
reason?: string;
};

View File

@@ -0,0 +1,411 @@
import { ChatModerationAction, prisma } from '@hctv/db';
import { recordChatModerationAction } from '../metrics.js';
import type {
ChatModerationCommand,
ChatRestrictionState,
ChatSocket,
ChatUser,
} from '../types/chat.js';
const ROLE_RANK: Record<NonNullable<ChatUser['channelRole']> | '__none__', number> = {
owner: 100,
manager: 50,
chatModerator: 10,
botModerator: 10,
__none__: 0,
};
function roleRank(role: ChatUser['channelRole']): number {
return role ? (ROLE_RANK[role] ?? 0) : ROLE_RANK.__none__;
}
type ModerationContext = {
chatUser: ChatUser;
targetUsername: string;
channelId: string;
};
type DeleteMessageDeps = {
deleteMessageFromHistory: (targetUsername: string, msgId: string) => Promise<boolean>;
logModerationEvent: (payload: {
action: ChatModerationAction;
channelId: string;
moderatorId: string;
targetUserId?: string;
reason?: string;
details?: Record<string, unknown>;
}) => Promise<void>;
broadcastToChannel: (
targetUsername: string,
ws: ChatSocket,
payload: Record<string, unknown>
) => void;
};
type UserRestrictionDeps = {
logModerationEvent: (payload: {
action: ChatModerationAction;
channelId: string;
moderatorId: string;
targetUserId?: string;
reason?: string;
details?: Record<string, unknown>;
}) => Promise<void>;
broadcastRestrictionStateToUser: (
targetUsername: string,
targetUserId: string,
channelId: string,
ws: ChatSocket
) => Promise<void>;
broadcastToChannel: (
targetUsername: string,
ws: ChatSocket,
payload: Record<string, unknown>
) => void;
};
export function sendModerationError(
socket: ChatSocket,
code: string,
message: string,
restriction?: ChatRestrictionState
) {
socket.send(
JSON.stringify({
type: 'moderationError',
code,
message,
restriction,
})
);
}
async function requireModerationContext(
socket: ChatSocket,
socketState: ChatSocket
): Promise<ModerationContext | null> {
if (!socketState.chatUser || !socketState.targetUsername || !socketState.channelId) {
sendModerationError(socket, 'FORBIDDEN', 'You do not have permission to moderate this chat.');
return null;
}
const chatUser = socketState.chatUser;
const channelId = socketState.channelId;
const [channel, moderatorRecord] = await Promise.all([
prisma.channel.findUnique({
where: { id: channelId },
select: {
ownerId: true,
managers: { select: { id: true } },
chatModerators: { select: { id: true } },
chatModeratorBots: { select: { id: true } },
},
}),
prisma.user.findUnique({
where: { id: chatUser.moderatorUserId },
select: { isAdmin: true },
}),
]);
if (!channel) {
sendModerationError(socket, 'FORBIDDEN', 'You do not have permission to moderate this chat.');
return null;
}
const isPlatformAdmin = chatUser.isBot ? false : Boolean(moderatorRecord?.isAdmin);
let channelRole: ChatUser['channelRole'] = null;
if (chatUser.isBot) {
if (channel.chatModeratorBots.some((b) => b.id === chatUser.id)) {
channelRole = 'botModerator';
}
} else if (channel.ownerId === chatUser.id) {
channelRole = 'owner';
} else if (channel.managers.some((m) => m.id === chatUser.id)) {
channelRole = 'manager';
} else if (channel.chatModerators.some((m) => m.id === chatUser.id)) {
channelRole = 'chatModerator';
}
const isModerator =
isPlatformAdmin ||
channelRole === 'owner' ||
channelRole === 'manager' ||
channelRole === 'chatModerator' ||
channelRole === 'botModerator';
if (!isModerator) {
sendModerationError(socket, 'FORBIDDEN', 'You do not have permission to moderate this chat.');
return null;
}
const resolvedChatUser: ChatUser = { ...chatUser, isPlatformAdmin, channelRole };
return {
chatUser: resolvedChatUser,
targetUsername: socketState.targetUsername,
channelId,
};
}
async function resolveModerationTarget(
socket: ChatSocket,
actingModeratorUserId: string,
rawTargetUserId: unknown,
channelId: string
) {
const targetUserId = typeof rawTargetUserId === 'string' ? rawTargetUserId : '';
if (!targetUserId || targetUserId === actingModeratorUserId) {
sendModerationError(socket, 'INVALID_TARGET', 'Invalid moderation target.');
return null;
}
const targetUserRecord = await prisma.user.findUnique({
where: { id: targetUserId },
select: {
isAdmin: true,
personalChannel: { select: { name: true } },
ownedChannels: { where: { id: channelId }, select: { id: true } },
managedChannels: { where: { id: channelId }, select: { id: true } },
chatModeratedChannels: { where: { id: channelId }, select: { id: true } },
},
});
if (!targetUserRecord) {
sendModerationError(socket, 'INVALID_TARGET', 'Target user no longer exists.');
return null;
}
let targetChannelRole: ChatUser['channelRole'] = null;
if (targetUserRecord.ownedChannels.length > 0) {
targetChannelRole = 'owner';
} else if (targetUserRecord.managedChannels.length > 0) {
targetChannelRole = 'manager';
} else if (targetUserRecord.chatModeratedChannels.length > 0) {
targetChannelRole = 'chatModerator';
}
return {
targetUserId,
targetUserRecord,
targetChannelRole,
resolvedTargetUsername: targetUserRecord.personalChannel?.name ?? 'that user',
};
}
async function ensureAdminTargetModerationAllowed(
socket: ChatSocket,
actingModeratorUserId: string,
targetIsAdmin: boolean
) {
if (!targetIsAdmin) {
return true;
}
const actingUserRecord = await prisma.user.findUnique({
where: { id: actingModeratorUserId },
select: { isAdmin: true },
});
if (!actingUserRecord?.isAdmin) {
sendModerationError(
socket,
'FORBIDDEN',
'Platform admins cannot be moderated via chat commands.'
);
return false;
}
return true;
}
function ensureRoleHierarchyAllowed(
socket: ChatSocket,
actorRole: ChatUser['channelRole'],
actorIsPlatformAdmin: boolean,
targetRole: ChatUser['channelRole']
): boolean {
if (actorIsPlatformAdmin) return true;
if (roleRank(actorRole) <= roleRank(targetRole)) {
sendModerationError(
socket,
'FORBIDDEN',
'You cannot moderate a user with an equal or higher role than yours.'
);
return false;
}
return true;
}
export async function handleDeleteMessageCommand(
socket: ChatSocket,
socketState: ChatSocket,
msg: ChatModerationCommand,
deps: DeleteMessageDeps
) {
const context = await requireModerationContext(socket, socketState);
if (!context) {
return;
}
const msgId = typeof msg.msgId === 'string' ? msg.msgId : '';
if (!msgId) {
sendModerationError(socket, 'INVALID_REQUEST', 'Invalid message id.');
return;
}
const deleted = await deps.deleteMessageFromHistory(context.targetUsername, msgId);
if (!deleted) {
sendModerationError(socket, 'NOT_FOUND', 'Message not found.');
return;
}
await deps.logModerationEvent({
action: ChatModerationAction.MESSAGE_DELETED,
channelId: context.channelId,
moderatorId: context.chatUser.moderatorUserId,
reason: 'Message deleted by moderator',
details: { msgId },
});
recordChatModerationAction('message_deleted');
deps.broadcastToChannel(context.targetUsername, socket, { type: 'messageDeleted', msgId });
}
export async function handleUserRestrictionCommand(
socket: ChatSocket,
socketState: ChatSocket,
msg: ChatModerationCommand,
deps: UserRestrictionDeps
) {
const context = await requireModerationContext(socket, socketState);
if (!context) {
return;
}
const actingModeratorUserId = context.chatUser.moderatorUserId;
const target = await resolveModerationTarget(
socket,
actingModeratorUserId,
msg.targetUserId,
context.channelId
);
if (!target) {
return;
}
const canModerateTarget = await ensureAdminTargetModerationAllowed(
socket,
actingModeratorUserId,
target.targetUserRecord.isAdmin
);
if (!canModerateTarget) {
return;
}
const hierarchyAllowed = ensureRoleHierarchyAllowed(
socket,
context.chatUser.channelRole,
context.chatUser.isPlatformAdmin,
target.targetChannelRole
);
if (!hierarchyAllowed) {
return;
}
if (msg.type === 'mod:unbanUser' || msg.type === 'mod:liftTimeout') {
await prisma.chatUserBan.deleteMany({
where: {
channelId: context.channelId,
userId: target.targetUserId,
},
});
await deps.logModerationEvent({
action: ChatModerationAction.USER_UNBANNED,
channelId: context.channelId,
moderatorId: actingModeratorUserId,
targetUserId: target.targetUserId,
reason: 'User unbanned in chat',
});
recordChatModerationAction('user_unbanned');
await deps.broadcastRestrictionStateToUser(
context.targetUsername,
target.targetUserId,
context.channelId,
socket
);
deps.broadcastToChannel(context.targetUsername, socket, {
type: 'systemMsg',
message: `${target.resolvedTargetUsername} can chat again.`,
});
return;
}
const reason =
typeof msg.reason === 'string' && msg.reason.trim().length > 0
? msg.reason.trim().slice(0, 250)
: msg.type === 'mod:timeoutUser'
? 'Timed out by moderator'
: 'Banned by moderator';
const durationSeconds =
msg.type === 'mod:timeoutUser'
? Math.min(Math.max(Number(msg.durationSeconds) || 300, 10), 60 * 60 * 24)
: null;
const expiresAt = durationSeconds ? new Date(Date.now() + durationSeconds * 1000) : null;
await prisma.chatUserBan.upsert({
where: {
channelId_userId: {
channelId: context.channelId,
userId: target.targetUserId,
},
},
create: {
channelId: context.channelId,
userId: target.targetUserId,
bannedById: actingModeratorUserId,
reason,
expiresAt,
},
update: {
bannedById: actingModeratorUserId,
reason,
expiresAt,
},
});
await deps.logModerationEvent({
action:
msg.type === 'mod:timeoutUser'
? ChatModerationAction.USER_TIMEOUT
: ChatModerationAction.USER_BANNED,
channelId: context.channelId,
moderatorId: actingModeratorUserId,
targetUserId: target.targetUserId,
reason,
details: durationSeconds ? { durationSeconds } : undefined,
});
recordChatModerationAction(msg.type === 'mod:timeoutUser' ? 'user_timeout' : 'user_banned');
await deps.broadcastRestrictionStateToUser(
context.targetUsername,
target.targetUserId,
context.channelId,
socket
);
deps.broadcastToChannel(context.targetUsername, socket, {
type: 'systemMsg',
message:
msg.type === 'mod:timeoutUser'
? `${target.resolvedTargetUsername} was timed out for ${durationSeconds}s.`
: `${target.resolvedTargetUsername} was banned.`,
});
}

View File

@@ -21,64 +21,235 @@ Bot accounts are now supported. You can choose to connect as a bot by providing
**Security Note:** When using the `?botAuth=` query parameter, be aware that query parameters may be logged in server logs, and/or proxy logs. Use the `Authorization` header method whenever possible. The query parameter method should only be used when connecting from an environment where headers cannot be set.
It is highly advised to use a bot account for any automated task, and to implement anything pointed out in this page.
</Aside>
Once connected, you must implement a subroutine in your code to send ping messages every about 5 seconds. This is because of Cloudflare limitations.
Messages are sent and received in JSON format. The following message types are supported:
- `session`: sent by the server immediately upon connection.
- received by client:
```json
{
"type": "session",
"viewer": {
"id": "user_id",
"username": "your_username"
},
"permissions": {
"canModerate": false
},
"moderation": {
"hasBlockedTerms": false,
"slowModeSeconds": 0,
"maxMessageLength": 400
}
}
```
`viewer` is `null` for unauthenticated (grant-only) connections. `canModerate` is `true` for channel owners, managers, moderators, and platform admins.
- `chatAccess`: sent by the server on connect (for authenticated non-bot users) and whenever a user's restriction state changes.
- received by client:
```json
{
"type": "chatAccess",
"canSend": true,
"restriction": null
}
```
When the user is restricted, `canSend` is `false` and `restriction` contains:
```json
{
"type": "timeout",
"reason": "Timed out by moderator",
"expiresAt": "2026-01-01T00:00:00.000Z"
}
```
`type` is either `"timeout"` or `"ban"`. `expiresAt` is an ISO 8601 string for timeouts, or `null` for permanent bans.
- `ping`: a ping message to keep the connection alive.
- sent by client:
```json
{
"type": "ping"
}
```
- received by client:
```json
{
"type": "pong"
}
```
- `message`: a chat message.
- sent by client:
```json
{
"type": "message",
"message": "Hello, world!"
}
```
- received by client:
- received by client (broadcast to all viewers of the channel):
```json
{
"type": "message",
"msgId": "uuid-v4",
"user": {
"id": "user_id",
"username": "user_who_sent_message",
"avatar": "https://emoji.slack-edge.com/avatar.png"
"pfpUrl": "https://example.com/avatar.png",
"displayName": "Display Name",
"isBot": false,
"isPlatformAdmin": false,
"channelRole": null
},
"message": "Hello, world!"
}
```
- `ping`: a ping message to keep the connection alive.
- sent by client:
```json
{
"type": "ping"
}
```
- received by client:
```json
{
"type": "pong"
}
```
- `history`: a message containing the chat history. This is sent upon connection.
`channelRole` is one of `"owner"`, `"manager"`, `"chatModerator"`, `"botModerator"`, or `null`. `displayName` may be `undefined` for regular users.
- `history`: the recent chat history, sent upon connection.
- received by client:
```json
{
"type": "history",
"messages": [
{
"type": "message",
"msgId": "uuid-v4",
"user": {
"id": "user_id",
"username": "user_who_sent_message",
"avatar": "https://emoji.slack-edge.com/avatar.png"
"pfpUrl": "https://example.com/avatar.png",
"displayName": "Display Name",
"isBot": false,
"isPlatformAdmin": false,
"channelRole": null
},
"message": "Hello, world!",
"type": "message",
},
...
"message": "Hello, world!"
}
]
}
```
Up to 100 messages are returned. Each message has the same shape as a received `message` event.
- `systemMsg`: a system notification broadcast to all viewers, e.g. when a user is banned or unbanned.
- received by client:
```json
{
"type": "systemMsg",
"message": "username was banned."
}
```
- `moderationError`: sent to the acting client when a message or moderation action is rejected.
- received by client:
```json
{
"type": "moderationError",
"code": "RATE_LIMIT",
"message": "You are sending messages too fast.",
"restriction": null
}
```
`restriction` is only present (non-null) for `TIMED_OUT` and `BANNED` codes, and has the same shape as the `restriction` field in `chatAccess`. Possible codes:
| Code | Trigger |
| ------------------ | ---------------------------------------------- |
| `FORBIDDEN` | Not permitted to perform the action |
| `RATE_LIMIT` | Too many messages in the rate limit window |
| `SLOW_MODE` | Sent before the slow mode cooldown expired |
| `TIMED_OUT` | User is currently timed out |
| `BANNED` | User is permanently banned |
| `MESSAGE_TOO_LONG` | Message exceeds `maxMessageLength` |
| `BLOCKED_TERM` | Message contains a blocked term |
| `INVALID_TARGET` | Moderation target is invalid or does not exist |
| `INVALID_REQUEST` | Malformed moderation command |
| `NOT_FOUND` | Target message not found (delete) |
## Moderation commands
moderation commands are only available to authenticated users with the `canModerate` permission (`owner`, `manager`, `chatModerator`, `botModerator`, or platform admin). sending any of these without permission returns a `moderationError` with code `FORBIDDEN`.
obviously, role hierarchy is enforced: a `chatModerator` cannot moderate a `manager` or `owner`. Platform admins bypass hierarchy checks entirely.
- `mod:deleteMessage`: delete a message from the chat history and broadcast its removal.
- sent by client:
```json
{
"type": "mod:deleteMessage",
"msgId": "uuid-of-message-to-delete"
}
```
- received by all clients on success:
```json
{
"type": "messageDeleted",
"msgId": "uuid-of-message-to-delete"
}
```
- `mod:timeoutUser`: temporarily restrict a user from sending messages.
- sent by client:
```json
{
"type": "mod:timeoutUser",
"targetUserId": "user_id",
"durationSeconds": 300,
"reason": "Optional reason"
}
```
`durationSeconds` is clamped between 10 and 86400 (24 hours). Defaults to 300 if omitted. On success, a `systemMsg` is broadcast and the target receives a `chatAccess` update.
- `mod:banUser`: permanently ban a user from sending messages.
- sent by client:
```json
{
"type": "mod:banUser",
"targetUserId": "user_id",
"reason": "Optional reason"
}
```
On success, a `systemMsg` is broadcast and the target receives a `chatAccess` update.
- `mod:liftTimeout` / `mod:unbanUser`: remove an active timeout or ban.
- sent by client:
```json
{
"type": "mod:liftTimeout",
"targetUserId": "user_id"
}
```
Both types behave identically and remove any active restriction for the target user. On success, a `systemMsg` is broadcast and the target receives a `chatAccess` update with `canSend: true`.
## Emoji handling
_diagram source: devin deepwiki_
@@ -119,22 +290,29 @@ The server then checks Redis for the emoji URL and returns it.
When a user wants to look up an emoji (by typing `:(partial name)`), the server uses uFuzzy to find matching emojis in the Redis `emojis` hash key and returns the results.
<Aside type="caution">
`emojiMsg` and `emojiSearch` require an authenticated connection. They are not available to
grant-only (OBS) viewers.
</Aside>
Here's what gets sent on the websocket:
- `emojiMsg`: Looks up emojis
- sent by client:
```json
{
"type": "emojiMsg",
"emojis": ["aga", "yapa", "heavysob", "yay", "yay-bounce"]
}
```
- received by client:
```json
{
"type": "emojiMsgResponse",
"emojis": {
// rough example of urls
"aga": "https://emoji.slack-edge.com/aga.png",
"yapa": "https://emoji.slack-edge.com/yapa.png",
"heavysob": "https://emoji.slack-edge.com/heavysob.png",
@@ -143,20 +321,23 @@ Here's what gets sent on the websocket:
}
}
```
- `emojiSearch`: Searches for emojis
- sent by client:
```json
{
"type": "emojiSearch",
"searchTerm": "aga"
}
```
- received by client:
```json
{
"type": "emojiSearchResponse",
"results": [
// real results btw
"aga",
"aga-brick-throw",
"aga-dance",

View File

@@ -60,6 +60,7 @@
"nuqs": "^2.4.3",
"pg": "^8.14.1",
"pg-boss": "^10.1.6",
"prom-client": "^15.1.3",
"react": "^19.2.3",
"react-day-picker": "^9.13.0",
"react-dom": "^19.2.3",

View File

@@ -1,29 +1,39 @@
import { prisma, getRedisConnection } from '@hctv/db';
import { recordMediamtxAuth } from '@/lib/metrics';
import { NextRequest } from 'next/server';
import { z } from 'zod';
export async function POST(request: NextRequest) {
const startedAt = performance.now();
let action = 'invalid';
let protocol = 'invalid';
const finish = (body: string, status: number, outcome: string) => {
recordMediamtxAuth(action, protocol, outcome, (performance.now() - startedAt) / 1000);
return new Response(body, { status });
};
const redis = getRedisConnection();
const body = await request.json();
if (process.env.NODE_ENV !== 'production') {
console.log(
'Mediamtx publish auth request:',
JSON.stringify(body, null, 2)
);
};
console.log('Mediamtx publish auth request:', JSON.stringify(body, null, 2));
}
const parsed = schema.safeParse(body);
if (!parsed.success) {
return new Response('invalid request', { status: 400 });
return finish('invalid request', 400, 'invalid_request');
}
const { action, protocol, path, password } = parsed.data;
if (action === 'publish' && protocol === 'srt') {
const { action: parsedAction, protocol: parsedProtocol, path, password } = parsed.data;
action = parsedAction;
protocol = parsedProtocol;
if (parsedAction === 'publish' && parsedProtocol === 'srt') {
const channelKey = await redis.get(`streamKey:${path}`);
if (channelKey) {
if (channelKey !== password) {
return new Response('invalid stream key', { status: 403 });
return finish('invalid stream key', 403, 'invalid_stream_key');
}
const channel = await prisma.channel.findUnique({
@@ -38,39 +48,39 @@ export async function POST(request: NextRequest) {
});
if (channel?.restriction) {
const isExpired = channel.restriction.expiresAt &&
new Date(channel.restriction.expiresAt) < new Date();
const isExpired =
channel.restriction.expiresAt && new Date(channel.restriction.expiresAt) < new Date();
if (!isExpired) {
return new Response('channel restricted', { status: 403 });
return finish('channel restricted', 403, 'channel_restricted');
}
}
if (channel?.owner?.ban) {
const isExpired = channel.owner.ban.expiresAt &&
new Date(channel.owner.ban.expiresAt) < new Date();
const isExpired =
channel.owner.ban.expiresAt && new Date(channel.owner.ban.expiresAt) < new Date();
if (!isExpired) {
return new Response('user banned', { status: 403 });
return finish('user banned', 403, 'user_banned');
}
}
if (channel?.streamInfo[0].isLive) {
return new Response('stream already live', { status: 403 });
return finish('stream already live', 403, 'stream_already_live');
}
return new Response('youre in yay', { status: 200 });
return finish('youre in yay', 200, 'authorized_publish');
}
} else if (action === 'read' && protocol === 'hls') {
} else if (parsedAction === 'read' && parsedProtocol === 'hls') {
if (password === process.env.MEDIAMTX_PUBLISH_KEY) {
return new Response('authorized (hls read key for thumbs)', { status: 200 });
return finish('authorized (hls read key for thumbs)', 200, 'authorized_thumbnail');
}
const sessionExists = await redis.exists(`sessions:${password}`);
if (!sessionExists) {
return new Response('unauthorized', { status: 401 });
return finish('unauthorized', 401, 'unauthorized_session');
}
return new Response('authorized', { status: 200 });
return finish('authorized', 200, 'authorized_read');
}
return new Response('uhh', { status: 401 });
return finish('uhh', 401, 'unauthorized');
}
const schema = z.object({

View File

@@ -25,13 +25,14 @@ export async function GET(request: NextRequest) {
if (shouldGetOwned && user) {
channelConditions.push({ ownerId: user.id });
channelConditions.push({ managers: { some: { id: user.id } } });
}
if (allPersonalChannels) {
channelConditions.push({
personalFor: {
isNot: null
}
channelConditions.push({
personalFor: {
isNot: null,
},
});
}
@@ -40,9 +41,8 @@ export async function GET(request: NextRequest) {
}
if (channelConditions.length > 0) {
where.channel = channelConditions.length === 1
? channelConditions[0]
: { OR: channelConditions };
where.channel =
channelConditions.length === 1 ? channelConditions[0] : { OR: channelConditions };
}
const db = await prisma.streamInfo.findMany({
@@ -57,7 +57,7 @@ export async function GET(request: NextRequest) {
expiresAt: true,
},
},
}
},
},
},
});
@@ -71,7 +71,8 @@ export async function GET(request: NextRequest) {
delete obj.channel.obsChatGrantToken;
if (obj.channel.restriction) {
const isExpired = obj.channel.restriction.expiresAt &&
const isExpired =
obj.channel.restriction.expiresAt &&
new Date(obj.channel.restriction.expiresAt) < new Date();
if (isExpired) {
// @ts-ignore
@@ -88,4 +89,4 @@ export async function GET(request: NextRequest) {
});
return Response.json(db);
}
}

View File

@@ -61,6 +61,7 @@ import {
DialogTrigger,
} from '@/components/ui/dialog';
import { UserCombobox } from '@/components/app/UserCombobox/UserCombobox';
import { BotCombobox } from '@/components/app/BotCombobox/BotCombobox';
import { parseAsString, useQueryState } from 'nuqs';
import { Write } from '@/components/ui/channel-desc-fancy-area/write';
import { Preview } from '@/components/ui/channel-desc-fancy-area/preview';
@@ -89,7 +90,7 @@ interface ChannelSettingsClientProps {
chatModerators: User[];
chatModeratorPersonalChannels: (Channel | null)[];
chatModeratorBots: BotAccount[];
teamBotAccounts: BotAccount[];
allBotAccounts: Pick<BotAccount, 'id' | 'displayName' | 'slug' | 'pfpUrl'>[];
streamInfo: StreamInfo[];
streamKey: StreamKey | null;
chatSettings: ChatModerationSettings | null;
@@ -857,7 +858,7 @@ export default function ChannelSettingsClient({
/>
<AddChatBotModeratorDialog
channelId={channel.id}
teamBots={channel.teamBotAccounts}
botAccounts={channel.allBotAccounts}
existingBotModerators={channel.chatModeratorBots.map((bot) => bot.id)}
/>
</div>
@@ -1260,20 +1261,16 @@ function AddChatModeratorDialog({
function AddChatBotModeratorDialog({
channelId,
teamBots,
botAccounts,
existingBotModerators,
}: {
channelId: string;
teamBots: BotAccount[];
botAccounts: Pick<BotAccount, 'id' | 'displayName' | 'slug' | 'pfpUrl'>[];
existingBotModerators: string[];
}) {
const [open, setOpen] = useState(false);
const [selectedBotId, setSelectedBotId] = useState('');
const availableBots = teamBots.filter(
(botAccount) => !existingBotModerators.includes(botAccount.id)
);
return (
<Dialog open={open} onOpenChange={setOpen}>
<DialogTrigger asChild>
@@ -1286,21 +1283,16 @@ function AddChatBotModeratorDialog({
<DialogHeader>
<DialogTitle>Add bot moderator</DialogTitle>
<DialogDescription>
Bots can delete messages, timeout users, and ban users in chat.
Look up any bot account by name or slug to grant moderation powers.
</DialogDescription>
</DialogHeader>
<Select value={selectedBotId} onValueChange={setSelectedBotId}>
<SelectTrigger>
<SelectValue placeholder="Select bot" />
</SelectTrigger>
<SelectContent>
{availableBots.map((botAccount) => (
<SelectItem key={botAccount.id} value={botAccount.id}>
{botAccount.displayName} (@{botAccount.slug})
</SelectItem>
))}
</SelectContent>
</Select>
<BotCombobox
bots={botAccounts}
filter={existingBotModerators}
value={selectedBotId}
onValueChange={setSelectedBotId}
modal
/>
<DialogFooter>
<Button
disabled={!selectedBotId}

View File

@@ -65,12 +65,12 @@ export default async function ChannelSettingsPage({
const followerPersonalChannels = await Promise.all(
channel.followers.map((follower) => resolvePersonalChannel(follower.user.id))
);
const teamMemberIds = [channel.ownerId, ...channel.managers.map((manager) => manager.id)];
const teamBotAccounts = await prisma.botAccount.findMany({
where: {
ownerId: {
in: teamMemberIds,
},
const allBotAccounts = await prisma.botAccount.findMany({
select: {
id: true,
displayName: true,
slug: true,
pfpUrl: true,
},
orderBy: {
slug: 'asc',
@@ -86,7 +86,7 @@ export default async function ChannelSettingsPage({
managerPersonalChannels,
chatModeratorPersonalChannels,
followerPersonalChannels,
teamBotAccounts,
allBotAccounts,
}}
isOwner={isOwner}
currentUser={user}

View File

@@ -59,10 +59,10 @@ export default async function RootLayout({
<StreamInfoProvider>
{/* this promise is ugly but i'm lazy to fix the type errors */}
<Navbar editLivestream={Promise.resolve(<EditLivestream />)} />
<div className="flex flex-1 pt-16">
<div className="flex flex-1 pt-16 min-h-0 min-w-0">
{/* pt-16 for navbar height */}
<Sidebar className="pt-16" />
<main className="flex-1 overflow-auto">{children}</main>
<main className="flex-1 min-w-0">{children}</main>
</div>
<Toaster />
</StreamInfoProvider>

View File

@@ -0,0 +1,58 @@
import { webMetricsRegistry } from '@/lib/metrics';
import { NextRequest, NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const dynamic = 'force-dynamic';
export async function GET(req: NextRequest) {
if (process.env.NODE_ENV === 'production' && !isAuthenticated(req)) {
return new NextResponse('Authentication required', {
status: 401,
headers: { 'WWW-Authenticate': 'Basic' },
});
}
return new Response(await webMetricsRegistry.metrics(), {
headers: {
'Content-Type': webMetricsRegistry.contentType,
'Cache-Control': 'no-store, no-cache, must-revalidate, proxy-revalidate',
},
});
}
// source: https://vancelucas.com/blog/how-to-add-http-basic-auth-to-next-js/
function isAuthenticated(req: NextRequest) {
const authheader = req.headers.get('authorization') ?? req.headers.get('Authorization');
if (!authheader) {
return false;
}
const parts = authheader.split(' ');
if (parts.length !== 2) {
return false;
}
const scheme = parts[0];
const encoded = parts[1];
if (scheme !== 'Basic' || !encoded) {
return false;
}
let decoded: string;
try {
decoded = Buffer.from(encoded, 'base64').toString();
} catch {
return false;
}
const separatorIndex = decoded.indexOf(':');
if (separatorIndex === -1) {
return false;
}
const user = decoded.substring(0, separatorIndex);
const pass = decoded.substring(separatorIndex + 1);
return user === process.env.METRICS_USER && pass === process.env.METRICS_PASS;
}

View File

@@ -0,0 +1,103 @@
'use client';
import * as React from 'react';
import { Check, ChevronsUpDown } from 'lucide-react';
import type { BotAccount } from '@hctv/db';
import { Avatar, AvatarFallback, AvatarImage } from '@/components/ui/avatar';
import { Button } from '@/components/ui/button';
import {
Command,
CommandEmpty,
CommandGroup,
CommandInput,
CommandItem,
CommandList,
} from '@/components/ui/command';
import { Popover, PopoverContent, PopoverTrigger } from '@/components/ui/popover';
import { cn } from '@/lib/utils';
export function BotCombobox({ bots, filter, value, modal, onValueChange }: Props) {
const [open, setOpen] = React.useState(false);
const selectedBot = bots.find((bot) => bot.id === value);
const availableBots = bots.filter((bot) => !filter?.includes(bot.id));
return (
<Popover open={open} onOpenChange={setOpen} modal={modal}>
<PopoverTrigger asChild>
<Button
variant="outline"
role="combobox"
aria-expanded={open}
className="w-full justify-between"
>
{selectedBot ? (
<div className="flex items-center gap-2 overflow-hidden">
<Avatar className="h-8 w-8 shrink-0">
<AvatarImage
src={selectedBot.pfpUrl}
alt={selectedBot.displayName}
loading="lazy"
decoding="async"
/>
<AvatarFallback>{selectedBot.displayName[0]?.toUpperCase()}</AvatarFallback>
</Avatar>
<span className="truncate">{selectedBot.displayName}</span>
</div>
) : (
'Select bot...'
)}
<ChevronsUpDown className="opacity-50" />
</Button>
</PopoverTrigger>
<PopoverContent className="w-[var(--radix-popover-trigger-width)] p-0">
<Command>
<CommandInput placeholder="Search bot..." className="h-9" />
<CommandList>
<CommandEmpty>No bot found.</CommandEmpty>
<CommandGroup>
{availableBots.map((bot) => (
<CommandItem
key={bot.id}
value={bot.id}
onSelect={(currentValue) => {
onValueChange(currentValue === value ? '' : currentValue);
setOpen(false);
}}
>
<Avatar className="h-8 w-8">
<AvatarImage
src={bot.pfpUrl}
alt={bot.displayName}
loading="lazy"
decoding="async"
/>
<AvatarFallback>{bot.displayName[0]?.toUpperCase()}</AvatarFallback>
</Avatar>
<div className="flex flex-col">
<span>{bot.displayName}</span>
<span className="text-xs text-mantle-foreground">@{bot.slug}</span>
</div>
<Check
className={cn('ml-auto', value === bot.id ? 'opacity-100' : 'opacity-0')}
/>
</CommandItem>
))}
</CommandGroup>
</CommandList>
</Command>
</PopoverContent>
</Popover>
);
}
type BotLookupAccount = Pick<BotAccount, 'id' | 'displayName' | 'slug' | 'pfpUrl'>;
type Props = {
bots: BotLookupAccount[];
filter?: string[];
value: string;
modal?: boolean;
onValueChange: (value: string) => void;
};

View File

@@ -24,7 +24,7 @@ export default function StreamGrid({ liveStreams, offlineStreams }: StreamGridPr
const sortedLiveStreams = [...liveStreams].sort((a, b) => b.viewers - a.viewers);
return (
<div className="space-y-8 md:space-y-10">
<div className="space-y-8 md:space-y-10 min-w-0">
{sortedLiveStreams.length === 0 && (
<div className="flex flex-col items-center gap-4 py-10 text-center">
<ConfusedDino className="h-24 w-24 opacity-70" />
@@ -53,16 +53,13 @@ export default function StreamGrid({ liveStreams, offlineStreams }: StreamGridPr
)}
{offlineStreams.length > 0 && (
<section>
<section className="w-full min-w-0">
<SectionHeading label="Offline channels" count={offlineStreams.length} />
<div className="relative">
<Carousel opts={{ align: 'start', dragFree: true }}>
<CarouselContent>
<div className="px-10">
<Carousel className="w-full max-w-full" opts={{ dragFree: true, containScroll: 'trimSnaps' }}>
<CarouselContent className="-ml-2">
{offlineStreams.map((stream) => (
<CarouselItem
key={stream.id}
className="flex basis-[74px] justify-center sm:basis-[82px] md:basis-[90px] lg:basis-[100px]"
>
<CarouselItem key={stream.id} className="basis-auto pl-2 md:pl-3">
<OfflineCard stream={stream} />
</CarouselItem>
))}
@@ -86,6 +83,8 @@ function StreamCard({ stream }: { stream: StreamWithChannel }) {
src={`/api/stream/thumb/${stream.channel.name}`}
alt={stream.title}
className="absolute inset-0 object-cover"
loading="lazy"
decoding="async"
/>
<div className="absolute inset-0 bg-gradient-to-t from-black/60 via-transparent to-transparent" />
<div className="absolute bottom-1.5 left-1.5 md:bottom-2 md:left-2">
@@ -97,7 +96,12 @@ function StreamCard({ stream }: { stream: StreamWithChannel }) {
</div>
<div className="flex items-start gap-2 p-2 md:gap-3 md:p-3">
<Avatar className="h-7 w-7 shrink-0 ring-1 ring-primary/20 md:h-8 md:w-8">
<AvatarImage src={stream.channel.pfpUrl} alt={stream.channel.name} />
<AvatarImage
src={stream.channel.pfpUrl}
alt={stream.channel.name}
loading="lazy"
decoding="async"
/>
<AvatarFallback className="text-[10px]">
{stream.channel.name.slice(0, 2).toUpperCase()}
</AvatarFallback>
@@ -124,11 +128,16 @@ function StreamCard({ stream }: { stream: StreamWithChannel }) {
function OfflineCard({ stream }: { stream: StreamWithChannel }) {
return (
<Link href={`/${stream.username}`} className="group inline-flex">
<Link href={`/${stream.username}`} className="group inline-flex w-[70px]">
<div className="flex w-[70px] flex-col items-center gap-1 rounded-lg p-1.5 transition-colors duration-150 hover:bg-muted/50 sm:w-[78px] md:w-[86px] md:gap-1.5 md:p-2">
<div className="relative">
<Avatar className="h-9 w-9 ring-2 ring-border transition-colors duration-150 group-hover:ring-border/60 sm:h-10 sm:w-10 md:h-11 md:w-11">
<AvatarImage src={stream.channel.pfpUrl} alt={stream.channel.name} />
<AvatarImage
src={stream.channel.pfpUrl}
alt={stream.channel.name}
loading="lazy"
decoding="async"
/>
<AvatarFallback className="text-xs font-semibold">
{stream.channel.name.slice(0, 2).toUpperCase()}
</AvatarFallback>

View File

@@ -20,7 +20,7 @@ export default function StreamPlayer() {
const { username } = useParams();
const { session } = useSession();
const { streamInfo: userInfo } = useUserStreamInfo(username!.toString());
const videoRef = useRef(null);
useEffect(() => {
@@ -38,7 +38,7 @@ export default function StreamPlayer() {
debug: process.env.NODE_ENV === 'development',
backBufferLength: 90,
enableWorker: true,
maxLiveSyncPlaybackRate: 1.5,
maxLiveSyncPlaybackRate: 1,
liveSyncDurationCount: 2,
liveMaxLatencyDurationCount: 4,
};
@@ -57,12 +57,7 @@ export default function StreamPlayer() {
return (
<MediaController className="w-full aspect-video">
<HlsVideo
ref={videoRef}
slot="media"
crossOrigin="anonymous"
autoplay
/>
<HlsVideo ref={videoRef} slot="media" crossOrigin="anonymous" autoplay />
<MediaLoadingIndicator slot="centered-chrome" noAutohide />
<MediaControlBar className="w-full px-2">
<div className="flex items-center gap-2">

View File

@@ -22,7 +22,7 @@ import { Avatar, AvatarFallback, AvatarImage } from '@/components/ui/avatar';
export function UserCombobox(props: Props) {
const [open, setOpen] = React.useState(false);
const [internalValue, setInternalValue] = React.useState('');
// Use external value if provided, otherwise use internal state
const value = props.value ?? internalValue;
const setValue = props.onValueChange ?? setInternalValue;
@@ -30,10 +30,7 @@ export function UserCombobox(props: Props) {
data: fetchedUsers,
error,
isLoading,
} = useSWR<APIResponse>(
props.users ? null : '/api/stream/info?personal=true',
fetcher
);
} = useSWR<APIResponse>(props.users ? null : '/api/stream/info?personal=true', fetcher);
const users = props.users || fetchedUsers;
@@ -48,17 +45,22 @@ export function UserCombobox(props: Props) {
aria-expanded={open}
className="w-[200px] justify-between"
>
{value
? (
<div className='flex items-center gap-2'>
<Avatar className="h-8 w-8">
<AvatarImage src={users?.find((user) => user.username === value)?.channel.pfpUrl} alt={value} />
<AvatarFallback>{value[0]}</AvatarFallback>
</Avatar>
<span>{users?.find((user) => user.username === value)?.username}</span>
</div>
)
: 'Select user...'}
{value ? (
<div className="flex items-center gap-2">
<Avatar className="h-8 w-8">
<AvatarImage
src={users?.find((user) => user.username === value)?.channel.pfpUrl}
alt={value}
loading="lazy"
decoding="async"
/>
<AvatarFallback>{value[0]}</AvatarFallback>
</Avatar>
<span>{users?.find((user) => user.username === value)?.username}</span>
</div>
) : (
'Select user...'
)}
<ChevronsUpDown className="opacity-50" />
</Button>
</PopoverTrigger>
@@ -68,28 +70,35 @@ export function UserCombobox(props: Props) {
<CommandList>
<CommandEmpty>No user found.</CommandEmpty>
<CommandGroup>
{users?.filter(user => !props.filter?.some(filterStr => user.userId === filterStr)).map((user) => (
<CommandItem
key={user.channelId}
value={user.username}
onSelect={(currentValue) => {
setValue(currentValue === value ? '' : currentValue);
setOpen(false);
}}
>
<Avatar className="h-8 w-8">
<AvatarImage src={user.channel.pfpUrl} alt={user.username} />
<AvatarFallback>{user.username[0]}</AvatarFallback>
</Avatar>
{user.username}
<Check
className={cn(
'ml-auto',
value === user.username ? 'opacity-100' : 'opacity-0'
)}
/>
</CommandItem>
))}
{users
?.filter((user) => !props.filter?.some((filterStr) => user.userId === filterStr))
.map((user) => (
<CommandItem
key={user.channelId}
value={user.username}
onSelect={(currentValue) => {
setValue(currentValue === value ? '' : currentValue);
setOpen(false);
}}
>
<Avatar className="h-8 w-8">
<AvatarImage
src={user.channel.pfpUrl}
alt={user.username}
loading="lazy"
decoding="async"
/>
<AvatarFallback>{user.username[0]}</AvatarFallback>
</Avatar>
{user.username}
<Check
className={cn(
'ml-auto',
value === user.username ? 'opacity-100' : 'opacity-0'
)}
/>
</CommandItem>
))}
</CommandGroup>
</CommandList>
</Command>
@@ -105,4 +114,4 @@ type Props = {
filter?: string[];
modal?: boolean;
onValueChange?: (value: string) => void;
}
};

View File

@@ -373,7 +373,7 @@ export async function addChatBotModerator(channelId: string, botId: string) {
const bot = await prisma.botAccount.findUnique({
where: { id: botId },
select: { id: true, ownerId: true },
select: { id: true },
});
if (!bot) {
@@ -384,14 +384,6 @@ export async function addChatBotModerator(channelId: string, botId: string) {
return { success: false, error: 'Bot is already a chat moderator' };
}
const canUseBot =
bot.ownerId === channel.ownerId ||
channel.managers.some((manager) => manager.id === bot.ownerId);
if (!canUseBot) {
return { success: false, error: 'Bot owner must be a channel manager or owner' };
}
await prisma.channel.update({
where: { id: channelId },
data: {

View File

@@ -1,13 +1,6 @@
import { z } from 'zod';
const disallowedUsernames = [
'admin',
'administrator',
'settings',
'create',
// i hope this doesn't age well tbh
'zrl',
];
const disallowedUsernames = ['admin', 'administrator', 'settings', 'create'];
const username = z
.string()
.min(1)

View File

@@ -1,23 +1,30 @@
import { prisma } from "@hctv/db";
import { getThumbnailQueue } from "../workers";
import { prisma } from '@hctv/db';
import { recordThumbnailJobsEnqueued, setThumbnailRefreshTargets, trackWebJob } from '../metrics';
import { getThumbnailQueue } from '../workers';
export default async function getLiveThumb() {
const liveChannels = await prisma.streamInfo.findMany({
where: {
isLive: true,
},
include: {
channel: true,
}
});
const liveChannelNames = liveChannels.map((channel) => channel.channel.name);
const thumbQueue = getThumbnailQueue();
for (const channel of liveChannelNames) {
const lc = liveChannels.find(c => c.channel.name === channel)!;
await thumbQueue.add("getLiveThumb", {
name: channel,
server: lc.streamRegion,
return trackWebJob('thumbnail_refresh', async () => {
const liveChannels = await prisma.streamInfo.findMany({
where: {
isLive: true,
},
include: {
channel: true,
},
});
}
}
const thumbQueue = getThumbnailQueue();
const jobsByRegion: Record<string, number> = {};
setThumbnailRefreshTargets(liveChannels.length);
for (const liveChannel of liveChannels) {
await thumbQueue.add('getLiveThumb', {
name: liveChannel.channel.name,
server: liveChannel.streamRegion,
});
jobsByRegion[liveChannel.streamRegion] = (jobsByRegion[liveChannel.streamRegion] ?? 0) + 1;
}
recordThumbnailJobsEnqueued(jobsByRegion);
});
}

View File

@@ -1,4 +1,13 @@
import { prisma } from '@hctv/db';
import {
recordLiveStreamTransition,
recordNotificationsEnqueued,
recordStreamSyncScrape,
setLiveStreamsByRegion,
setPlatformInventory,
setStreamPathsByRegion,
trackWebJob,
} from '../metrics';
import { HttpFlv } from '../types/liveBackendJson';
import { getNotificationQueue } from '../workers';
import client from '../services/slackNotifier';
@@ -10,9 +19,29 @@ export default async function runner() {
if ((await prisma.user.count()) === 0) {
return;
}
await refreshPlatformInventory();
await initializeStreamInfo();
await syncStream();
setInterval(syncStream, 5000);
setInterval(refreshPlatformInventory, 60_000);
}
async function refreshPlatformInventory() {
const [channels, liveStreams, follows, botAccounts, users] = await Promise.all([
prisma.channel.count(),
prisma.streamInfo.count({ where: { isLive: true } }),
prisma.follow.count(),
prisma.botAccount.count(),
prisma.user.count(),
]);
setPlatformInventory({
bot_accounts: botAccounts,
channels,
follows,
live_stream_rows: liveStreams,
users,
});
}
export async function initializeStreamInfo(channelId?: string) {
@@ -50,103 +79,122 @@ export async function initializeStreamInfo(channelId?: string) {
export async function syncStream() {
try {
const regions = Object.keys(MEDIAMTX_SERVER_REGIONS) as Array<
keyof typeof MEDIAMTX_SERVER_REGIONS
>;
await trackWebJob('stream_sync', async () => {
const regions = Object.keys(MEDIAMTX_SERVER_REGIONS) as Array<
keyof typeof MEDIAMTX_SERVER_REGIONS
>;
const allActiveStreams = new Map<string, keyof typeof MEDIAMTX_SERVER_REGIONS>();
const allActiveStreams = new Map<string, keyof typeof MEDIAMTX_SERVER_REGIONS>();
const liveStreamsByRegion = Object.fromEntries(regions.map((region) => [region, 0]));
const pathsSeenByRegion = Object.fromEntries(regions.map((region) => [region, 0]));
for (const r of regions) {
const region = MEDIAMTX_SERVER_REGIONS[r];
const response = await fetch(`${region.apiUrl}/v3/paths/list?itemsPerPage=1000`);
for (const r of regions) {
const region = MEDIAMTX_SERVER_REGIONS[r];
const response = await fetch(`${region.apiUrl}/v3/paths/list?itemsPerPage=1000`);
if (!response.ok) {
console.error(
`Failed to fetch ${r} stream stats: ${response.status} ${response.statusText}`
);
continue;
}
if (!response.ok) {
recordStreamSyncScrape(r, 'error');
console.error(
`Failed to fetch ${r} stream stats: ${response.status} ${response.statusText}`
);
continue;
}
type ResponseType =
paths['/v3/paths/list']['get']['responses']['200']['content']['application/json'];
const data = (await response.json()) as ResponseType;
recordStreamSyncScrape(r, 'success');
if (data?.items) {
for (const stream of data.items) {
if (stream.ready && stream.name) {
allActiveStreams.set(stream.name, r);
type ResponseType =
paths['/v3/paths/list']['get']['responses']['200']['content']['application/json'];
const data = (await response.json()) as ResponseType;
if (data?.items) {
for (const stream of data.items) {
if (stream.ready && stream.name) {
allActiveStreams.set(stream.name, r);
liveStreamsByRegion[r] += 1;
pathsSeenByRegion[r] += 1;
}
}
}
}
}
// handle streams going offline
const currentLiveStreams = await prisma.streamInfo.findMany({
where: { isLive: true },
});
setLiveStreamsByRegion(liveStreamsByRegion);
setStreamPathsByRegion(pathsSeenByRegion);
for (const dbStream of currentLiveStreams) {
if (!allActiveStreams.has(dbStream.username)) {
await prisma.streamInfo.update({
where: { username: dbStream.username },
data: {
isLive: false,
viewers: 0,
startedAt: new Date(0),
},
});
}
}
// handle streams going online
for (const [username, regionKey] of allActiveStreams) {
const existingStream = await prisma.streamInfo.findUnique({
where: { username },
include: { channel: true },
const currentLiveStreams = await prisma.streamInfo.findMany({
where: { isLive: true },
});
if (existingStream && !existingStream.isLive) {
console.log(`Stream ${username} is now live in region ${regionKey}`);
await prisma.streamInfo.update({
where: { username },
data: {
isLive: true,
startedAt: new Date(),
streamRegion: regionKey,
},
});
const subscribedFollowers = await prisma.follow.findMany({
where: {
channelId: existingStream.channelId,
notifyStream: true,
},
include: {
user: true,
},
});
const queue = getNotificationQueue();
if (!existingStream.channel.is247) {
queue.add(`streamStartChannel:${existingStream.username}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>`,
channel: process.env.NOTIFICATION_CHANNEL_ID!,
unfurl_links: true,
for (const dbStream of currentLiveStreams) {
if (!allActiveStreams.has(dbStream.username)) {
recordLiveStreamTransition('offline', dbStream.streamRegion);
await prisma.streamInfo.update({
where: { username: dbStream.username },
data: {
isLive: false,
viewers: 0,
startedAt: new Date(0),
},
});
}
}
if (existingStream.enableNotifications && !existingStream.channel.is247) {
for (const follower of subscribedFollowers) {
queue.add(`streamStartDm:${follower.user.id}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`,
channel: follower.user.slack_id,
for (const [username, regionKey] of allActiveStreams) {
const existingStream = await prisma.streamInfo.findUnique({
where: { username },
include: { channel: true },
});
if (existingStream && !existingStream.isLive) {
console.log(`Stream ${username} is now live in region ${regionKey}`);
recordLiveStreamTransition('online', regionKey);
await prisma.streamInfo.update({
where: { username },
data: {
isLive: true,
startedAt: new Date(),
streamRegion: regionKey,
},
});
const subscribedFollowers = await prisma.follow.findMany({
where: {
channelId: existingStream.channelId,
notifyStream: true,
},
include: {
user: true,
},
});
const queue = getNotificationQueue();
if (!existingStream.channel.is247) {
queue.add(`streamStartChannel:${existingStream.username}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>`,
channel: process.env.NOTIFICATION_CHANNEL_ID!,
unfurl_links: true,
});
}
if (existingStream.enableNotifications && !existingStream.channel.is247) {
for (const follower of subscribedFollowers) {
queue.add(`streamStartDm:${follower.user.id}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`,
channel: follower.user.slack_id,
unfurl_links: true,
});
}
}
recordNotificationsEnqueued('channel', existingStream.channel.is247 ? 0 : 1);
recordNotificationsEnqueued(
'dm',
existingStream.enableNotifications && !existingStream.channel.is247
? subscribedFollowers.length
: 0
);
}
}
}
});
} catch (error) {
console.error('Error syncing stream status:', error);
}

View File

@@ -1,30 +1,37 @@
import { prisma, getRedisConnection } from '@hctv/db';
import { setCacheEntryCount, trackWebJob } from '../metrics';
export default async function syncStreamKeys() {
console.log('Syncing stream keys to Redis...');
try {
const keys = await prisma.streamKey.findMany({
include: {
channel: true,
},
});
await trackWebJob('sync_stream_keys', async () => {
const keys = await prisma.streamKey.findMany({
include: {
channel: true,
},
});
if (keys.length === 0) {
console.log('No stream keys found to sync.');
return;
}
const redis = getRedisConnection();
const pipeline = redis.pipeline();
for (const key of keys) {
if (key.channel && key.channel.name) {
pipeline.set(`streamKey:${key.channel.name}`, key.key);
if (keys.length === 0) {
setCacheEntryCount('stream_keys', 0);
console.log('No stream keys found to sync.');
return;
}
}
await pipeline.exec();
console.log(`Synced ${keys.length} stream keys to Redis`);
const redis = getRedisConnection();
const pipeline = redis.pipeline();
let syncedKeyCount = 0;
for (const key of keys) {
if (key.channel && key.channel.name) {
pipeline.set(`streamKey:${key.channel.name}`, key.key);
syncedKeyCount += 1;
}
}
await pipeline.exec();
setCacheEntryCount('stream_keys', syncedKeyCount);
console.log(`Synced ${syncedKeyCount} stream keys to Redis`);
});
} catch (error) {
console.error('Failed to sync stream keys to Redis:', error);
}

View File

@@ -1,40 +1,101 @@
import { getRedisConnection, prisma } from "@hctv/db";
import { getRedisConnection, prisma } from '@hctv/db';
import { setViewerSnapshot, trackWebJob } from '../metrics';
async function countViewersForChannel(channelName: string): Promise<number> {
const redis = getRedisConnection();
let cursor = '0';
let total = 0;
do {
const [nextCursor, keys] = await redis.scan(
cursor,
'MATCH',
`viewer:${channelName}:*`,
'COUNT',
200
);
cursor = nextCursor;
total += keys.length;
} while (cursor !== '0');
return total;
}
export async function viewerCountSync() {
const streams = await prisma.streamInfo.findMany({
where: {
isLive: true
},
include: {
channel: true
}
})
if (streams.length === 0) {
return;
}
const redis = getRedisConnection();
const multi = redis.multi();
for (const stream of streams) {
multi.keys(`viewer:${stream.channel.name}:*`);
}
const results = await multi.exec();
await prisma.$transaction(async (tx) => {
const updates = results?.map((res, index) => {
const count = Array.isArray(res[1]) ? res[1].length : 0;
const stream = streams[index];
return tx.streamInfo.update({
try {
await trackWebJob('viewer_count_sync', async () => {
const streams = await prisma.streamInfo.findMany({
where: {
// using username here because it uses a map
username: stream.username
isLive: true,
},
data: {
viewers: count
select: {
username: true,
streamRegion: true,
channel: {
select: {
name: true,
},
},
},
});
if (streams.length === 0) {
setViewerSnapshot({
totalViewers: 0,
trackedStreams: 0,
streamsWithViewers: 0,
hottestStreamViewers: 0,
viewersByRegion: {},
});
return;
}
const viewersByRegion: Record<string, number> = {};
let totalViewers = 0;
let streamsWithViewers = 0;
let hottestStreamViewers = 0;
const streamCounts = await Promise.all(
streams.map(async (stream) => ({
stream,
count: await countViewersForChannel(stream.channel.name),
}))
);
for (const { stream, count } of streamCounts) {
totalViewers += count;
if (stream.streamRegion) {
viewersByRegion[stream.streamRegion] =
(viewersByRegion[stream.streamRegion] ?? 0) + count;
}
})
})
await Promise.all(updates || []);
})
}
if (count > 0) {
streamsWithViewers += 1;
}
if (count > hottestStreamViewers) {
hottestStreamViewers = count;
}
await prisma.streamInfo.update({
where: {
username: stream.username,
},
data: {
viewers: count,
},
});
}
setViewerSnapshot({
totalViewers,
trackedStreams: streams.length,
streamsWithViewers,
hottestStreamViewers,
viewersByRegion,
});
});
} catch (error) {
console.error('Error syncing viewer counts:', error);
}
}

View File

@@ -1,17 +1,35 @@
import { prisma } from "@hctv/db";
import { getRedisConnection } from "@hctv/db";
import { getRedisConnection, prisma } from '@hctv/db';
import { setCacheEntryCount, trackWebJob } from '../metrics';
async function deleteSessionKeys() {
const redis = getRedisConnection();
let cursor = '0';
do {
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', 'sessions:*', 'COUNT', 200);
cursor = nextCursor;
if (keys.length > 0) {
await redis.unlink(...keys);
}
} while (cursor !== '0');
}
export default async function writeSessions() {
const sessions = await prisma.session.findMany();
const sessionIds = sessions.map((session) => session.id);
const redis = getRedisConnection();
const multi = redis.multi();
multi.del('sessions:*')
for (const sessionId of sessionIds) {
multi.set(`sessions:${sessionId}`, '');
}
await multi.exec();
return trackWebJob('write_sessions', async () => {
const sessions = await prisma.session.findMany();
const sessionIds = sessions.map((session) => session.id);
console.log("Sessions written to Redis");
}
await deleteSessionKeys();
const redis = getRedisConnection();
const multi = redis.multi();
for (const sessionId of sessionIds) {
multi.set(`sessions:${sessionId}`, '');
}
await multi.exec();
setCacheEntryCount('sessions', sessionIds.length);
console.log('Sessions written to Redis');
});
}

261
apps/web/src/lib/metrics.ts Normal file
View File

@@ -0,0 +1,261 @@
import { collectDefaultMetrics, Counter, Gauge, Histogram, Registry } from 'prom-client';
function createMetricsStore() {
const register = new Registry();
register.setDefaultLabels({ app: 'web' });
collectDefaultMetrics({
prefix: 'hctv_web_',
register,
});
const backgroundJobRuns = new Counter({
name: 'hctv_web_background_job_runs_total',
help: 'Total number of background jobs run by the web app.',
labelNames: ['job', 'status'],
registers: [register],
});
const backgroundJobDuration = new Histogram({
name: 'hctv_web_background_job_duration_seconds',
help: 'Background job execution time in seconds.',
labelNames: ['job', 'status'],
buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30],
registers: [register],
});
const liveStreams = new Gauge({
name: 'hctv_web_live_streams',
help: 'Current number of live streams grouped by MediaMTX region.',
labelNames: ['region'],
registers: [register],
});
const streamPathsSeen = new Gauge({
name: 'hctv_web_stream_paths_seen',
help: 'Current number of ready MediaMTX paths seen during the latest sync.',
labelNames: ['region'],
registers: [register],
});
const liveStreamTransitions = new Counter({
name: 'hctv_web_live_stream_transitions_total',
help: 'Live stream state transitions observed by the web app.',
labelNames: ['transition', 'region'],
registers: [register],
});
const streamSyncScrapes = new Counter({
name: 'hctv_web_stream_sync_scrapes_total',
help: 'MediaMTX region scrapes attempted by stream sync.',
labelNames: ['region', 'status'],
registers: [register],
});
const activeViewers = new Gauge({
name: 'hctv_web_active_viewers',
help: 'Current number of active viewers across all live streams.',
registers: [register],
});
const activeViewersByRegion = new Gauge({
name: 'hctv_web_active_viewers_by_region',
help: 'Current number of active viewers grouped by stream region.',
labelNames: ['region'],
registers: [register],
});
const viewerCountTrackedStreams = new Gauge({
name: 'hctv_web_viewer_count_tracked_streams',
help: 'Number of live streams included in the latest viewer sync.',
registers: [register],
});
const streamsWithViewers = new Gauge({
name: 'hctv_web_streams_with_viewers',
help: 'Current number of live streams with at least one viewer.',
registers: [register],
});
const hottestStreamViewers = new Gauge({
name: 'hctv_web_hottest_stream_viewers',
help: 'Current viewer count of the most watched live stream.',
registers: [register],
});
const thumbnailJobsEnqueued = new Counter({
name: 'hctv_web_thumbnail_jobs_enqueued_total',
help: 'Total thumbnail refresh jobs enqueued by region.',
labelNames: ['region'],
registers: [register],
});
const thumbnailRefreshTargets = new Gauge({
name: 'hctv_web_thumbnail_refresh_targets',
help: 'Number of live streams targeted in the latest thumbnail refresh run.',
registers: [register],
});
const notificationsEnqueued = new Counter({
name: 'hctv_web_notifications_enqueued_total',
help: 'Notification jobs enqueued when streams go live.',
labelNames: ['target'],
registers: [register],
});
const cacheEntries = new Gauge({
name: 'hctv_web_cache_entries',
help: 'Current number of records mirrored into Redis by cache-sync jobs.',
labelNames: ['cache'],
registers: [register],
});
const platformInventory = new Gauge({
name: 'hctv_web_platform_inventory',
help: 'High-level counts of important platform records.',
labelNames: ['entity'],
registers: [register],
});
const mediamtxAuthRequests = new Counter({
name: 'hctv_web_mediamtx_auth_requests_total',
help: 'Total MediaMTX auth decisions handled by the web app.',
labelNames: ['action', 'protocol', 'outcome'],
registers: [register],
});
const mediamtxAuthDuration = new Histogram({
name: 'hctv_web_mediamtx_auth_duration_seconds',
help: 'MediaMTX auth request duration in seconds.',
labelNames: ['action', 'protocol', 'outcome'],
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5],
registers: [register],
});
return {
register,
activeViewers,
activeViewersByRegion,
backgroundJobDuration,
backgroundJobRuns,
cacheEntries,
hottestStreamViewers,
liveStreams,
liveStreamTransitions,
mediamtxAuthDuration,
mediamtxAuthRequests,
notificationsEnqueued,
platformInventory,
streamPathsSeen,
streamsWithViewers,
streamSyncScrapes,
thumbnailRefreshTargets,
thumbnailJobsEnqueued,
viewerCountTrackedStreams,
};
}
const globalForMetrics = globalThis as typeof globalThis & {
__hctvWebMetrics?: ReturnType<typeof createMetricsStore>;
};
const metrics = (globalForMetrics.__hctvWebMetrics ??= createMetricsStore());
export const webMetricsRegistry = metrics.register;
export async function trackWebJob<T>(job: string, fn: () => Promise<T>): Promise<T> {
const stopTimer = metrics.backgroundJobDuration.startTimer({ job });
let status = 'success';
try {
return await fn();
} catch (error) {
status = 'error';
throw error;
} finally {
metrics.backgroundJobRuns.inc({ job, status });
stopTimer({ job, status });
}
}
export function setLiveStreamsByRegion(streamsByRegion: Record<string, number>): void {
metrics.liveStreams.reset();
for (const [region, count] of Object.entries(streamsByRegion)) {
metrics.liveStreams.set({ region }, count);
}
}
export function setStreamPathsByRegion(pathsByRegion: Record<string, number>): void {
metrics.streamPathsSeen.reset();
for (const [region, count] of Object.entries(pathsByRegion)) {
metrics.streamPathsSeen.set({ region }, count);
}
}
export function recordLiveStreamTransition(transition: 'online' | 'offline', region: string): void {
metrics.liveStreamTransitions.inc({ transition, region });
}
export function recordStreamSyncScrape(region: string, status: 'success' | 'error'): void {
metrics.streamSyncScrapes.inc({ region, status });
}
export function setViewerSnapshot(snapshot: {
totalViewers: number;
trackedStreams: number;
viewersByRegion: Record<string, number>;
streamsWithViewers: number;
hottestStreamViewers: number;
}): void {
metrics.activeViewers.set(snapshot.totalViewers);
metrics.viewerCountTrackedStreams.set(snapshot.trackedStreams);
metrics.streamsWithViewers.set(snapshot.streamsWithViewers);
metrics.hottestStreamViewers.set(snapshot.hottestStreamViewers);
metrics.activeViewersByRegion.reset();
for (const [region, count] of Object.entries(snapshot.viewersByRegion)) {
metrics.activeViewersByRegion.set({ region }, count);
}
}
export function recordThumbnailJobsEnqueued(jobsByRegion: Record<string, number>): void {
for (const [region, count] of Object.entries(jobsByRegion)) {
if (count > 0) {
metrics.thumbnailJobsEnqueued.inc({ region }, count);
}
}
}
export function setThumbnailRefreshTargets(count: number): void {
metrics.thumbnailRefreshTargets.set(count);
}
export function recordNotificationsEnqueued(target: 'channel' | 'dm', count: number): void {
if (count > 0) {
metrics.notificationsEnqueued.inc({ target }, count);
}
}
export function setCacheEntryCount(cache: 'sessions' | 'stream_keys', count: number): void {
metrics.cacheEntries.set({ cache }, count);
}
export function setPlatformInventory(snapshot: Record<string, number>): void {
metrics.platformInventory.reset();
for (const [entity, count] of Object.entries(snapshot)) {
metrics.platformInventory.set({ entity }, count);
}
}
export function recordMediamtxAuth(
action: string,
protocol: string,
outcome: string,
durationSeconds: number
): void {
metrics.mediamtxAuthRequests.inc({ action, protocol, outcome });
metrics.mediamtxAuthDuration.observe({ action, protocol, outcome }, durationSeconds);
}

View File

@@ -1,8 +1,11 @@
import { registerNotificationWorker } from './worker/notification';
import { registerThumbnailWorker } from './worker/thumbnails';
import { trackWebJob } from '../metrics';
export async function registerWorkers(): Promise<void> {
await registerNotificationWorker();
await registerThumbnailWorker();
console.log('All workers registered successfully');
}
await trackWebJob('register_workers', async () => {
await registerNotificationWorker();
await registerThumbnailWorker();
console.log('All workers registered successfully');
});
}

View File

@@ -58,8 +58,49 @@ services:
volumes:
- 'hctv_redis:/data'
mediamtx:
image: 'bluenviron/mediamtx:latest'
build:
context: .
dockerfile: docker/mediamtx/Dockerfile
ports:
- '8890:8890/udp'
postgres-exporter:
image: 'prometheuscommunity/postgres-exporter:v0.17.1'
environment:
DATA_SOURCE_NAME: 'postgresql://postgres:${PG_PASS}@postgres:5432/hctv?sslmode=disable'
redis-exporter:
image: 'oliver006/redis_exporter:v1.67.0'
environment:
REDIS_ADDR: 'redis://redis:6379'
prometheus:
image: 'prom/prometheus:v3.4.2'
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--config.expand-env'
- '--storage.tsdb.path=/prometheus'
- '--web.enable-lifecycle'
env_file:
- .env
volumes:
- './mediamtx.yml:/mediamtx.yml'
- './observability/prometheus.yml:/etc/prometheus/prometheus.yml:ro'
- 'hctv_prometheus_data:/prometheus'
extra_hosts:
- 'host.docker.internal:host-gateway'
grafana:
image: 'grafana/grafana:11.6.0'
depends_on:
- prometheus
environment:
GF_SECURITY_ADMIN_USER: '${GRAFANA_ADMIN_USER:-admin}'
GF_SECURITY_ADMIN_PASSWORD: '${GRAFANA_ADMIN_PASSWORD:-admin}'
GF_USERS_DEFAULT_THEME: light
volumes:
- './observability/grafana/provisioning/datasources:/etc/grafana/provisioning/datasources:ro'
- './observability/grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards:ro'
- './observability/grafana/dashboards:/var/lib/grafana/dashboards:ro'
- 'hctv_grafana_data:/var/lib/grafana'
volumes:
hctv_pgdata:
hctv_redis:
hctv_prometheus_data:
hctv_grafana_data:

View File

@@ -9,22 +9,59 @@ services:
- ./psql:/var/lib/postgresql
ports:
- 5555:5432
postgres-exporter:
image: prometheuscommunity/postgres-exporter:v0.17.1
environment:
DATA_SOURCE_NAME: postgresql://postgres:skbiditoilet@psql:5432/postgres?sslmode=disable
redis:
image: redis:7.4-alpine
volumes:
- ./redis:/data
ports:
- 6379:6379
redis-exporter:
image: oliver006/redis_exporter:v1.67.0
environment:
REDIS_ADDR: redis://redis:6379
mediamtx:
image: bluenviron/mediamtx:latest
ports:
- 8890:8890/udp
- 8891:8888
- 9997:9997
- 9998:9998
volumes:
- ./mediamtx.yml:/mediamtx.yml
extra_hosts:
- "host.docker.internal:host-gateway"
- 'host.docker.internal:host-gateway'
prometheus:
image: prom/prometheus:v3.4.2
command:
- --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.path=/prometheus
- --web.enable-lifecycle
volumes:
- ../observability/prometheus.dev.yml:/etc/prometheus/prometheus.yml:ro
- prometheus_data:/prometheus
ports:
- 9090:9090
extra_hosts:
- 'host.docker.internal:host-gateway'
grafana:
image: grafana/grafana:11.6.0
depends_on:
- prometheus
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: admin
GF_USERS_DEFAULT_THEME: light
volumes:
- ../observability/grafana/provisioning/datasources:/etc/grafana/provisioning/datasources:ro
- ../observability/grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards:ro
- ../observability/grafana/dashboards:/var/lib/grafana/dashboards:ro
- grafana_data:/var/lib/grafana
ports:
- 3001:3000
# mediamtx2:
# image: bluenviron/mediamtx:latest
# ports:
@@ -34,4 +71,8 @@ services:
# volumes:
# - ./mediamtx.yml:/mediamtx.yml
# extra_hosts:
# - "host.docker.internal:host-gateway"
# - "host.docker.internal:host-gateway"
volumes:
prometheus_data:
grafana_data:

View File

@@ -6,8 +6,14 @@ srt: yes
srtAddress: :8890
hls: yes
hlsVariant: lowLatency
hlsSegmentDuration: 2s
hlsPartDuration: 500ms
hlsSegmentCount: 10
authMethod: http
authHTTPAddress: http://host.docker.internal:3000/api/mediamtx/publish
api: yes
metrics: yes
metricsAddress: :9998

View File

@@ -0,0 +1,8 @@
FROM bluenviron/mediamtx:1 AS mediamtx
FROM ubuntu:24.04
COPY --from=mediamtx /mediamtx /
COPY ./docker/mediamtx/mediamtx.yml /mediamtx.yml
ENTRYPOINT ["/mediamtx"]

View File

@@ -0,0 +1,19 @@
paths:
all:
source: publisher
srt: yes
srtAddress: :8890
hls: yes
hlsVariant: lowLatency
hlsSegmentDuration: 2s
hlsPartDuration: 1s
hlsSegmentCount: 10
authMethod: http
authHTTPAddress: http://hctv:3000/api/mediamtx/publish
api: yes
metrics: yes
metricsAddress: :9998

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
apiVersion: 1
providers:
- name: HackClubTV
folder: HackClub TV
type: file
disableDeletion: false
editable: true
options:
path: /var/lib/grafana/dashboards

View File

@@ -0,0 +1,10 @@
apiVersion: 1
datasources:
- name: Prometheus
uid: prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: false

View File

@@ -0,0 +1,39 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: web
metrics_path: /api/metrics
static_configs:
- targets:
- host.docker.internal:3000
- job_name: chat
metrics_path: /metrics
static_configs:
- targets:
- host.docker.internal:8000
- job_name: mediamtx
metrics_path: /metrics
static_configs:
- targets:
- mediamtx:9998
- job_name: redis
metrics_path: /metrics
static_configs:
- targets:
- redis-exporter:9121
- job_name: postgres
metrics_path: /metrics
static_configs:
- targets:
- postgres-exporter:9187
- job_name: prometheus
static_configs:
- targets:
- localhost:9090

View File

@@ -0,0 +1,45 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: web
metrics_path: /api/metrics
basic_auth:
username: ${METRICS_USER}
password: ${METRICS_PASS}
static_configs:
- targets:
- hctv:3000
- job_name: chat
metrics_path: /metrics
basic_auth:
username: ${METRICS_USER}
password: ${METRICS_PASS}
static_configs:
- targets:
- chat:8000
- job_name: mediamtx
metrics_path: /metrics
static_configs:
- targets:
- mediamtx:9998
- job_name: redis
metrics_path: /metrics
static_configs:
- targets:
- redis-exporter:9121
- job_name: postgres
metrics_path: /metrics
static_configs:
- targets:
- postgres-exporter:9187
- job_name: prometheus
static_configs:
- targets:
- localhost:9090

27
pnpm-lock.yaml generated
View File

@@ -41,6 +41,9 @@ importers:
hono:
specifier: ^4.7.5
version: 4.11.3
prom-client:
specifier: ^15.1.3
version: 15.1.3
devDependencies:
'@types/node':
specifier: ^20.11.17
@@ -219,6 +222,9 @@ importers:
pg-boss:
specifier: ^10.1.6
version: 10.4.0
prom-client:
specifier: ^15.1.3
version: 15.1.3
react:
specifier: ^19.2.3
version: 19.2.3
@@ -3890,6 +3896,9 @@ packages:
resolution: {integrity: sha512-Ceh+7ox5qe7LJuLHoY0feh3pHuUDHAcRUeyL2VYghZwfpkNIy/+8Ocg0a3UuSoYzavmylwuLWQOf3hl0jjMMIw==}
engines: {node: '>=8'}
bintrees@1.0.2:
resolution: {integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==}
bl@5.1.0:
resolution: {integrity: sha512-tv1ZJHLfTDnXE6tMHv73YgSJaWR2AFuPwMntBe7XL/GBFHnT0CLnsHMogfk5+GzCDC5ZWarSCYaIGATZt9dNsQ==}
@@ -6683,6 +6692,10 @@ packages:
resolution: {integrity: sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==}
engines: {node: '>=0.4.0'}
prom-client@15.1.3:
resolution: {integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==}
engines: {node: ^16 || ^18 || >=20}
prompts@2.4.2:
resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==}
engines: {node: '>= 6'}
@@ -7351,6 +7364,9 @@ packages:
resolution: {integrity: sha512-g9ljZiwki/LfxmQADO3dEY1CbpmXT5Hm2fJ+QaGKwSXUylMybePR7/67YW7jOrrvjEgL1Fmz5kzyAjWVWLlucg==}
engines: {node: '>=6'}
tdigest@0.1.2:
resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==}
terser-webpack-plugin@5.3.16:
resolution: {integrity: sha512-h9oBFCWrq78NyWWVcSwZarJkZ01c2AyGrzs1crmHZO3QUg9D61Wu4NPjBy69n7JqylFF5y+CsUZYmYEIZ3mR+Q==}
engines: {node: '>= 10.13.0'}
@@ -12331,6 +12347,8 @@ snapshots:
binary-extensions@2.3.0: {}
bintrees@1.0.2: {}
bl@5.1.0:
dependencies:
buffer: 6.0.3
@@ -15871,6 +15889,11 @@ snapshots:
progress@2.0.3: {}
prom-client@15.1.3:
dependencies:
'@opentelemetry/api': 1.9.0
tdigest: 0.1.2
prompts@2.4.2:
dependencies:
kleur: 3.0.3
@@ -16813,6 +16836,10 @@ snapshots:
tapable@2.3.0: {}
tdigest@0.1.2:
dependencies:
bintrees: 1.0.2
terser-webpack-plugin@5.3.16(webpack@5.104.1):
dependencies:
'@jridgewell/trace-mapping': 0.3.31