diff --git a/apps/chat/src/index.ts b/apps/chat/src/index.ts index cdc9039..0bde4ef 100644 --- a/apps/chat/src/index.ts +++ b/apps/chat/src/index.ts @@ -302,7 +302,10 @@ async function deleteMessageFromHistory(targetUsername: string, msgId: string): const app = new Hono(); const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); if (process.env.NODE_ENV === 'production') { - app.use('/metrics', basicAuth({ username: process.env.METRICS_USER!, password: process.env.METRICS_PASS! })); + app.use( + '/metrics', + basicAuth({ username: process.env.METRICS_USER!, password: process.env.METRICS_PASS! }) + ); } app.get('/', async (c) => { @@ -333,7 +336,7 @@ app.get( const botAuth = c.req.query('botAuth'); if (!token && (!grant || grant === 'null') && !authHeader && !botAuth) { - recordChatConnectionRejected('missing_auth'); + recordChatConnectionRejected(authMethod, 'missing_auth'); ws.close(); return; } @@ -407,14 +410,14 @@ app.get( : null; if (!chatUser && !dbGrant) { - recordChatConnectionRejected('auth_failed'); + recordChatConnectionRejected(authMethod, 'auth_failed'); ws.close(); return; } const { username } = c.req.param(); if (dbGrant && dbGrant.name !== username) { - recordChatConnectionRejected('grant_mismatch'); + recordChatConnectionRejected(authMethod, 'grant_mismatch'); ws.close(); return; } @@ -447,7 +450,7 @@ app.get( }); if (!channel) { - recordChatConnectionRejected(authMethod === 'unknown' ? 'channel_not_found' : authMethod); + recordChatConnectionRejected(authMethod, 'channel_not_found'); ws.close(); return; } @@ -522,7 +525,7 @@ app.get( rateLimitWindowSeconds: moderationSettings.rateLimitWindowSeconds, slowModeSeconds: moderationSettings.slowModeSeconds, }); - + socketState.excludeFromViewerCount = Boolean(dbGrant); socket.send( diff --git a/apps/chat/src/metrics.ts b/apps/chat/src/metrics.ts index b862525..e1208bd 100644 --- a/apps/chat/src/metrics.ts +++ b/apps/chat/src/metrics.ts @@ -31,8 +31,8 @@ function createMetricsStore() { const websocketConnectionAttempts = new Counter({ name: 'hctv_chat_websocket_connection_attempts_total', - help: 'Total websocket connection attempts grouped by outcome and auth method.', - labelNames: ['outcome', 'auth_method'], + help: 'Total websocket connection attempts grouped by outcome, auth method, and rejection reason.', + labelNames: ['outcome', 'auth_method', 'reason'], registers: [register], }); @@ -160,14 +160,18 @@ const metrics = (globalForMetrics.__hctvChatMetrics ??= createMetricsStore()); export const chatMetricsRegistry = metrics.register; export function recordChatConnectionAccepted(channel: string, authMethod: string): void { - metrics.websocketConnectionAttempts.inc({ auth_method: authMethod, outcome: 'accepted' }); + metrics.websocketConnectionAttempts.inc({ + auth_method: authMethod, + outcome: 'accepted', + reason: 'none', + }); metrics.websocketConnections.inc(); metrics.websocketConnectionsByChannel.inc({ channel }); metrics.websocketConnectionsByAuthMethod.inc({ auth_method: authMethod }); } -export function recordChatConnectionRejected(authMethod: string): void { - metrics.websocketConnectionAttempts.inc({ auth_method: authMethod, outcome: 'rejected' }); +export function recordChatConnectionRejected(authMethod: string, reason: string): void { + metrics.websocketConnectionAttempts.inc({ auth_method: authMethod, outcome: 'rejected', reason }); } export function recordChatDisconnect(channel: string, authMethod: string): void { diff --git a/apps/web/src/lib/instrumentation/streamInfo.ts b/apps/web/src/lib/instrumentation/streamInfo.ts index cdfb8c2..b2fc445 100644 --- a/apps/web/src/lib/instrumentation/streamInfo.ts +++ b/apps/web/src/lib/instrumentation/streamInfo.ts @@ -23,6 +23,7 @@ export default async function runner() { await initializeStreamInfo(); await syncStream(); setInterval(syncStream, 5000); + setInterval(refreshPlatformInventory, 60_000); } async function refreshPlatformInventory() { @@ -193,8 +194,6 @@ export async function syncStream() { ); } } - - await refreshPlatformInventory(); }); } catch (error) { console.error('Error syncing stream status:', error); diff --git a/apps/web/src/lib/instrumentation/viewerCountSync.ts b/apps/web/src/lib/instrumentation/viewerCountSync.ts index 5db888a..de0c56e 100644 --- a/apps/web/src/lib/instrumentation/viewerCountSync.ts +++ b/apps/web/src/lib/instrumentation/viewerCountSync.ts @@ -1,6 +1,26 @@ import { getRedisConnection, prisma } from '@hctv/db'; import { setViewerSnapshot, trackWebJob } from '../metrics'; +async function countViewersForChannel(channelName: string): Promise { + const redis = getRedisConnection(); + let cursor = '0'; + let total = 0; + + do { + const [nextCursor, keys] = await redis.scan( + cursor, + 'MATCH', + `viewer:${channelName}:*`, + 'COUNT', + 200 + ); + cursor = nextCursor; + total += keys.length; + } while (cursor !== '0'); + + return total; +} + export async function viewerCountSync() { try { await trackWebJob('viewer_count_sync', async () => { @@ -8,63 +28,70 @@ export async function viewerCountSync() { where: { isLive: true, }, - include: { - channel: true, + select: { + username: true, + streamRegion: true, + channel: { + select: { + name: true, + }, + }, }, }); if (streams.length === 0) { setViewerSnapshot({ - hottestStreamViewers: 0, - streamsWithViewers: 0, totalViewers: 0, trackedStreams: 0, + streamsWithViewers: 0, + hottestStreamViewers: 0, viewersByRegion: {}, }); return; } - const redis = getRedisConnection(); - const multi = redis.multi(); - for (const stream of streams) { - multi.keys(`viewer:${stream.channel.name}:*`); - } - const results = await multi.exec(); + const viewersByRegion: Record = {}; let totalViewers = 0; let streamsWithViewers = 0; let hottestStreamViewers = 0; - const viewersByRegion: Record = {}; - await prisma.$transaction(async (tx) => { - const updates = results?.map((res, index) => { - const count = Array.isArray(res[1]) ? res[1].length : 0; - totalViewers += count; - if (count > 0) { - streamsWithViewers += 1; - } - if (count > hottestStreamViewers) { - hottestStreamViewers = count; - } - const stream = streams[index]; + const streamCounts = await Promise.all( + streams.map(async (stream) => ({ + stream, + count: await countViewersForChannel(stream.channel.name), + })) + ); + + for (const { stream, count } of streamCounts) { + totalViewers += count; + + if (stream.streamRegion) { viewersByRegion[stream.streamRegion] = (viewersByRegion[stream.streamRegion] ?? 0) + count; - return tx.streamInfo.update({ - where: { - username: stream.username, - }, - data: { - viewers: count, - }, - }); + } + + if (count > 0) { + streamsWithViewers += 1; + } + if (count > hottestStreamViewers) { + hottestStreamViewers = count; + } + + await prisma.streamInfo.update({ + where: { + username: stream.username, + }, + data: { + viewers: count, + }, }); - await Promise.all(updates || []); - }); + } setViewerSnapshot({ - hottestStreamViewers, - streamsWithViewers, totalViewers, trackedStreams: streams.length, + streamsWithViewers, + hottestStreamViewers, viewersByRegion, }); }); diff --git a/apps/web/src/lib/instrumentation/writeSessions.ts b/apps/web/src/lib/instrumentation/writeSessions.ts index 585a05e..16a7660 100644 --- a/apps/web/src/lib/instrumentation/writeSessions.ts +++ b/apps/web/src/lib/instrumentation/writeSessions.ts @@ -1,14 +1,29 @@ import { getRedisConnection, prisma } from '@hctv/db'; import { setCacheEntryCount, trackWebJob } from '../metrics'; +async function deleteSessionKeys() { + const redis = getRedisConnection(); + let cursor = '0'; + + do { + const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', 'sessions:*', 'COUNT', 200); + cursor = nextCursor; + + if (keys.length > 0) { + await redis.unlink(...keys); + } + } while (cursor !== '0'); +} + export default async function writeSessions() { return trackWebJob('write_sessions', async () => { const sessions = await prisma.session.findMany(); const sessionIds = sessions.map((session) => session.id); + await deleteSessionKeys(); + const redis = getRedisConnection(); const multi = redis.multi(); - multi.del('sessions:*'); for (const sessionId of sessionIds) { multi.set(`sessions:${sessionId}`, ''); } diff --git a/compose.yml b/compose.yml index e80593a..2c00a4f 100644 --- a/compose.yml +++ b/compose.yml @@ -84,6 +84,8 @@ services: - 'prometheus_data:/prometheus' extra_hosts: - 'host.docker.internal:host-gateway' + ports: + - '9090:9090' grafana: image: 'grafana/grafana:11.6.0' depends_on: @@ -97,6 +99,8 @@ services: - './observability/grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards:ro' - './observability/grafana/dashboards:/var/lib/grafana/dashboards:ro' - 'grafana_data:/var/lib/grafana' + ports: + - '3001:3000' volumes: hctv_pgdata: diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 87f8588..d7816c2 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -3,8 +3,7 @@ services: image: postgres:18-alpine environment: POSTGRES_USER: postgres - # my condolences - POSTGRES_PASSWORD: skbiditoilet + POSTGRES_PASSWORD: ${PG_PASS:-postgres} volumes: - ./psql:/var/lib/postgresql ports: @@ -12,7 +11,7 @@ services: postgres-exporter: image: prometheuscommunity/postgres-exporter:v0.17.1 environment: - DATA_SOURCE_NAME: postgresql://postgres:skbiditoilet@psql:5432/postgres?sslmode=disable + DATA_SOURCE_NAME: postgresql://postgres:${PG_PASS:-postgres}@psql:5432/postgres?sslmode=disable redis: image: redis:7.4-alpine volumes: @@ -62,16 +61,6 @@ services: - grafana_data:/var/lib/grafana ports: - 3001:3000 - # mediamtx2: - # image: bluenviron/mediamtx:latest - # ports: - # - 8990:8890/udp - # - 8991:8891 - # - 9999:9997 - # volumes: - # - ./mediamtx.yml:/mediamtx.yml - # extra_hosts: - # - "host.docker.internal:host-gateway" volumes: prometheus_data: