feat(metrics): add more production-ready metrics

This commit is contained in:
2026-03-06 21:30:14 +01:00
parent 527155a0c1
commit 07eefcf9c7
10 changed files with 1161 additions and 72 deletions

View File

@@ -12,7 +12,12 @@ import {
recordChatError,
recordChatModerationBlock,
recordDeliveredChatMessage,
recordDeliveredChatMessageBytes,
recordHistoryMessagesLoaded,
recordIncomingChatMessage,
recordUniqueChatter,
setChannelHistorySize,
setChatModerationState,
startChatMessageTimer,
} from './metrics.js';
import { getPersonalChannel } from './utils/personalChannel.js';
@@ -476,8 +481,17 @@ app.get(
socketState.isModerator = isModerator;
socket.metricsTracked = true;
socketState.metricsTracked = true;
socket.metricsAuthMethod = authMethod;
socketState.metricsAuthMethod = authMethod;
recordChatConnectionAccepted(authMethod);
recordChatConnectionAccepted(username, authMethod);
setChatModerationState(username, {
blockedTerms: moderationSettings.blockedTerms.length,
maxMessageLength: moderationSettings.maxMessageLength,
rateLimitCount: moderationSettings.rateLimitCount,
rateLimitWindowSeconds: moderationSettings.rateLimitWindowSeconds,
slowModeSeconds: moderationSettings.slowModeSeconds,
});
socket.send(
JSON.stringify({
@@ -507,6 +521,7 @@ app.get(
const messages = await redis.zrange(channelKey, 0, MESSAGE_HISTORY_SIZE - 1);
if (messages.length > 0) {
recordHistoryMessagesLoaded(username, messages.length);
socket.send(
JSON.stringify({
type: 'history',
@@ -514,6 +529,7 @@ app.get(
})
);
}
setChannelHistorySize(username, messages.length);
},
async onClose(evt, ws) {
const socket = ws as unknown as ChatSocket;
@@ -522,7 +538,10 @@ app.get(
if (!socketState.targetUsername) return;
if (socketState.metricsTracked) {
recordChatDisconnect();
recordChatDisconnect(
socketState.targetUsername,
socketState.metricsAuthMethod ?? 'unknown'
);
socketState.metricsTracked = false;
}
@@ -547,9 +566,10 @@ app.get(
try {
const socket = ws as unknown as ChatSocket;
const socketState = resolveSocketState(socket);
const msg = JSON.parse(evt.data.toString()) as IncomingMessage;
const rawPayload = evt.data.toString();
const msg = JSON.parse(rawPayload) as IncomingMessage;
messageType = typeof msg.type === 'string' ? msg.type : 'unknown';
recordIncomingChatMessage(messageType);
recordIncomingChatMessage(messageType, Buffer.byteLength(rawPayload));
stopTimer = startChatMessageTimer(messageType);
if (msg.type === 'ping') {
@@ -707,9 +727,16 @@ app.get(
await redis.zadd(channelKey, Date.now(), redisStr);
await redis.zremrangebyrank(channelKey, 0, -MESSAGE_HISTORY_SIZE - 1);
await redis.expire(channelKey, MESSAGE_TTL);
const historySize = await redis.zcard(channelKey);
setChannelHistorySize(targetUsername, historySize);
broadcastToChannel(targetUsername, socket, msgObj as unknown as Record<string, unknown>);
recordDeliveredChatMessage(chatUser.isBot ? 'bot' : 'user');
recordDeliveredChatMessageBytes(
chatUser.isBot ? 'bot' : 'user',
Buffer.byteLength(message)
);
recordUniqueChatter(chatUser.isBot ? 'bot' : 'user');
outcome = 'broadcast';
}
if (msg.type === 'emojiMsg') {

View File

@@ -15,6 +15,20 @@ function createMetricsStore() {
registers: [register],
});
const websocketConnectionsByChannel = new Gauge({
name: 'hctv_chat_websocket_connections_by_channel',
help: 'Current number of active chat websocket connections by target channel.',
labelNames: ['channel'],
registers: [register],
});
const websocketConnectionsByAuthMethod = new Gauge({
name: 'hctv_chat_websocket_connections_by_auth_method',
help: 'Current number of active chat websocket connections by auth method.',
labelNames: ['auth_method'],
registers: [register],
});
const websocketConnectionAttempts = new Counter({
name: 'hctv_chat_websocket_connection_attempts_total',
help: 'Total websocket connection attempts grouped by outcome and auth method.',
@@ -29,6 +43,13 @@ function createMetricsStore() {
registers: [register],
});
const inboundPayloadBytes = new Counter({
name: 'hctv_chat_inbound_payload_bytes_total',
help: 'Total inbound websocket payload bytes grouped by message type.',
labelNames: ['type'],
registers: [register],
});
const messageDuration = new Histogram({
name: 'hctv_chat_message_duration_seconds',
help: 'Chat websocket message processing time in seconds.',
@@ -44,6 +65,41 @@ function createMetricsStore() {
registers: [register],
});
const deliveredMessageBytes = new Counter({
name: 'hctv_chat_message_bytes_delivered_total',
help: 'Total message body bytes successfully broadcast, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const channelHistorySize = new Gauge({
name: 'hctv_chat_channel_history_size',
help: 'Current number of messages retained in Redis history for a channel.',
labelNames: ['channel'],
registers: [register],
});
const channelHistoryLoadedMessages = new Counter({
name: 'hctv_chat_history_messages_loaded_total',
help: 'Total history messages loaded from Redis during websocket joins.',
labelNames: ['channel'],
registers: [register],
});
const moderationState = new Gauge({
name: 'hctv_chat_moderation_state',
help: 'Current moderation settings by channel.',
labelNames: ['channel', 'setting'],
registers: [register],
});
const channelUniqueChatters = new Counter({
name: 'hctv_chat_unique_chatters_total',
help: 'Users who successfully sent at least one chat message, grouped by sender type.',
labelNames: ['sender_type'],
registers: [register],
});
const moderationActions = new Counter({
name: 'hctv_chat_moderation_actions_total',
help: 'Successful moderation actions performed in chat.',
@@ -67,14 +123,22 @@ function createMetricsStore() {
return {
deliveredMessages,
deliveredMessageBytes,
channelHistoryLoadedMessages,
channelHistorySize,
errors,
inboundPayloadBytes,
incomingMessages,
messageDuration,
moderationActions,
moderationBlocks,
moderationState,
register,
channelUniqueChatters,
websocketConnectionAttempts,
websocketConnections,
websocketConnectionsByAuthMethod,
websocketConnectionsByChannel,
};
}
@@ -86,21 +150,26 @@ const metrics = (globalForMetrics.__hctvChatMetrics ??= createMetricsStore());
export const chatMetricsRegistry = metrics.register;
export function recordChatConnectionAccepted(authMethod: string): void {
export function recordChatConnectionAccepted(channel: string, authMethod: string): void {
metrics.websocketConnectionAttempts.inc({ auth_method: authMethod, outcome: 'accepted' });
metrics.websocketConnections.inc();
metrics.websocketConnectionsByChannel.inc({ channel });
metrics.websocketConnectionsByAuthMethod.inc({ auth_method: authMethod });
}
/**
 * Counts one rejected websocket connection attempt.
 *
 * @param authMethod - Value written to the `auth_method` label of the attempts counter.
 */
export function recordChatConnectionRejected(authMethod: string): void {
  const rejectedLabels = { auth_method: authMethod, outcome: 'rejected' };
  metrics.websocketConnectionAttempts.inc(rejectedLabels);
}
export function recordChatDisconnect(): void {
export function recordChatDisconnect(channel: string, authMethod: string): void {
metrics.websocketConnections.dec();
metrics.websocketConnectionsByChannel.dec({ channel });
metrics.websocketConnectionsByAuthMethod.dec({ auth_method: authMethod });
}
export function recordIncomingChatMessage(type: string): void {
export function recordIncomingChatMessage(type: string, payloadBytes: number): void {
metrics.incomingMessages.inc({ type });
metrics.inboundPayloadBytes.inc({ type }, payloadBytes);
}
export function startChatMessageTimer(type: string) {
@@ -111,6 +180,47 @@ export function recordDeliveredChatMessage(senderType: string): void {
metrics.deliveredMessages.inc({ sender_type: senderType });
}
/**
 * Adds `bytes` to the delivered-message-bytes counter.
 *
 * @param senderType - Value written to the `sender_type` label.
 * @param bytes - Amount the counter is incremented by.
 */
export function recordDeliveredChatMessageBytes(senderType: string, bytes: number): void {
  const senderLabels = { sender_type: senderType };
  metrics.deliveredMessageBytes.inc(senderLabels, bytes);
}
/**
 * Overwrites the history-size gauge for one channel.
 *
 * @param channel - Value written to the `channel` label.
 * @param size - New gauge value for that channel.
 */
export function setChannelHistorySize(channel: string, size: number): void {
  const channelLabels = { channel };
  metrics.channelHistorySize.set(channelLabels, size);
}
/**
 * Adds `count` to the history-messages-loaded counter for a channel.
 *
 * Nothing is recorded unless `count` is strictly greater than zero.
 *
 * @param channel - Value written to the `channel` label.
 * @param count - Number of messages to add to the counter.
 */
export function recordHistoryMessagesLoaded(channel: string, count: number): void {
  // Negated `>` (rather than `<=`) so that NaN is skipped as well.
  if (!(count > 0)) {
    return;
  }

  const channelLabels = { channel };
  metrics.channelHistoryLoadedMessages.inc(channelLabels, count);
}
/**
 * Publishes a channel's moderation settings to the moderation-state gauge,
 * one series per setting (distinguished by the `setting` label).
 *
 * @param channel - Value written to the `channel` label of every series.
 * @param settings - Numeric values to expose; each field maps to one `setting` label.
 */
export function setChatModerationState(
  channel: string,
  settings: {
    blockedTerms: number;
    maxMessageLength: number;
    rateLimitCount: number;
    rateLimitWindowSeconds: number;
    slowModeSeconds: number;
  }
): void {
  // Label name -> value pairs, listed in the order they are written to the gauge.
  const gaugeWrites: ReadonlyArray<readonly [string, number]> = [
    ['blocked_terms', settings.blockedTerms],
    ['slow_mode_seconds', settings.slowModeSeconds],
    ['max_message_length', settings.maxMessageLength],
    ['rate_limit_count', settings.rateLimitCount],
    ['rate_limit_window_seconds', settings.rateLimitWindowSeconds],
  ];

  for (const [setting, value] of gaugeWrites) {
    metrics.moderationState.set({ channel, setting }, value);
  }
}
/**
 * Increments the unique-chatters counter by one for the given sender type.
 *
 * NOTE(review): this function performs no per-user deduplication; every call
 * adds one. Confirm that callers invoke it only once per distinct chatter,
 * otherwise the series tracks call count rather than unique users.
 *
 * @param senderType - Value written to the `sender_type` label.
 */
export function recordUniqueChatter(senderType: string): void {
  const senderLabels = { sender_type: senderType };
  metrics.channelUniqueChatters.inc(senderLabels);
}
/**
 * Increments the moderation-actions counter by one.
 *
 * @param action - Value written to the `action` label.
 */
export function recordChatModerationAction(action: string): void {
  const actionLabels = { action };
  metrics.moderationActions.inc(actionLabels);
}

View File

@@ -40,6 +40,7 @@ export interface ChatSocket {
viewerId?: string;
isModerator?: boolean;
metricsTracked?: boolean;
metricsAuthMethod?: string;
raw?:
| (ModifiedWebSocket & {
targetUsername?: string;
@@ -48,6 +49,7 @@ export interface ChatSocket {
personalChannel?: any;
isModerator?: boolean;
metricsTracked?: boolean;
metricsAuthMethod?: string;
})
| null;
}

View File

@@ -1,5 +1,5 @@
import { prisma } from '@hctv/db';
import { recordThumbnailJobsEnqueued, trackWebJob } from '../metrics';
import { recordThumbnailJobsEnqueued, setThumbnailRefreshTargets, trackWebJob } from '../metrics';
import { getThumbnailQueue } from '../workers';
export default async function getLiveThumb() {
@@ -15,6 +15,8 @@ export default async function getLiveThumb() {
const thumbQueue = getThumbnailQueue();
const jobsByRegion: Record<string, number> = {};
setThumbnailRefreshTargets(liveChannels.length);
for (const liveChannel of liveChannels) {
await thumbQueue.add('getLiveThumb', {
name: liveChannel.channel.name,

View File

@@ -1,5 +1,13 @@
import { prisma } from '@hctv/db';
import { setLiveStreamsByRegion, trackWebJob } from '../metrics';
import {
recordLiveStreamTransition,
recordNotificationsEnqueued,
recordStreamSyncScrape,
setLiveStreamsByRegion,
setPlatformInventory,
setStreamPathsByRegion,
trackWebJob,
} from '../metrics';
import { HttpFlv } from '../types/liveBackendJson';
import { getNotificationQueue } from '../workers';
import client from '../services/slackNotifier';
@@ -11,11 +19,30 @@ export default async function runner() {
if ((await prisma.user.count()) === 0) {
return;
}
await refreshPlatformInventory();
await initializeStreamInfo();
await syncStream();
setInterval(syncStream, 5000);
}
/**
 * Counts channels, live stream-info rows, follows, bot accounts and users,
 * then hands the totals to `setPlatformInventory`.
 *
 * The five count queries are issued together and awaited as a group.
 */
async function refreshPlatformInventory() {
  const totals = await Promise.all([
    prisma.channel.count(),
    prisma.streamInfo.count({ where: { isLive: true } }),
    prisma.follow.count(),
    prisma.botAccount.count(),
    prisma.user.count(),
  ]);
  // Positions mirror the order of the queries above.
  const [channelTotal, liveRowTotal, followTotal, botAccountTotal, userTotal] = totals;

  setPlatformInventory({
    bot_accounts: botAccountTotal,
    channels: channelTotal,
    follows: followTotal,
    live_stream_rows: liveRowTotal,
    users: userTotal,
  });
}
export async function initializeStreamInfo(channelId?: string) {
const channels = await prisma.channel.findMany({
where: {
@@ -58,18 +85,22 @@ export async function syncStream() {
const allActiveStreams = new Map<string, keyof typeof MEDIAMTX_SERVER_REGIONS>();
const liveStreamsByRegion = Object.fromEntries(regions.map((region) => [region, 0]));
const pathsSeenByRegion = Object.fromEntries(regions.map((region) => [region, 0]));
for (const r of regions) {
const region = MEDIAMTX_SERVER_REGIONS[r];
const response = await fetch(`${region.apiUrl}/v3/paths/list?itemsPerPage=1000`);
if (!response.ok) {
recordStreamSyncScrape(r, 'error');
console.error(
`Failed to fetch ${r} stream stats: ${response.status} ${response.statusText}`
);
continue;
}
recordStreamSyncScrape(r, 'success');
type ResponseType =
paths['/v3/paths/list']['get']['responses']['200']['content']['application/json'];
const data = (await response.json()) as ResponseType;
@@ -79,12 +110,14 @@ export async function syncStream() {
if (stream.ready && stream.name) {
allActiveStreams.set(stream.name, r);
liveStreamsByRegion[r] += 1;
pathsSeenByRegion[r] += 1;
}
}
}
}
setLiveStreamsByRegion(liveStreamsByRegion);
setStreamPathsByRegion(pathsSeenByRegion);
const currentLiveStreams = await prisma.streamInfo.findMany({
where: { isLive: true },
@@ -92,6 +125,7 @@ export async function syncStream() {
for (const dbStream of currentLiveStreams) {
if (!allActiveStreams.has(dbStream.username)) {
recordLiveStreamTransition('offline', dbStream.streamRegion);
await prisma.streamInfo.update({
where: { username: dbStream.username },
data: {
@@ -111,6 +145,7 @@ export async function syncStream() {
if (existingStream && !existingStream.isLive) {
console.log(`Stream ${username} is now live in region ${regionKey}`);
recordLiveStreamTransition('online', regionKey);
await prisma.streamInfo.update({
where: { username },
data: {
@@ -131,7 +166,6 @@ export async function syncStream() {
});
const queue = getNotificationQueue();
if (!existingStream.channel.is247) {
queue.add(`streamStartChannel:${existingStream.username}`, {
text: `${existingStream.username} is now *live*, streaming *${existingStream.title}* (${existingStream.category})!\n<https://hackclub.tv/${existingStream.username}|Go check them out>`,
@@ -149,8 +183,18 @@ export async function syncStream() {
});
}
}
recordNotificationsEnqueued('channel', existingStream.channel.is247 ? 0 : 1);
recordNotificationsEnqueued(
'dm',
existingStream.enableNotifications && !existingStream.channel.is247
? subscribedFollowers.length
: 0
);
}
}
await refreshPlatformInventory();
});
} catch (error) {
console.error('Error syncing stream status:', error);

View File

@@ -1,5 +1,5 @@
import { prisma, getRedisConnection } from '@hctv/db';
import { trackWebJob } from '../metrics';
import { setCacheEntryCount, trackWebJob } from '../metrics';
export default async function syncStreamKeys() {
console.log('Syncing stream keys to Redis...');
@@ -26,6 +26,7 @@ export default async function syncStreamKeys() {
}
await pipeline.exec();
setCacheEntryCount('stream_keys', keys.length);
console.log(`Synced ${keys.length} stream keys to Redis`);
});
} catch (error) {

View File

@@ -14,7 +14,13 @@ export async function viewerCountSync() {
});
if (streams.length === 0) {
setViewerSnapshot(0, 0);
setViewerSnapshot({
hottestStreamViewers: 0,
streamsWithViewers: 0,
totalViewers: 0,
trackedStreams: 0,
viewersByRegion: {},
});
return;
}
@@ -25,12 +31,23 @@ export async function viewerCountSync() {
}
const results = await multi.exec();
let totalViewers = 0;
let streamsWithViewers = 0;
let hottestStreamViewers = 0;
const viewersByRegion: Record<string, number> = {};
await prisma.$transaction(async (tx) => {
const updates = results?.map((res, index) => {
const count = Array.isArray(res[1]) ? res[1].length : 0;
totalViewers += count;
if (count > 0) {
streamsWithViewers += 1;
}
if (count > hottestStreamViewers) {
hottestStreamViewers = count;
}
const stream = streams[index];
viewersByRegion[stream.streamRegion] =
(viewersByRegion[stream.streamRegion] ?? 0) + count;
return tx.streamInfo.update({
where: {
username: stream.username,
@@ -43,7 +60,13 @@ export async function viewerCountSync() {
await Promise.all(updates || []);
});
setViewerSnapshot(totalViewers, streams.length);
setViewerSnapshot({
hottestStreamViewers,
streamsWithViewers,
totalViewers,
trackedStreams: streams.length,
viewersByRegion,
});
});
} catch (error) {
console.error('Error syncing viewer counts:', error);

View File

@@ -1,5 +1,5 @@
import { getRedisConnection, prisma } from '@hctv/db';
import { trackWebJob } from '../metrics';
import { setCacheEntryCount, trackWebJob } from '../metrics';
export default async function writeSessions() {
return trackWebJob('write_sessions', async () => {
@@ -13,6 +13,7 @@ export default async function writeSessions() {
multi.set(`sessions:${sessionId}`, '');
}
await multi.exec();
setCacheEntryCount('sessions', sessionIds.length);
console.log('Sessions written to Redis');
});

View File

@@ -31,18 +31,58 @@ function createMetricsStore() {
registers: [register],
});
const streamPathsSeen = new Gauge({
name: 'hctv_web_stream_paths_seen',
help: 'Current number of ready MediaMTX paths seen during the latest sync.',
labelNames: ['region'],
registers: [register],
});
const liveStreamTransitions = new Counter({
name: 'hctv_web_live_stream_transitions_total',
help: 'Live stream state transitions observed by the web app.',
labelNames: ['transition', 'region'],
registers: [register],
});
const streamSyncScrapes = new Counter({
name: 'hctv_web_stream_sync_scrapes_total',
help: 'MediaMTX region scrapes attempted by stream sync.',
labelNames: ['region', 'status'],
registers: [register],
});
const activeViewers = new Gauge({
name: 'hctv_web_active_viewers',
help: 'Current number of active viewers across all live streams.',
registers: [register],
});
const activeViewersByRegion = new Gauge({
name: 'hctv_web_active_viewers_by_region',
help: 'Current number of active viewers grouped by stream region.',
labelNames: ['region'],
registers: [register],
});
const viewerCountTrackedStreams = new Gauge({
name: 'hctv_web_viewer_count_tracked_streams',
help: 'Number of live streams included in the latest viewer sync.',
registers: [register],
});
const streamsWithViewers = new Gauge({
name: 'hctv_web_streams_with_viewers',
help: 'Current number of live streams with at least one viewer.',
registers: [register],
});
const hottestStreamViewers = new Gauge({
name: 'hctv_web_hottest_stream_viewers',
help: 'Current viewer count of the most watched live stream.',
registers: [register],
});
const thumbnailJobsEnqueued = new Counter({
name: 'hctv_web_thumbnail_jobs_enqueued_total',
help: 'Total thumbnail refresh jobs enqueued by region.',
@@ -50,6 +90,33 @@ function createMetricsStore() {
registers: [register],
});
const thumbnailRefreshTargets = new Gauge({
name: 'hctv_web_thumbnail_refresh_targets',
help: 'Number of live streams targeted in the latest thumbnail refresh run.',
registers: [register],
});
const notificationsEnqueued = new Counter({
name: 'hctv_web_notifications_enqueued_total',
help: 'Notification jobs enqueued when streams go live.',
labelNames: ['target'],
registers: [register],
});
const cacheEntries = new Gauge({
name: 'hctv_web_cache_entries',
help: 'Current number of records mirrored into Redis by cache-sync jobs.',
labelNames: ['cache'],
registers: [register],
});
const platformInventory = new Gauge({
name: 'hctv_web_platform_inventory',
help: 'High-level counts of important platform records.',
labelNames: ['entity'],
registers: [register],
});
const mediamtxAuthRequests = new Counter({
name: 'hctv_web_mediamtx_auth_requests_total',
help: 'Total MediaMTX auth decisions handled by the web app.',
@@ -68,11 +135,21 @@ function createMetricsStore() {
return {
register,
activeViewers,
activeViewersByRegion,
backgroundJobDuration,
backgroundJobRuns,
cacheEntries,
hottestStreamViewers,
liveStreams,
liveStreamTransitions,
mediamtxAuthDuration,
mediamtxAuthRequests,
notificationsEnqueued,
platformInventory,
streamPathsSeen,
streamsWithViewers,
streamSyncScrapes,
thumbnailRefreshTargets,
thumbnailJobsEnqueued,
viewerCountTrackedStreams,
};
@@ -109,9 +186,38 @@ export function setLiveStreamsByRegion(streamsByRegion: Record<string, number>):
}
}
export function setViewerSnapshot(totalViewers: number, trackedStreams: number): void {
metrics.activeViewers.set(totalViewers);
metrics.viewerCountTrackedStreams.set(trackedStreams);
/**
 * Replaces the stream-paths-seen gauge with a fresh per-region snapshot.
 *
 * The gauge is reset first, so regions missing from `pathsByRegion` no longer
 * carry a value afterwards.
 *
 * @param pathsByRegion - Map of region name to path count.
 */
export function setStreamPathsByRegion(pathsByRegion: Record<string, number>): void {
  const gauge = metrics.streamPathsSeen;
  gauge.reset();
  Object.entries(pathsByRegion).forEach(([region, pathCount]) => {
    gauge.set({ region }, pathCount);
  });
}
/**
 * Increments the live-stream-transitions counter by one.
 *
 * @param transition - Direction of the state change, written to the `transition` label.
 * @param region - Value written to the `region` label.
 */
export function recordLiveStreamTransition(transition: 'online' | 'offline', region: string): void {
  const transitionLabels = { transition, region };
  metrics.liveStreamTransitions.inc(transitionLabels);
}
/**
 * Increments the stream-sync-scrapes counter by one.
 *
 * @param region - Value written to the `region` label.
 * @param status - Scrape result, written to the `status` label.
 */
export function recordStreamSyncScrape(region: string, status: 'success' | 'error'): void {
  const scrapeLabels = { region, status };
  metrics.streamSyncScrapes.inc(scrapeLabels);
}
/**
 * Publishes one viewer-count snapshot to the viewer gauges.
 *
 * The four scalar gauges are overwritten; the per-region gauge is reset and
 * then repopulated, so regions missing from `viewersByRegion` no longer carry
 * a value afterwards.
 *
 * @param snapshot - Totals and per-region viewer counts to expose.
 */
export function setViewerSnapshot(snapshot: {
  totalViewers: number;
  trackedStreams: number;
  viewersByRegion: Record<string, number>;
  streamsWithViewers: number;
  hottestStreamViewers: number;
}): void {
  const {
    totalViewers,
    trackedStreams,
    streamsWithViewers,
    hottestStreamViewers,
    viewersByRegion,
  } = snapshot;

  metrics.activeViewers.set(totalViewers);
  metrics.viewerCountTrackedStreams.set(trackedStreams);
  metrics.streamsWithViewers.set(streamsWithViewers);
  metrics.hottestStreamViewers.set(hottestStreamViewers);

  const regionGauge = metrics.activeViewersByRegion;
  regionGauge.reset();
  Object.entries(viewersByRegion).forEach(([region, viewerCount]) => {
    regionGauge.set({ region }, viewerCount);
  });
}
export function recordThumbnailJobsEnqueued(jobsByRegion: Record<string, number>): void {
@@ -122,6 +228,28 @@ export function recordThumbnailJobsEnqueued(jobsByRegion: Record<string, number>
}
}
/**
 * Overwrites the thumbnail-refresh-targets gauge.
 *
 * @param count - New gauge value.
 */
export function setThumbnailRefreshTargets(count: number): void {
  const gauge = metrics.thumbnailRefreshTargets;
  gauge.set(count);
}
/**
 * Adds `count` to the notifications-enqueued counter for a target.
 *
 * Nothing is recorded unless `count` is strictly greater than zero.
 *
 * @param target - Notification destination, written to the `target` label.
 * @param count - Number of notification jobs to add to the counter.
 */
export function recordNotificationsEnqueued(target: 'channel' | 'dm', count: number): void {
  // Negated `>` (rather than `<=`) so that NaN is skipped as well.
  if (!(count > 0)) {
    return;
  }

  const targetLabels = { target };
  metrics.notificationsEnqueued.inc(targetLabels, count);
}
/**
 * Overwrites the cache-entries gauge for one named cache.
 *
 * @param cache - Cache identifier, written to the `cache` label.
 * @param count - New gauge value for that cache.
 */
export function setCacheEntryCount(cache: 'sessions' | 'stream_keys', count: number): void {
  const cacheLabels = { cache };
  metrics.cacheEntries.set(cacheLabels, count);
}
/**
 * Replaces the platform-inventory gauge with a fresh snapshot.
 *
 * The gauge is reset first, so entities missing from `snapshot` no longer
 * carry a value afterwards.
 *
 * @param snapshot - Map of entity name (written to the `entity` label) to its count.
 */
export function setPlatformInventory(snapshot: Record<string, number>): void {
  const gauge = metrics.platformInventory;
  gauge.reset();
  Object.entries(snapshot).forEach(([entity, total]) => {
    gauge.set({ entity }, total);
  });
}
export function recordMediamtxAuth(
action: string,
protocol: string,

File diff suppressed because it is too large Load Diff