feat: #63 from SrIzan10/feat/metrics

feat: metrics
This commit is contained in:
2026-03-13 09:33:33 +01:00
committed by GitHub
25 changed files with 4132 additions and 197 deletions

View File

@@ -14,7 +14,8 @@
"@hono/node-ws": "^1.1.0",
"@leeoniya/ufuzzy": "^1.0.18",
"@oslojs/encoding": "^1.1.0",
"hono": "^4.7.5"
"hono": "^4.7.5",
"prom-client": "^15.1.3"
},
"devDependencies": {
"@types/node": "^20.11.17",

View File

@@ -4,6 +4,23 @@ import { Hono } from 'hono';
import { readFile } from 'node:fs/promises';
import { lucia } from '@hctv/auth';
import { getCookie } from 'hono/cookie';
import {
chatMetricsRegistry,
recordChatConnectionAccepted,
recordChatConnectionRejected,
recordChatDisconnect,
recordChatError,
recordChatModerationBlock,
recordDeliveredChatMessage,
recordDeliveredChatMessageBytes,
recordEmojiSearchResults,
recordHistoryMessagesLoaded,
recordIncomingChatMessage,
recordUniqueChatter,
setChannelHistorySize,
setChatModerationState,
startChatMessageTimer,
} from './metrics.js';
import { getPersonalChannel } from './utils/personalChannel.js';
import { ChatModerationAction, getRedisConnection, prisma } from '@hctv/db';
import uFuzzy from '@leeoniya/ufuzzy';
@@ -20,6 +37,7 @@ import type {
ChatSocket,
ChatUser,
} from './types/chat.js';
import { basicAuth } from 'hono/basic-auth';
const redis = getRedisConnection();
const MESSAGE_HISTORY_SIZE = 100;
@@ -33,6 +51,30 @@ type IncomingMessage = {
[key: string]: unknown;
};
const METRICS_MESSAGE_TYPES = [
'ping',
'message',
'emojiMsg',
'emojiSearch',
'mod:deleteMessage',
'mod:timeoutUser',
'mod:banUser',
'mod:unbanUser',
'mod:liftTimeout',
] as const;
type MetricsMessageType = (typeof METRICS_MESSAGE_TYPES)[number] | 'unknown';
function getMetricsMessageType(type: unknown): MetricsMessageType {
if (typeof type !== 'string') {
return 'unknown';
}
return (METRICS_MESSAGE_TYPES as readonly string[]).includes(type)
? (type as MetricsMessageType)
: 'unknown';
}
const DEFAULT_MODERATION_SETTINGS: ChatModerationSettingsShape = {
blockedTerms: [],
slowModeSeconds: 0,
@@ -259,25 +301,42 @@ async function deleteMessageFromHistory(targetUsername: string, msgId: string):
const app = new Hono();
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });
if (process.env.NODE_ENV === 'production') {
app.use(
'/metrics',
basicAuth({ username: process.env.METRICS_USER!, password: process.env.METRICS_PASS! })
);
}
app.get('/', async (c) => {
return c.text(threed);
});
app.get('/up', async (c) => {
return c.text('it works');
return c.text('hello world');
});
app.get('/metrics', async () => {
return new Response(await chatMetricsRegistry.metrics(), {
headers: {
'Content-Type': chatMetricsRegistry.contentType,
'Cache-Control': 'no-store, no-cache, must-revalidate, proxy-revalidate',
},
});
});
app.get(
'/ws/:username',
upgradeWebSocket((c) => ({
async onOpen(evt, ws) {
let authMethod = 'unknown';
const token = getCookie(c, 'auth_session');
const grant = c.req.query('grant');
const authHeader = c.req.header('Authorization');
const botAuth = c.req.query('botAuth');
if (!token && (!grant || grant === 'null') && !authHeader && !botAuth) {
recordChatConnectionRejected(authMethod, 'missing_auth');
ws.close();
return;
}
@@ -304,6 +363,7 @@ app.get(
});
if (botAccount) {
authMethod = 'bot_api_key';
chatUser = {
id: botAccount.botAccount.id,
username: botAccount.botAccount.slug,
@@ -327,6 +387,7 @@ app.get(
if (session.user) {
const userChannel = await getPersonalChannel(session.user.id);
if (userChannel) {
authMethod = 'session';
chatUser = {
id: session.user.id,
username: userChannel.name,
@@ -349,16 +410,21 @@ app.get(
: null;
if (!chatUser && !dbGrant) {
recordChatConnectionRejected(authMethod, 'auth_failed');
ws.close();
return;
}
const { username } = c.req.param();
if (!chatUser && dbGrant) {
authMethod = 'obs_grant';
}
if (dbGrant && dbGrant.name !== username) {
recordChatConnectionRejected(authMethod, 'grant_mismatch');
ws.close();
return;
}
const channel = await prisma.channel.findUnique({
where: { name: username },
select: {
@@ -383,6 +449,7 @@ app.get(
});
if (!channel) {
recordChatConnectionRejected(authMethod, 'channel_not_found');
ws.close();
return;
}
@@ -444,6 +511,20 @@ app.get(
socketState.personalChannel = personalChannel;
socketState.viewerId = socket.viewerId;
socketState.isModerator = isModerator;
socket.metricsTracked = true;
socketState.metricsTracked = true;
socket.metricsAuthMethod = authMethod;
socketState.metricsAuthMethod = authMethod;
recordChatConnectionAccepted(username, authMethod);
setChatModerationState(username, {
blockedTerms: moderationSettings.blockedTerms.length,
maxMessageLength: moderationSettings.maxMessageLength,
rateLimitCount: moderationSettings.rateLimitCount,
rateLimitWindowSeconds: moderationSettings.rateLimitWindowSeconds,
slowModeSeconds: moderationSettings.slowModeSeconds,
});
socketState.excludeFromViewerCount = Boolean(dbGrant);
socket.send(
@@ -474,6 +555,7 @@ app.get(
const messages = await redis.zrange(channelKey, 0, MESSAGE_HISTORY_SIZE - 1);
if (messages.length > 0) {
recordHistoryMessagesLoaded(username, messages.length);
socket.send(
JSON.stringify({
type: 'history',
@@ -481,6 +563,7 @@ app.get(
})
);
}
setChannelHistorySize(username, messages.length);
},
async onClose(evt, ws) {
const socket = ws as unknown as ChatSocket;
@@ -488,6 +571,14 @@ app.get(
if (process.env.NODE_ENV !== 'production') console.log('client disconnected');
if (!socketState.targetUsername) return;
if (socketState.metricsTracked) {
recordChatDisconnect(
socketState.targetUsername,
socketState.metricsAuthMethod ?? 'unknown'
);
socketState.metricsTracked = false;
}
const streamInfo = await prisma.streamInfo.findUnique({
where: {
username: socketState.targetUsername,
@@ -504,10 +595,18 @@ app.get(
}
},
async onMessage(evt, ws) {
let outcome = 'ignored';
let messageType = 'unknown';
let stopTimer: ReturnType<typeof startChatMessageTimer> | null = null;
try {
const socket = ws as unknown as ChatSocket;
const socketState = resolveSocketState(socket);
const msg = JSON.parse(evt.data.toString()) as IncomingMessage;
const rawPayload = evt.data.toString();
const msg = JSON.parse(rawPayload) as IncomingMessage;
messageType = getMetricsMessageType(msg.type);
recordIncomingChatMessage(messageType, Buffer.byteLength(rawPayload));
stopTimer = startChatMessageTimer(messageType);
if (msg.type === 'ping') {
if (!socketState.excludeFromViewerCount) {
@@ -518,6 +617,7 @@ app.get(
);
}
socket.send(JSON.stringify({ type: 'pong' }));
outcome = 'pong';
return;
}
@@ -527,6 +627,7 @@ app.get(
logModerationEvent,
broadcastToChannel,
});
outcome = 'moderation';
return;
}
@@ -541,6 +642,7 @@ app.get(
broadcastRestrictionStateToUser,
broadcastToChannel,
});
outcome = 'moderation';
return;
}
@@ -577,6 +679,7 @@ app.get(
);
await sendChatAccessState(socket, channelId, chatUser.id);
outcome = 'blocked';
return;
}
@@ -590,6 +693,8 @@ app.get(
))
) {
sendModerationError(socket, 'RATE_LIMIT', 'You are sending messages too fast.');
recordChatModerationBlock('rate_limit');
outcome = 'rate_limited';
return;
}
@@ -598,6 +703,8 @@ app.get(
const timeRemaining = await redis.ttl(slowModeKey);
if (timeRemaining > 0) {
sendModerationError(socket, 'SLOW_MODE', `Slow mode is on. Wait ${timeRemaining}s.`);
recordChatModerationBlock('slow_mode');
outcome = 'slow_mode';
return;
}
await redis.setex(slowModeKey, moderationSettings.slowModeSeconds, '1');
@@ -613,6 +720,8 @@ app.get(
'MESSAGE_TOO_LONG',
`Message exceeds ${moderationSettings.maxMessageLength} characters.`
);
recordChatModerationBlock('message_too_long');
outcome = 'message_too_long';
return;
}
@@ -628,7 +737,9 @@ app.get(
details: { blockedTerm },
});
}
recordChatModerationBlock('blocked_term');
sendModerationError(socket, 'BLOCKED_TERM', 'Message blocked by channel moderation.');
outcome = 'blocked_term';
return;
}
@@ -654,8 +765,22 @@ app.get(
await redis.zadd(channelKey, Date.now(), redisStr);
await redis.zremrangebyrank(channelKey, 0, -MESSAGE_HISTORY_SIZE - 1);
await redis.expire(channelKey, MESSAGE_TTL);
const historySize = await redis.zcard(channelKey);
setChannelHistorySize(targetUsername, historySize);
broadcastToChannel(targetUsername, socket, msgObj as unknown as Record<string, unknown>);
recordDeliveredChatMessage(chatUser.isBot ? 'bot' : 'user');
recordDeliveredChatMessageBytes(
chatUser.isBot ? 'bot' : 'user',
Buffer.byteLength(message)
);
const isFirstMessageFromUser =
(await redis.sadd(`chat:unique-chatters:${targetUsername}`, chatUser.id)) === 1;
if (isFirstMessageFromUser) {
recordUniqueChatter(chatUser.isBot ? 'bot' : 'user');
}
outcome = 'broadcast';
}
if (msg.type === 'emojiMsg') {
if (!socketState.chatUser) return;
@@ -683,12 +808,15 @@ app.get(
emojis: emojiMap,
})
);
outcome = 'emoji_lookup';
}
if (msg.type === 'emojiSearch') {
if (!socketState.chatUser) return;
const rawSearchTerm = (msg.searchTerm as string)?.trim() ?? '';
if (!rawSearchTerm || rawSearchTerm.length > 50) {
ws.send(JSON.stringify({ type: 'emojiSearchResponse', results: [] }));
recordEmojiSearchResults('empty', 0);
outcome = 'emoji_search_empty';
return;
}
const searchTerm = rawSearchTerm;
@@ -718,6 +846,8 @@ app.get(
results: results,
})
);
recordEmojiSearchResults('matched', results.length);
outcome = 'emoji_search';
} else {
ws.send(
JSON.stringify({
@@ -725,10 +855,16 @@ app.get(
results: [],
})
);
recordEmojiSearchResults('no_match', 0);
outcome = 'emoji_search_empty';
}
}
} catch (e) {
outcome = 'error';
recordChatError('on_message');
console.error('Error processing message:', e);
} finally {
stopTimer?.({ type: messageType, outcome });
}
},
}))

251
apps/chat/src/metrics.ts Normal file
View File

@@ -0,0 +1,251 @@
import { collectDefaultMetrics, Counter, Gauge, Histogram, Registry } from 'prom-client';
function createMetricsStore() {
const register = new Registry();
register.setDefaultLabels({ app: 'chat' });
collectDefaultMetrics({
prefix: 'hctv_chat_',
register,
});
const websocketConnections = new Gauge({
name: 'hctv_chat_websocket_connections',
help: 'Current number of active chat websocket connections.',
registers: [register],
});
const websocketConnectionsByChannel = new Gauge({
name: 'hctv_chat_websocket_connections_by_channel',
help: 'Current number of active chat websocket connections by target channel.',
labelNames: ['channel'],
registers: [register],
});
const websocketConnectionsByAuthMethod = new Gauge({
name: 'hctv_chat_websocket_connections_by_auth_method',
help: 'Current number of active chat websocket connections by auth method.',
labelNames: ['auth_method'],
registers: [register],
});
const websocketConnectionAttempts = new Counter({
name: 'hctv_chat_websocket_connection_attempts_total',
help: 'Total websocket connection attempts grouped by outcome, auth method, and rejection reason.',
labelNames: ['outcome', 'auth_method', 'reason'],
registers: [register],
});
const incomingMessages = new Counter({
name: 'hctv_chat_incoming_messages_total',
help: 'Total inbound websocket frames grouped by message type.',
labelNames: ['type'],
registers: [register],
});
const inboundPayloadBytes = new Counter({
name: 'hctv_chat_inbound_payload_bytes_total',
help: 'Total inbound websocket payload bytes grouped by message type.',
labelNames: ['type'],
registers: [register],
});
const messageDuration = new Histogram({
name: 'hctv_chat_message_duration_seconds',
help: 'Chat websocket message processing time in seconds.',
labelNames: ['type', 'outcome'],
buckets: [0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
registers: [register],
});
const deliveredMessages = new Counter({
name: 'hctv_chat_messages_delivered_total',
help: 'Total chat messages successfully broadcast, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const deliveredMessageBytes = new Counter({
name: 'hctv_chat_message_bytes_delivered_total',
help: 'Total message body bytes successfully broadcast, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const channelHistorySize = new Gauge({
name: 'hctv_chat_channel_history_size',
help: 'Current number of messages retained in Redis history for a channel.',
labelNames: ['channel'],
registers: [register],
});
const channelHistoryLoadedMessages = new Counter({
name: 'hctv_chat_history_messages_loaded_total',
help: 'Total history messages loaded from Redis during websocket joins.',
labelNames: ['channel'],
registers: [register],
});
const moderationState = new Gauge({
name: 'hctv_chat_moderation_state',
help: 'Current moderation settings by channel.',
labelNames: ['channel', 'setting'],
registers: [register],
});
const channelUniqueChatters = new Counter({
name: 'hctv_chat_unique_chatters_total',
help: 'Users who successfully sent at least one chat message, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const moderationActions = new Counter({
name: 'hctv_chat_moderation_actions_total',
help: 'Successful moderation actions performed in chat.',
labelNames: ['action'],
registers: [register],
});
const moderationBlocks = new Counter({
name: 'hctv_chat_moderation_blocks_total',
help: 'Message blocks and throttling decisions grouped by reason.',
labelNames: ['reason'],
registers: [register],
});
const emojiSearchResults = new Histogram({
name: 'hctv_chat_emoji_search_results',
help: 'Number of emoji search results returned per query.',
labelNames: ['outcome'],
buckets: [0, 1, 2, 5, 10, 25, 50, 100, 150],
registers: [register],
});
const errors = new Counter({
name: 'hctv_chat_errors_total',
help: 'Errors observed in the chat service grouped by phase.',
labelNames: ['phase'],
registers: [register],
});
return {
deliveredMessages,
deliveredMessageBytes,
channelHistoryLoadedMessages,
channelHistorySize,
emojiSearchResults,
errors,
inboundPayloadBytes,
incomingMessages,
messageDuration,
moderationActions,
moderationBlocks,
moderationState,
register,
channelUniqueChatters,
websocketConnectionAttempts,
websocketConnections,
websocketConnectionsByAuthMethod,
websocketConnectionsByChannel,
};
}
const globalForMetrics = globalThis as typeof globalThis & {
__hctvChatMetrics?: ReturnType<typeof createMetricsStore>;
};
const metrics = (globalForMetrics.__hctvChatMetrics ??= createMetricsStore());
export const chatMetricsRegistry = metrics.register;
export function recordChatConnectionAccepted(channel: string, authMethod: string): void {
metrics.websocketConnectionAttempts.inc({
auth_method: authMethod,
outcome: 'accepted',
reason: 'none',
});
metrics.websocketConnections.inc();
metrics.websocketConnectionsByChannel.inc({ channel });
metrics.websocketConnectionsByAuthMethod.inc({ auth_method: authMethod });
}
export function recordChatConnectionRejected(authMethod: string, reason: string): void {
metrics.websocketConnectionAttempts.inc({ auth_method: authMethod, outcome: 'rejected', reason });
}
export function recordChatDisconnect(channel: string, authMethod: string): void {
metrics.websocketConnections.dec();
metrics.websocketConnectionsByChannel.dec({ channel });
metrics.websocketConnectionsByAuthMethod.dec({ auth_method: authMethod });
}
export function recordIncomingChatMessage(type: string, payloadBytes: number): void {
metrics.incomingMessages.inc({ type });
metrics.inboundPayloadBytes.inc({ type }, payloadBytes);
}
export function startChatMessageTimer(type: string) {
return metrics.messageDuration.startTimer({ type });
}
export function recordDeliveredChatMessage(senderType: string): void {
metrics.deliveredMessages.inc({ sender_type: senderType });
}
export function recordDeliveredChatMessageBytes(senderType: string, bytes: number): void {
metrics.deliveredMessageBytes.inc({ sender_type: senderType }, bytes);
}
export function setChannelHistorySize(channel: string, size: number): void {
metrics.channelHistorySize.set({ channel }, size);
}
export function recordHistoryMessagesLoaded(channel: string, count: number): void {
if (count > 0) {
metrics.channelHistoryLoadedMessages.inc({ channel }, count);
}
}
export function setChatModerationState(
channel: string,
settings: {
blockedTerms: number;
maxMessageLength: number;
rateLimitCount: number;
rateLimitWindowSeconds: number;
slowModeSeconds: number;
}
): void {
metrics.moderationState.set({ channel, setting: 'blocked_terms' }, settings.blockedTerms);
metrics.moderationState.set({ channel, setting: 'slow_mode_seconds' }, settings.slowModeSeconds);
metrics.moderationState.set(
{ channel, setting: 'max_message_length' },
settings.maxMessageLength
);
metrics.moderationState.set({ channel, setting: 'rate_limit_count' }, settings.rateLimitCount);
metrics.moderationState.set(
{ channel, setting: 'rate_limit_window_seconds' },
settings.rateLimitWindowSeconds
);
}
export function recordUniqueChatter(senderType: string): void {
metrics.channelUniqueChatters.inc({ sender_type: senderType });
}
export function recordChatModerationAction(action: string): void {
metrics.moderationActions.inc({ action });
}
export function recordChatModerationBlock(reason: string): void {
metrics.moderationBlocks.inc({ reason });
}
export function recordChatError(phase: string): void {
metrics.errors.inc({ phase });
}
export function recordEmojiSearchResults(outcome: string, count: number): void {
metrics.emojiSearchResults.observe({ outcome }, count);
}

View File

@@ -39,6 +39,8 @@ export interface ChatSocket {
personalChannel?: any;
viewerId?: string;
isModerator?: boolean;
metricsTracked?: boolean;
metricsAuthMethod?: string;
excludeFromViewerCount?: boolean;
raw?:
| (ModifiedWebSocket & {
@@ -47,6 +49,8 @@ export interface ChatSocket {
chatUser?: ChatUser | null;
personalChannel?: any;
isModerator?: boolean;
metricsTracked?: boolean;
metricsAuthMethod?: string;
excludeFromViewerCount?: boolean;
})
| null;

View File

@@ -1,4 +1,5 @@
import { ChatModerationAction, prisma } from '@hctv/db';
import { recordChatModerationAction } from '../metrics.js';
import type {
ChatModerationCommand,
ChatRestrictionState,
@@ -270,6 +271,7 @@ export async function handleDeleteMessageCommand(
reason: 'Message deleted by moderator',
details: { msgId },
});
recordChatModerationAction('message_deleted');
deps.broadcastToChannel(context.targetUsername, socket, { type: 'messageDeleted', msgId });
}
@@ -330,6 +332,7 @@ export async function handleUserRestrictionCommand(
targetUserId: target.targetUserId,
reason: 'User unbanned in chat',
});
recordChatModerationAction('user_unbanned');
await deps.broadcastRestrictionStateToUser(
context.targetUsername,
@@ -389,6 +392,7 @@ export async function handleUserRestrictionCommand(
reason,
details: durationSeconds ? { durationSeconds } : undefined,
});
recordChatModerationAction(msg.type === 'mod:timeoutUser' ? 'user_timeout' : 'user_banned');
await deps.broadcastRestrictionStateToUser(
context.targetUsername,

View File

@@ -60,6 +60,7 @@
"nuqs": "^2.4.3",
"pg": "^8.14.1",
"pg-boss": "^10.1.6",
"prom-client": "^15.1.3",
"react": "^19.2.3",
"react-day-picker": "^9.13.0",
"react-dom": "^19.2.3",

View File

@@ -1,29 +1,39 @@
import { prisma, getRedisConnection } from '@hctv/db';
import { recordMediamtxAuth } from '@/lib/metrics';
import { NextRequest } from 'next/server';
import { z } from 'zod';
export async function POST(request: NextRequest) {
const startedAt = performance.now();
let action = 'invalid';
let protocol = 'invalid';
const finish = (body: string, status: number, outcome: string) => {
recordMediamtxAuth(action, protocol, outcome, (performance.now() - startedAt) / 1000);
return new Response(body, { status });
};
const redis = getRedisConnection();
const body = await request.json();
if (process.env.NODE_ENV !== 'production') {
console.log(
'Mediamtx publish auth request:',
JSON.stringify(body, null, 2)
);
};
console.log('Mediamtx publish auth request:', JSON.stringify(body, null, 2));
}
const parsed = schema.safeParse(body);
if (!parsed.success) {
return new Response('invalid request', { status: 400 });
return finish('invalid request', 400, 'invalid_request');
}
const { action, protocol, path, password } = parsed.data;
if (action === 'publish' && protocol === 'srt') {
const { action: parsedAction, protocol: parsedProtocol, path, password } = parsed.data;
action = parsedAction;
protocol = parsedProtocol;
if (parsedAction === 'publish' && parsedProtocol === 'srt') {
const channelKey = await redis.get(`streamKey:${path}`);
if (channelKey) {
if (channelKey !== password) {
return new Response('invalid stream key', { status: 403 });
return finish('invalid stream key', 403, 'invalid_stream_key');
}
const channel = await prisma.channel.findUnique({
@@ -38,39 +48,39 @@ export async function POST(request: NextRequest) {
});
if (channel?.restriction) {
const isExpired = channel.restriction.expiresAt &&
new Date(channel.restriction.expiresAt) < new Date();
const isExpired =
channel.restriction.expiresAt && new Date(channel.restriction.expiresAt) < new Date();
if (!isExpired) {
return new Response('channel restricted', { status: 403 });
return finish('channel restricted', 403, 'channel_restricted');
}
}
if (channel?.owner?.ban) {
const isExpired = channel.owner.ban.expiresAt &&
new Date(channel.owner.ban.expiresAt) < new Date();
const isExpired =
channel.owner.ban.expiresAt && new Date(channel.owner.ban.expiresAt) < new Date();
if (!isExpired) {
return new Response('user banned', { status: 403 });
return finish('user banned', 403, 'user_banned');
}
}
if (channel?.streamInfo[0].isLive) {
return new Response('stream already live', { status: 403 });
return finish('stream already live', 403, 'stream_already_live');
}
return new Response('youre in yay', { status: 200 });
return finish('youre in yay', 200, 'authorized_publish');
}
} else if (action === 'read' && protocol === 'hls') {
} else if (parsedAction === 'read' && parsedProtocol === 'hls') {
if (password === process.env.MEDIAMTX_PUBLISH_KEY) {
return new Response('authorized (hls read key for thumbs)', { status: 200 });
return finish('authorized (hls read key for thumbs)', 200, 'authorized_thumbnail');
}
const sessionExists = await redis.exists(`sessions:${password}`);
if (!sessionExists) {
return new Response('unauthorized', { status: 401 });
return finish('unauthorized', 401, 'unauthorized_session');
}
return new Response('authorized', { status: 200 });
return finish('authorized', 200, 'authorized_read');
}
return new Response('uhh', { status: 401 });
return finish('uhh', 401, 'unauthorized');
}
const schema = z.object({

View File

@@ -0,0 +1,58 @@
import { webMetricsRegistry } from '@/lib/metrics';
import { NextRequest, NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const dynamic = 'force-dynamic';
export async function GET(req: NextRequest) {
if (process.env.NODE_ENV === 'production' && !isAuthenticated(req)) {
return new NextResponse('Authentication required', {
status: 401,
headers: { 'WWW-Authenticate': 'Basic' },
});
}
return new Response(await webMetricsRegistry.metrics(), {
headers: {
'Content-Type': webMetricsRegistry.contentType,
'Cache-Control': 'no-store, no-cache, must-revalidate, proxy-revalidate',
},
});
}
// source: https://vancelucas.com/blog/how-to-add-http-basic-auth-to-next-js/
function isAuthenticated(req: NextRequest) {
const authheader = req.headers.get('authorization') ?? req.headers.get('Authorization');
if (!authheader) {
return false;
}
const parts = authheader.split(' ');
if (parts.length !== 2) {
return false;
}
const scheme = parts[0];
const encoded = parts[1];
if (scheme !== 'Basic' || !encoded) {
return false;
}
let decoded: string;
try {
decoded = Buffer.from(encoded, 'base64').toString();
} catch {
return false;
}
const separatorIndex = decoded.indexOf(':');
if (separatorIndex === -1) {
return false;
}
const user = decoded.substring(0, separatorIndex);
const pass = decoded.substring(separatorIndex + 1);
return user === process.env.METRICS_USER && pass === process.env.METRICS_PASS;
}

View File

@@ -1,23 +1,30 @@
import { prisma } from "@hctv/db";
import { getThumbnailQueue } from "../workers";
import { prisma } from '@hctv/db';
import { recordThumbnailJobsEnqueued, setThumbnailRefreshTargets, trackWebJob } from '../metrics';
import { getThumbnailQueue } from '../workers';
export default async function getLiveThumb() {
const liveChannels = await prisma.streamInfo.findMany({
where: {
isLive: true,
},
include: {
channel: true,
}
});
const liveChannelNames = liveChannels.map((channel) => channel.channel.name);
const thumbQueue = getThumbnailQueue();
for (const channel of liveChannelNames) {
const lc = liveChannels.find(c => c.channel.name === channel)!;
await thumbQueue.add("getLiveThumb", {
name: channel,
server: lc.streamRegion,
return trackWebJob('thumbnail_refresh', async () => {
const liveChannels = await prisma.streamInfo.findMany({
where: {
isLive: true,
},
include: {
channel: true,
},
});
}
}
const thumbQueue = getThumbnailQueue();
const jobsByRegion: Record<string, number> = {};
setThumbnailRefreshTargets(liveChannels.length);
for (const liveChannel of liveChannels) {
await thumbQueue.add('getLiveThumb', {
name: liveChannel.channel.name,
server: liveChannel.streamRegion,
});
jobsByRegion[liveChannel.streamRegion] = (jobsByRegion[liveChannel.streamRegion] ?? 0) + 1;
}
recordThumbnailJobsEnqueued(jobsByRegion);
});
}

View File

@@ -1,4 +1,13 @@
import { prisma } from '@hctv/db';
import {
recordLiveStreamTransition,
recordNotificationsEnqueued,
recordStreamSyncScrape,
setLiveStreamsByRegion,
setPlatformInventory,
setStreamPathsByRegion,
trackWebJob,
} from '../metrics';
import { HttpFlv } from '../types/liveBackendJson';
import { getNotificationQueue } from '../workers';
import client from '../services/slackNotifier';
@@ -10,9 +19,29 @@ export default async function runner() {
if ((await prisma.user.count()) === 0) {
return;
}
await refreshPlatformInventory();
await initializeStreamInfo();
await syncStream();
setInterval(syncStream, 5000);
setInterval(refreshPlatformInventory, 60_000);
}
async function refreshPlatformInventory() {
const [channels, liveStreams, follows, botAccounts, users] = await Promise.all([
prisma.channel.count(),
prisma.streamInfo.count({ where: { isLive: true } }),
prisma.follow.count(),
prisma.botAccount.count(),
prisma.user.count(),
]);
setPlatformInventory({
bot_accounts: botAccounts,
channels,
follows,
live_stream_rows: liveStreams,
users,
});
}
export async function initializeStreamInfo(channelId?: string) {
@@ -50,103 +79,122 @@ export async function initializeStreamInfo(channelId?: string) {
export async function syncStream() {
try {
const regions = Object.keys(MEDIAMTX_SERVER_REGIONS) as Array<
keyof typeof MEDIAMTX_SERVER_REGIONS
>;
await trackWebJob('stream_sync', async () => {
const regions = Object.keys(MEDIAMTX_SERVER_REGIONS) as Array<
keyof typeof MEDIAMTX_SERVER_REGIONS
>;
const allActiveStreams = new Map<string, keyof typeof MEDIAMTX_SERVER_REGIONS>();
const allActiveStreams = new Map<string, keyof typeof MEDIAMTX_SERVER_REGIONS>();
const liveStreamsByRegion = Object.fromEntries(regions.map((region) => [region, 0]));
const pathsSeenByRegion = Object.fromEntries(regions.map((region) => [region, 0]));
for (const r of regions) {
const region = MEDIAMTX_SERVER_REGIONS[r];
const response = await fetch(`${region.apiUrl}/v3/paths/list?itemsPerPage=1000`);
for (const r of regions) {
const region = MEDIAMTX_SERVER_REGIONS[r];
const response = await fetch(`${region.apiUrl}/v3/paths/list?itemsPerPage=1000`);
if (!response.ok) {
console.error(
`Failed to fetch ${r} stream stats: ${response.status} ${response.statusText}`
);
continue;
}
if (!response.ok) {
recordStreamSyncScrape(r, 'error');
console.error(
`Failed to fetch ${r} stream stats: ${response.status} ${response.statusText}`
);
continue;
}
type ResponseType =
paths['/v3/paths/list']['get']['responses']['200']['content']['application/json'];
const data = (await response.json()) as ResponseType;
recordStreamSyncScrape(r, 'success');
if (data?.items) {
for (const stream of data.items) {
if (stream.ready && stream.name) {
allActiveStreams.set(stream.name, r);
type ResponseType =
paths['/v3/paths/list']['get']['responses']['200']['content']['application/json'];
const data = (await response.json()) as ResponseType;
if (data?.items) {
for (const stream of data.items) {
if (stream.ready && stream.name) {
allActiveStreams.set(stream.name, r);
liveStreamsByRegion[r] += 1;
pathsSeenByRegion[r] += 1;
}
}
}
}
}
// handle streams going offline
const currentLiveStreams = await prisma.streamInfo.findMany({
where: { isLive: true },
});
setLiveStreamsByRegion(liveStreamsByRegion);
setStreamPathsByRegion(pathsSeenByRegion);
for (const dbStream of currentLiveStreams) {
if (!allActiveStreams.has(dbStream.username)) {
await prisma.streamInfo.update({
where: { username: dbStream.username },
data: {
isLive: false,
viewers: 0,
startedAt: new Date(0),
},
});
}
}
// handle streams going online
for (const [username, regionKey] of allActiveStreams) {
const existingStream = await prisma.streamInfo.findUnique({
where: { username },
include: { channel: true },
const currentLiveStreams = await prisma.streamInfo.findMany({
where: { isLive: true },
});
if (existingStream && !existingStream.isLive) {
console.log(`Stream ${username} is now live in region ${regionKey}`);
await prisma.streamInfo.update({
where: { username },
data: {
isLive: true,
startedAt: new Date(),
streamRegion: regionKey,
},
});
const subscribedFollowers = await prisma.follow.findMany({
where: {
channelId: existingStream.channelId,
notifyStream: true,
},
include: {
user: true,
},
});
const queue = getNotificationQueue();
if (!existingStream.channel.is247) {
queue.add(`streamStartChannel:${existingStream.username}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>`,
channel: process.env.NOTIFICATION_CHANNEL_ID!,
unfurl_links: true,
for (const dbStream of currentLiveStreams) {
if (!allActiveStreams.has(dbStream.username)) {
recordLiveStreamTransition('offline', dbStream.streamRegion);
await prisma.streamInfo.update({
where: { username: dbStream.username },
data: {
isLive: false,
viewers: 0,
startedAt: new Date(0),
},
});
}
}
if (existingStream.enableNotifications && !existingStream.channel.is247) {
for (const follower of subscribedFollowers) {
queue.add(`streamStartDm:${follower.user.id}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`,
channel: follower.user.slack_id,
for (const [username, regionKey] of allActiveStreams) {
const existingStream = await prisma.streamInfo.findUnique({
where: { username },
include: { channel: true },
});
if (existingStream && !existingStream.isLive) {
console.log(`Stream ${username} is now live in region ${regionKey}`);
recordLiveStreamTransition('online', regionKey);
await prisma.streamInfo.update({
where: { username },
data: {
isLive: true,
startedAt: new Date(),
streamRegion: regionKey,
},
});
const subscribedFollowers = await prisma.follow.findMany({
where: {
channelId: existingStream.channelId,
notifyStream: true,
},
include: {
user: true,
},
});
const queue = getNotificationQueue();
if (!existingStream.channel.is247) {
queue.add(`streamStartChannel:${existingStream.username}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>`,
channel: process.env.NOTIFICATION_CHANNEL_ID!,
unfurl_links: true,
});
}
if (existingStream.enableNotifications && !existingStream.channel.is247) {
for (const follower of subscribedFollowers) {
queue.add(`streamStartDm:${follower.user.id}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>\n_Stream notifications are enabled for this user. If you want to disable them, you can do so in \`Profile > Follows\`._`,
channel: follower.user.slack_id,
unfurl_links: true,
});
}
}
recordNotificationsEnqueued('channel', existingStream.channel.is247 ? 0 : 1);
recordNotificationsEnqueued(
'dm',
existingStream.enableNotifications && !existingStream.channel.is247
? subscribedFollowers.length
: 0
);
}
}
}
});
} catch (error) {
console.error('Error syncing stream status:', error);
}

View File

@@ -1,30 +1,37 @@
import { prisma, getRedisConnection } from '@hctv/db';
import { setCacheEntryCount, trackWebJob } from '../metrics';
export default async function syncStreamKeys() {
console.log('Syncing stream keys to Redis...');
try {
const keys = await prisma.streamKey.findMany({
include: {
channel: true,
},
});
await trackWebJob('sync_stream_keys', async () => {
const keys = await prisma.streamKey.findMany({
include: {
channel: true,
},
});
if (keys.length === 0) {
console.log('No stream keys found to sync.');
return;
}
const redis = getRedisConnection();
const pipeline = redis.pipeline();
for (const key of keys) {
if (key.channel && key.channel.name) {
pipeline.set(`streamKey:${key.channel.name}`, key.key);
if (keys.length === 0) {
setCacheEntryCount('stream_keys', 0);
console.log('No stream keys found to sync.');
return;
}
}
await pipeline.exec();
console.log(`Synced ${keys.length} stream keys to Redis`);
const redis = getRedisConnection();
const pipeline = redis.pipeline();
let syncedKeyCount = 0;
for (const key of keys) {
if (key.channel && key.channel.name) {
pipeline.set(`streamKey:${key.channel.name}`, key.key);
syncedKeyCount += 1;
}
}
await pipeline.exec();
setCacheEntryCount('stream_keys', syncedKeyCount);
console.log(`Synced ${syncedKeyCount} stream keys to Redis`);
});
} catch (error) {
console.error('Failed to sync stream keys to Redis:', error);
}

View File

@@ -1,40 +1,101 @@
import { getRedisConnection, prisma } from "@hctv/db";
import { getRedisConnection, prisma } from '@hctv/db';
import { setViewerSnapshot, trackWebJob } from '../metrics';
async function countViewersForChannel(channelName: string): Promise<number> {
const redis = getRedisConnection();
let cursor = '0';
let total = 0;
do {
const [nextCursor, keys] = await redis.scan(
cursor,
'MATCH',
`viewer:${channelName}:*`,
'COUNT',
200
);
cursor = nextCursor;
total += keys.length;
} while (cursor !== '0');
return total;
}
export async function viewerCountSync() {
const streams = await prisma.streamInfo.findMany({
where: {
isLive: true
},
include: {
channel: true
}
})
if (streams.length === 0) {
return;
}
const redis = getRedisConnection();
const multi = redis.multi();
for (const stream of streams) {
multi.keys(`viewer:${stream.channel.name}:*`);
}
const results = await multi.exec();
await prisma.$transaction(async (tx) => {
const updates = results?.map((res, index) => {
const count = Array.isArray(res[1]) ? res[1].length : 0;
const stream = streams[index];
return tx.streamInfo.update({
try {
await trackWebJob('viewer_count_sync', async () => {
const streams = await prisma.streamInfo.findMany({
where: {
// using username here because it uses a map
username: stream.username
isLive: true,
},
data: {
viewers: count
select: {
username: true,
streamRegion: true,
channel: {
select: {
name: true,
},
},
},
});
if (streams.length === 0) {
setViewerSnapshot({
totalViewers: 0,
trackedStreams: 0,
streamsWithViewers: 0,
hottestStreamViewers: 0,
viewersByRegion: {},
});
return;
}
const viewersByRegion: Record<string, number> = {};
let totalViewers = 0;
let streamsWithViewers = 0;
let hottestStreamViewers = 0;
const streamCounts = await Promise.all(
streams.map(async (stream) => ({
stream,
count: await countViewersForChannel(stream.channel.name),
}))
);
for (const { stream, count } of streamCounts) {
totalViewers += count;
if (stream.streamRegion) {
viewersByRegion[stream.streamRegion] =
(viewersByRegion[stream.streamRegion] ?? 0) + count;
}
})
})
await Promise.all(updates || []);
})
}
if (count > 0) {
streamsWithViewers += 1;
}
if (count > hottestStreamViewers) {
hottestStreamViewers = count;
}
await prisma.streamInfo.update({
where: {
username: stream.username,
},
data: {
viewers: count,
},
});
}
setViewerSnapshot({
totalViewers,
trackedStreams: streams.length,
streamsWithViewers,
hottestStreamViewers,
viewersByRegion,
});
});
} catch (error) {
console.error('Error syncing viewer counts:', error);
}
}

View File

@@ -1,17 +1,35 @@
import { prisma } from "@hctv/db";
import { getRedisConnection } from "@hctv/db";
import { getRedisConnection, prisma } from '@hctv/db';
import { setCacheEntryCount, trackWebJob } from '../metrics';
async function deleteSessionKeys() {
const redis = getRedisConnection();
let cursor = '0';
do {
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', 'sessions:*', 'COUNT', 200);
cursor = nextCursor;
if (keys.length > 0) {
await redis.unlink(...keys);
}
} while (cursor !== '0');
}
export default async function writeSessions() {
const sessions = await prisma.session.findMany();
const sessionIds = sessions.map((session) => session.id);
const redis = getRedisConnection();
const multi = redis.multi();
multi.del('sessions:*')
for (const sessionId of sessionIds) {
multi.set(`sessions:${sessionId}`, '');
}
await multi.exec();
return trackWebJob('write_sessions', async () => {
const sessions = await prisma.session.findMany();
const sessionIds = sessions.map((session) => session.id);
console.log("Sessions written to Redis");
}
await deleteSessionKeys();
const redis = getRedisConnection();
const multi = redis.multi();
for (const sessionId of sessionIds) {
multi.set(`sessions:${sessionId}`, '');
}
await multi.exec();
setCacheEntryCount('sessions', sessionIds.length);
console.log('Sessions written to Redis');
});
}

261
apps/web/src/lib/metrics.ts Normal file
View File

@@ -0,0 +1,261 @@
import { collectDefaultMetrics, Counter, Gauge, Histogram, Registry } from 'prom-client';
function createMetricsStore() {
const register = new Registry();
register.setDefaultLabels({ app: 'web' });
collectDefaultMetrics({
prefix: 'hctv_web_',
register,
});
const backgroundJobRuns = new Counter({
name: 'hctv_web_background_job_runs_total',
help: 'Total number of background jobs run by the web app.',
labelNames: ['job', 'status'],
registers: [register],
});
const backgroundJobDuration = new Histogram({
name: 'hctv_web_background_job_duration_seconds',
help: 'Background job execution time in seconds.',
labelNames: ['job', 'status'],
buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30],
registers: [register],
});
const liveStreams = new Gauge({
name: 'hctv_web_live_streams',
help: 'Current number of live streams grouped by MediaMTX region.',
labelNames: ['region'],
registers: [register],
});
const streamPathsSeen = new Gauge({
name: 'hctv_web_stream_paths_seen',
help: 'Current number of ready MediaMTX paths seen during the latest sync.',
labelNames: ['region'],
registers: [register],
});
const liveStreamTransitions = new Counter({
name: 'hctv_web_live_stream_transitions_total',
help: 'Live stream state transitions observed by the web app.',
labelNames: ['transition', 'region'],
registers: [register],
});
const streamSyncScrapes = new Counter({
name: 'hctv_web_stream_sync_scrapes_total',
help: 'MediaMTX region scrapes attempted by stream sync.',
labelNames: ['region', 'status'],
registers: [register],
});
const activeViewers = new Gauge({
name: 'hctv_web_active_viewers',
help: 'Current number of active viewers across all live streams.',
registers: [register],
});
const activeViewersByRegion = new Gauge({
name: 'hctv_web_active_viewers_by_region',
help: 'Current number of active viewers grouped by stream region.',
labelNames: ['region'],
registers: [register],
});
const viewerCountTrackedStreams = new Gauge({
name: 'hctv_web_viewer_count_tracked_streams',
help: 'Number of live streams included in the latest viewer sync.',
registers: [register],
});
const streamsWithViewers = new Gauge({
name: 'hctv_web_streams_with_viewers',
help: 'Current number of live streams with at least one viewer.',
registers: [register],
});
const hottestStreamViewers = new Gauge({
name: 'hctv_web_hottest_stream_viewers',
help: 'Current viewer count of the most watched live stream.',
registers: [register],
});
const thumbnailJobsEnqueued = new Counter({
name: 'hctv_web_thumbnail_jobs_enqueued_total',
help: 'Total thumbnail refresh jobs enqueued by region.',
labelNames: ['region'],
registers: [register],
});
const thumbnailRefreshTargets = new Gauge({
name: 'hctv_web_thumbnail_refresh_targets',
help: 'Number of live streams targeted in the latest thumbnail refresh run.',
registers: [register],
});
const notificationsEnqueued = new Counter({
name: 'hctv_web_notifications_enqueued_total',
help: 'Notification jobs enqueued when streams go live.',
labelNames: ['target'],
registers: [register],
});
const cacheEntries = new Gauge({
name: 'hctv_web_cache_entries',
help: 'Current number of records mirrored into Redis by cache-sync jobs.',
labelNames: ['cache'],
registers: [register],
});
const platformInventory = new Gauge({
name: 'hctv_web_platform_inventory',
help: 'High-level counts of important platform records.',
labelNames: ['entity'],
registers: [register],
});
const mediamtxAuthRequests = new Counter({
name: 'hctv_web_mediamtx_auth_requests_total',
help: 'Total MediaMTX auth decisions handled by the web app.',
labelNames: ['action', 'protocol', 'outcome'],
registers: [register],
});
const mediamtxAuthDuration = new Histogram({
name: 'hctv_web_mediamtx_auth_duration_seconds',
help: 'MediaMTX auth request duration in seconds.',
labelNames: ['action', 'protocol', 'outcome'],
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5],
registers: [register],
});
return {
register,
activeViewers,
activeViewersByRegion,
backgroundJobDuration,
backgroundJobRuns,
cacheEntries,
hottestStreamViewers,
liveStreams,
liveStreamTransitions,
mediamtxAuthDuration,
mediamtxAuthRequests,
notificationsEnqueued,
platformInventory,
streamPathsSeen,
streamsWithViewers,
streamSyncScrapes,
thumbnailRefreshTargets,
thumbnailJobsEnqueued,
viewerCountTrackedStreams,
};
}
const globalForMetrics = globalThis as typeof globalThis & {
__hctvWebMetrics?: ReturnType<typeof createMetricsStore>;
};
const metrics = (globalForMetrics.__hctvWebMetrics ??= createMetricsStore());
export const webMetricsRegistry = metrics.register;
export async function trackWebJob<T>(job: string, fn: () => Promise<T>): Promise<T> {
const stopTimer = metrics.backgroundJobDuration.startTimer({ job });
let status = 'success';
try {
return await fn();
} catch (error) {
status = 'error';
throw error;
} finally {
metrics.backgroundJobRuns.inc({ job, status });
stopTimer({ job, status });
}
}
export function setLiveStreamsByRegion(streamsByRegion: Record<string, number>): void {
metrics.liveStreams.reset();
for (const [region, count] of Object.entries(streamsByRegion)) {
metrics.liveStreams.set({ region }, count);
}
}
export function setStreamPathsByRegion(pathsByRegion: Record<string, number>): void {
metrics.streamPathsSeen.reset();
for (const [region, count] of Object.entries(pathsByRegion)) {
metrics.streamPathsSeen.set({ region }, count);
}
}
export function recordLiveStreamTransition(transition: 'online' | 'offline', region: string): void {
metrics.liveStreamTransitions.inc({ transition, region });
}
export function recordStreamSyncScrape(region: string, status: 'success' | 'error'): void {
metrics.streamSyncScrapes.inc({ region, status });
}
export function setViewerSnapshot(snapshot: {
totalViewers: number;
trackedStreams: number;
viewersByRegion: Record<string, number>;
streamsWithViewers: number;
hottestStreamViewers: number;
}): void {
metrics.activeViewers.set(snapshot.totalViewers);
metrics.viewerCountTrackedStreams.set(snapshot.trackedStreams);
metrics.streamsWithViewers.set(snapshot.streamsWithViewers);
metrics.hottestStreamViewers.set(snapshot.hottestStreamViewers);
metrics.activeViewersByRegion.reset();
for (const [region, count] of Object.entries(snapshot.viewersByRegion)) {
metrics.activeViewersByRegion.set({ region }, count);
}
}
export function recordThumbnailJobsEnqueued(jobsByRegion: Record<string, number>): void {
for (const [region, count] of Object.entries(jobsByRegion)) {
if (count > 0) {
metrics.thumbnailJobsEnqueued.inc({ region }, count);
}
}
}
export function setThumbnailRefreshTargets(count: number): void {
metrics.thumbnailRefreshTargets.set(count);
}
export function recordNotificationsEnqueued(target: 'channel' | 'dm', count: number): void {
if (count > 0) {
metrics.notificationsEnqueued.inc({ target }, count);
}
}
export function setCacheEntryCount(cache: 'sessions' | 'stream_keys', count: number): void {
metrics.cacheEntries.set({ cache }, count);
}
export function setPlatformInventory(snapshot: Record<string, number>): void {
metrics.platformInventory.reset();
for (const [entity, count] of Object.entries(snapshot)) {
metrics.platformInventory.set({ entity }, count);
}
}
export function recordMediamtxAuth(
action: string,
protocol: string,
outcome: string,
durationSeconds: number
): void {
metrics.mediamtxAuthRequests.inc({ action, protocol, outcome });
metrics.mediamtxAuthDuration.observe({ action, protocol, outcome }, durationSeconds);
}

View File

@@ -1,8 +1,11 @@
import { registerNotificationWorker } from './worker/notification';
import { registerThumbnailWorker } from './worker/thumbnails';
import { trackWebJob } from '../metrics';
export async function registerWorkers(): Promise<void> {
await registerNotificationWorker();
await registerThumbnailWorker();
console.log('All workers registered successfully');
}
await trackWebJob('register_workers', async () => {
await registerNotificationWorker();
await registerThumbnailWorker();
console.log('All workers registered successfully');
});
}

View File

@@ -63,3 +63,44 @@ services:
dockerfile: docker/mediamtx/Dockerfile
ports:
- '8890:8890/udp'
postgres-exporter:
image: 'prometheuscommunity/postgres-exporter:v0.17.1'
environment:
DATA_SOURCE_NAME: 'postgresql://postgres:${PG_PASS}@postgres:5432/hctv?sslmode=disable'
redis-exporter:
image: 'oliver006/redis_exporter:v1.67.0'
environment:
REDIS_ADDR: 'redis://redis:6379'
prometheus:
image: 'prom/prometheus:v3.4.2'
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--config.expand-env'
- '--storage.tsdb.path=/prometheus'
- '--web.enable-lifecycle'
env_file:
- .env
volumes:
- './observability/prometheus.yml:/etc/prometheus/prometheus.yml:ro'
- 'hctv_prometheus_data:/prometheus'
extra_hosts:
- 'host.docker.internal:host-gateway'
grafana:
image: 'grafana/grafana:11.6.0'
depends_on:
- prometheus
environment:
GF_SECURITY_ADMIN_USER: '${GRAFANA_ADMIN_USER:-admin}'
GF_SECURITY_ADMIN_PASSWORD: '${GRAFANA_ADMIN_PASSWORD:-admin}'
GF_USERS_DEFAULT_THEME: light
volumes:
- './observability/grafana/provisioning/datasources:/etc/grafana/provisioning/datasources:ro'
- './observability/grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards:ro'
- './observability/grafana/dashboards:/var/lib/grafana/dashboards:ro'
- 'hctv_grafana_data:/var/lib/grafana'
volumes:
hctv_pgdata:
hctv_redis:
hctv_prometheus_data:
hctv_grafana_data:

View File

@@ -9,22 +9,59 @@ services:
- ./psql:/var/lib/postgresql
ports:
- 5555:5432
postgres-exporter:
image: prometheuscommunity/postgres-exporter:v0.17.1
environment:
DATA_SOURCE_NAME: postgresql://postgres:skbiditoilet@psql:5432/postgres?sslmode=disable
redis:
image: redis:7.4-alpine
volumes:
- ./redis:/data
ports:
- 6379:6379
redis-exporter:
image: oliver006/redis_exporter:v1.67.0
environment:
REDIS_ADDR: redis://redis:6379
mediamtx:
image: bluenviron/mediamtx:latest
ports:
- 8890:8890/udp
- 8891:8888
- 9997:9997
- 9998:9998
volumes:
- ./mediamtx.yml:/mediamtx.yml
extra_hosts:
- "host.docker.internal:host-gateway"
- 'host.docker.internal:host-gateway'
prometheus:
image: prom/prometheus:v3.4.2
command:
- --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.path=/prometheus
- --web.enable-lifecycle
volumes:
- ../observability/prometheus.dev.yml:/etc/prometheus/prometheus.yml:ro
- prometheus_data:/prometheus
ports:
- 9090:9090
extra_hosts:
- 'host.docker.internal:host-gateway'
grafana:
image: grafana/grafana:11.6.0
depends_on:
- prometheus
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: admin
GF_USERS_DEFAULT_THEME: light
volumes:
- ../observability/grafana/provisioning/datasources:/etc/grafana/provisioning/datasources:ro
- ../observability/grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards:ro
- ../observability/grafana/dashboards:/var/lib/grafana/dashboards:ro
- grafana_data:/var/lib/grafana
ports:
- 3001:3000
# mediamtx2:
# image: bluenviron/mediamtx:latest
# ports:
@@ -34,4 +71,8 @@ services:
# volumes:
# - ./mediamtx.yml:/mediamtx.yml
# extra_hosts:
# - "host.docker.internal:host-gateway"
# - "host.docker.internal:host-gateway"
volumes:
prometheus_data:
grafana_data:

View File

@@ -15,3 +15,5 @@ authMethod: http
authHTTPAddress: http://host.docker.internal:3000/api/mediamtx/publish
api: yes
metrics: yes
metricsAddress: :9998

View File

@@ -15,3 +15,5 @@ authMethod: http
authHTTPAddress: http://hctv:3000/api/mediamtx/publish
api: yes
metrics: yes
metricsAddress: :9998

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
apiVersion: 1
providers:
- name: HackClubTV
folder: HackClub TV
type: file
disableDeletion: false
editable: true
options:
path: /var/lib/grafana/dashboards

View File

@@ -0,0 +1,10 @@
apiVersion: 1
datasources:
- name: Prometheus
uid: prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: false

View File

@@ -0,0 +1,39 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: web
metrics_path: /api/metrics
static_configs:
- targets:
- host.docker.internal:3000
- job_name: chat
metrics_path: /metrics
static_configs:
- targets:
- host.docker.internal:8000
- job_name: mediamtx
metrics_path: /metrics
static_configs:
- targets:
- mediamtx:9998
- job_name: redis
metrics_path: /metrics
static_configs:
- targets:
- redis-exporter:9121
- job_name: postgres
metrics_path: /metrics
static_configs:
- targets:
- postgres-exporter:9187
- job_name: prometheus
static_configs:
- targets:
- localhost:9090

View File

@@ -0,0 +1,45 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: web
metrics_path: /api/metrics
basic_auth:
username: ${METRICS_USER}
password: ${METRICS_PASS}
static_configs:
- targets:
- hctv:3000
- job_name: chat
metrics_path: /metrics
basic_auth:
username: ${METRICS_USER}
password: ${METRICS_PASS}
static_configs:
- targets:
- chat:8000
- job_name: mediamtx
metrics_path: /metrics
static_configs:
- targets:
- mediamtx:9998
- job_name: redis
metrics_path: /metrics
static_configs:
- targets:
- redis-exporter:9121
- job_name: postgres
metrics_path: /metrics
static_configs:
- targets:
- postgres-exporter:9187
- job_name: prometheus
static_configs:
- targets:
- localhost:9090

27
pnpm-lock.yaml generated
View File

@@ -41,6 +41,9 @@ importers:
hono:
specifier: ^4.7.5
version: 4.11.3
prom-client:
specifier: ^15.1.3
version: 15.1.3
devDependencies:
'@types/node':
specifier: ^20.11.17
@@ -219,6 +222,9 @@ importers:
pg-boss:
specifier: ^10.1.6
version: 10.4.0
prom-client:
specifier: ^15.1.3
version: 15.1.3
react:
specifier: ^19.2.3
version: 19.2.3
@@ -3890,6 +3896,9 @@ packages:
resolution: {integrity: sha512-Ceh+7ox5qe7LJuLHoY0feh3pHuUDHAcRUeyL2VYghZwfpkNIy/+8Ocg0a3UuSoYzavmylwuLWQOf3hl0jjMMIw==}
engines: {node: '>=8'}
bintrees@1.0.2:
resolution: {integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==}
bl@5.1.0:
resolution: {integrity: sha512-tv1ZJHLfTDnXE6tMHv73YgSJaWR2AFuPwMntBe7XL/GBFHnT0CLnsHMogfk5+GzCDC5ZWarSCYaIGATZt9dNsQ==}
@@ -6683,6 +6692,10 @@ packages:
resolution: {integrity: sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==}
engines: {node: '>=0.4.0'}
prom-client@15.1.3:
resolution: {integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==}
engines: {node: ^16 || ^18 || >=20}
prompts@2.4.2:
resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==}
engines: {node: '>= 6'}
@@ -7351,6 +7364,9 @@ packages:
resolution: {integrity: sha512-g9ljZiwki/LfxmQADO3dEY1CbpmXT5Hm2fJ+QaGKwSXUylMybePR7/67YW7jOrrvjEgL1Fmz5kzyAjWVWLlucg==}
engines: {node: '>=6'}
tdigest@0.1.2:
resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==}
terser-webpack-plugin@5.3.16:
resolution: {integrity: sha512-h9oBFCWrq78NyWWVcSwZarJkZ01c2AyGrzs1crmHZO3QUg9D61Wu4NPjBy69n7JqylFF5y+CsUZYmYEIZ3mR+Q==}
engines: {node: '>= 10.13.0'}
@@ -12331,6 +12347,8 @@ snapshots:
binary-extensions@2.3.0: {}
bintrees@1.0.2: {}
bl@5.1.0:
dependencies:
buffer: 6.0.3
@@ -15871,6 +15889,11 @@ snapshots:
progress@2.0.3: {}
prom-client@15.1.3:
dependencies:
'@opentelemetry/api': 1.9.0
tdigest: 0.1.2
prompts@2.4.2:
dependencies:
kleur: 3.0.3
@@ -16813,6 +16836,10 @@ snapshots:
tapable@2.3.0: {}
tdigest@0.1.2:
dependencies:
bintrees: 1.0.2
terser-webpack-plugin@5.3.16(webpack@5.104.1):
dependencies:
'@jridgewell/trace-mapping': 0.3.31