Skip to content

Commit

Permalink
feat: add missing data in stream detector
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeny-dementev committed Dec 24, 2024
1 parent be1df2e commit 7416eea
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/WebRTCIssueDetector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from './detectors';
import { CompositeRTCStatsParser, RTCStatsParser } from './parser';
import createLogger from './utils/logger';
import { MissingStreamDataDetector } from './detectors/MissingStreamDataDetector';

class WebRTCIssueDetector {
readonly eventEmitter: WebRTCIssueEmitter;
Expand Down Expand Up @@ -67,6 +68,7 @@ class WebRTCIssueDetector {
new AvailableOutgoingBitrateIssueDetector(),
new UnknownVideoDecoderImplementationDetector(),
new FrozenVideoTrackDetector(),
new MissingStreamDataDetector(),
];

this.networkScoresCalculator = params.networkScoresCalculator ?? new DefaultNetworkScoresCalculator();
Expand Down
152 changes: 152 additions & 0 deletions src/detectors/MissingStreamDataDetector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import {
IssueDetectorResult,
IssueReason,
IssueType,
ParsedInboundAudioStreamStats,
ParsedInboundVideoStreamStats,
WebRTCStatsParsed,
} from '../types';
import BaseIssueDetector from './BaseIssueDetector';

interface MissingVideoStreamDetectorParams {
timeoutMs?: number;
}

export class MissingStreamDataDetector extends BaseIssueDetector {
readonly #lastMarkedAt = new Map<string, number>();
readonly #timeoutMs: number;

constructor(params: MissingVideoStreamDetectorParams = {}) {
super();
this.#timeoutMs = params.timeoutMs ?? 10_000;
}

performDetection(data: WebRTCStatsParsed): IssueDetectorResult {
const { connection: { id: connectionId } } = data;
const issues = this.processData(data);
this.setLastProcessedStats(connectionId, data);
return issues;
}

private processData(data: WebRTCStatsParsed): IssueDetectorResult {
const { connection: { id: connectionId } } = data;
const previousStats = this.getLastProcessedStats(connectionId);
const issues: IssueDetectorResult = [];

if (!previousStats) {
return issues;
}

const { video: { inbound: newVideoInbound } } = data;
const { video: { inbound: prevVideoInbound } } = previousStats;
const { audio: { inbound: newAudioInbound } } = data;
const { audio: { inbound: prevAudioInbound } } = previousStats;

const mapVideoStatsByTrackId = (items: ParsedInboundVideoStreamStats[]) => new Map<string, ParsedInboundVideoStreamStats>(
items.map((item) => [item.track.trackIdentifier, item] as const),
);
const mapAudioStatsByTrackId = (items: ParsedInboundAudioStreamStats[]) => new Map<string, ParsedInboundAudioStreamStats>(
items.map((item) => [item.track.trackIdentifier, item] as const),
);

const newVideoInboundByTrackId = mapVideoStatsByTrackId(newVideoInbound);
const prevVideoInboundByTrackId = mapVideoStatsByTrackId(prevVideoInbound);
const newAudioInboundByTrackId = mapAudioStatsByTrackId(newAudioInbound);
const prevAudioInboundByTrackId = mapAudioStatsByTrackId(prevAudioInbound);
const unvisitedTrackIds = new Set(this.#lastMarkedAt.keys());

Array.from(newVideoInboundByTrackId.entries()).forEach(([trackId, newInboundItem]) => {
unvisitedTrackIds.delete(trackId);

const prevInboundItem = prevVideoInboundByTrackId.get(trackId);
if (!prevInboundItem) {
return;
}

const deltaFramesReceived = newInboundItem.framesReceived - prevInboundItem.framesReceived;

if (deltaFramesReceived === 0 && !newInboundItem.track.detached && !newInboundItem.track.ended) {
const hasIssue = this.markIssue(trackId);

if (!hasIssue) {
return;
}

const statsSample = {
framesReceived: newInboundItem.framesReceived,
framesDropped: newInboundItem.framesDropped,
trackDetached: newInboundItem.track.detached,
trackEnded: newInboundItem.track.ended,
};

issues.push({
type: IssueType.Stream,
reason: IssueReason.MissingVideoStreamData,
statsSample,
});
} else {
this.removeMarkIssue(trackId);
}
});

Array.from(newAudioInboundByTrackId.entries()).forEach(([trackId, newInboundItem]) => {
unvisitedTrackIds.delete(trackId);

const prevInboundItem = prevAudioInboundByTrackId.get(trackId);
if (!prevInboundItem) {
return;
}

const deltaFramesReceived = newInboundItem.bytesReceived - prevInboundItem.bytesReceived;

if (deltaFramesReceived === 0 && !newInboundItem.track.detached && !newInboundItem.track.ended) {
const hasIssue = this.markIssue(trackId);

if (!hasIssue) {
return;
}

const statsSample = {
bytesReceived: newInboundItem.bytesReceived,
packetsDiscarded: newInboundItem.packetsDiscarded,
trackDetached: newInboundItem.track.detached,
trackEnded: newInboundItem.track.ended,
};

issues.push({
type: IssueType.Stream,
reason: IssueReason.MissingAudioStreamData,
statsSample,
});
} else {
this.removeMarkIssue(trackId);
}
});

unvisitedTrackIds.forEach((trackId) => {
const lastMarkedAt = this.#lastMarkedAt.get(trackId);
if (lastMarkedAt && Date.now() - lastMarkedAt > this.#timeoutMs) {
this.removeMarkIssue(trackId);
}
});

return issues;
}

private markIssue(trackId: string): boolean {
const now = Date.now();
const lastMarkedAt = this.#lastMarkedAt.get(trackId);

if (!lastMarkedAt || now - lastMarkedAt > this.#timeoutMs) {
this.#lastMarkedAt.set(trackId, now);
return true;
}

return false;
}

private removeMarkIssue(trackId: string): void {
this.#lastMarkedAt.delete(trackId);
}
}

2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ export enum IssueReason {
LowInboundMOS = 'low-inbound-mean-opinion-score',
LowOutboundMOS = 'low-outbound-mean-opinion-score',
FrozenVideoTrack = 'frozen-video-track',
MissingVideoStreamData = 'missing-video-stream-data',
MissingAudioStreamData = 'missing-audio-stream-data',
}

export type IssuePayload = {
Expand Down

0 comments on commit 7416eea

Please sign in to comment.