Skip to content

Commit

Permalink
fix: role change not publishing/unpublishing when happened during off…
Browse files Browse the repository at this point in the history
…line -> online
  • Loading branch information
raviteja83 authored Jul 9, 2024
1 parent cb98240 commit 2a68b89
Showing 1 changed file with 38 additions and 108 deletions.
146 changes: 38 additions & 108 deletions packages/hms-video-store/src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter2 as EventEmitter } from 'eventemitter2';
import { JoinParameters } from './models/JoinParameters';
import { TransportFailureCategory } from './models/TransportFailureCategory';
import { TransportState } from './models/TransportState';
Expand Down Expand Up @@ -49,17 +50,9 @@ import {
} from '../utils/constants';
import HMSLogger from '../utils/logger';
import { getNetworkInfo } from '../utils/network-info';
import { PromiseCallbacks } from '../utils/promise';

const TAG = '[HMSTransport]:';

// @DISCUSS: action and extra are not used at all.
interface CallbackTriple {
promise: PromiseCallbacks<boolean>;
action: HMSAction;
extra: any;
}

interface NegotiateJoinParams {
name: string;
data: string;
Expand All @@ -80,6 +73,7 @@ export default class HMSTransport {
private subscribeStatsAnalytics?: SubscribeStatsAnalytics;
private maxSubscribeBitrate = 0;
private connectivityListener?: HMSDiagnosticsConnectivityListener;
private eventEmitter = new EventEmitter();
joinRetryCount = 0;

constructor(
Expand Down Expand Up @@ -115,13 +109,6 @@ export default class HMSTransport {
this.eventBus.localVideoEnabled.subscribe(({ track }) => this.trackUpdate(track));
}

/**
* Map of callbacks used to wait for an event to fire.
* Used here for:
* 1. publish/unpublish waits for [IPublishConnectionObserver.onRenegotiationNeeded] to complete
*/
private readonly callbacks = new Map<string, CallbackTriple>();

private signalObserver: ISignalEventsObserver = {
onOffer: async (jsep: RTCSessionDescriptionInit) => {
try {
Expand Down Expand Up @@ -348,13 +335,8 @@ export default class HMSTransport {
}

if (newState === 'connected') {
const callback = this.callbacks.get(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID);
this.callbacks.delete(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID);

this.connectivityListener?.onICESuccess(false);
if (callback) {
callback.promise.resolve(true);
}
this.eventEmitter.emit(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID, true);
}
},

Expand Down Expand Up @@ -619,46 +601,31 @@ export default class HMSTransport {
`${track}`,
);
this.trackStates.set(track.publishedTrackId, new TrackState(track));
const p = new Promise<boolean>((resolve, reject) => {
this.callbacks.set(RENEGOTIATION_CALLBACK_ID, {
promise: { resolve, reject },
action: HMSAction.PUBLISH,
extra: {},
});
}).catch(err => {
if (err instanceof HMSException && err.code === 1003) {
throw err;
}
});
const stream = track.stream as HMSLocalStream;
stream.setConnection(this.publishConnection!);
const simulcastLayers = this.store.getSimulcastLayers(track.source!);
stream.addTransceiver(track, simulcastLayers);
HMSLogger.time(`publish-${track.trackId}-${track.type}`);
try {
await p;
HMSLogger.timeEnd(`publish-${track.trackId}-${track.type}`);
// add track to store after publish
this.store.addTrack(track);

await stream
.setMaxBitrateAndFramerate(track)
.then(() => {
HMSLogger.d(
TAG,
`Setting maxBitrate=${track.settings.maxBitrate} kpbs${
track instanceof HMSLocalVideoTrack ? ` and maxFramerate=${track.settings.maxFramerate}` : ''
} for ${track.source} ${track.type} ${track.trackId}`,
);
})
.catch(error => HMSLogger.w(TAG, 'Failed setting maxBitrate and maxFramerate', error));

track.isPublished = true;
// @ts-ignore
await this.eventEmitter.waitFor(RENEGOTIATION_CALLBACK_ID, { handleError: true });
HMSLogger.timeEnd(`publish-${track.trackId}-${track.type}`);
// add track to store after publish
this.store.addTrack(track);

await stream
.setMaxBitrateAndFramerate(track)
.then(() => {
HMSLogger.d(
TAG,
`Setting maxBitrate=${track.settings.maxBitrate} kpbs${
track instanceof HMSLocalVideoTrack ? ` and maxFramerate=${track.settings.maxFramerate}` : ''
} for ${track.source} ${track.type} ${track.trackId}`,
);
})
.catch(error => HMSLogger.w(TAG, 'Failed setting maxBitrate and maxFramerate', error));

HMSLogger.d(TAG, `✅ publishTrack: trackId=${track.trackId}`, `${track}`, this.callbacks);
} catch (err) {
HMSLogger.e(TAG, 'Failed publishing track, will be retried', err);
}
track.isPublished = true;
HMSLogger.d(TAG, `✅ publishTrack: trackId=${track.trackId}`, `${track}`);
}

private async unpublishTrack(track: HMSLocalTrack): Promise<void> {
Expand All @@ -677,28 +644,14 @@ export default class HMSTransport {
this.trackStates.delete(originalTrackState.track_id);
}
}
const p = new Promise<boolean>((resolve, reject) => {
this.callbacks.set(RENEGOTIATION_CALLBACK_ID, {
promise: { resolve, reject },
action: HMSAction.UNPUBLISH,
extra: {},
});
}).catch(err => {
if (err instanceof HMSException && err.code === 1003) {
// do nothing, it will be resolved when network connected
}
});
const stream = track.stream as HMSLocalStream;
stream.removeSender(track);
try {
await p;
await track.cleanup();
// remove track from store on unpublish
this.store.removeTrack(track);
HMSLogger.d(TAG, `✅ unpublishTrack: trackId=${track.trackId}`, this.callbacks);
} catch (ex) {
HMSLogger.e(TAG, `Failed unpublishingTrack: trackId=${track.trackId}, will be retried`, ex);
}
// @ts-ignore
await this.eventEmitter.waitFor(RENEGOTIATION_CALLBACK_ID, { handleError: true });
await track.cleanup();
// remove track from store on unpublish
this.store.removeTrack(track);
HMSLogger.d(TAG, `✅ unpublishTrack: trackId=${track.trackId}`);
}

private waitForLocalRoleAvailability() {
Expand Down Expand Up @@ -880,10 +833,6 @@ export default class HMSTransport {

private async performPublishRenegotiation(constraints?: RTCOfferOptions) {
HMSLogger.d(TAG, `⏳ [role=PUBLISH] onRenegotiationNeeded START`, this.trackStates);
const callback = this.callbacks.get(RENEGOTIATION_CALLBACK_ID);
if (!callback) {
return;
}

if (!this.publishConnection) {
HMSLogger.e(TAG, 'Publish peer connection not found, cannot renegotiate');
Expand All @@ -895,10 +844,9 @@ export default class HMSTransport {
await this.publishConnection.setLocalDescription(offer);
HMSLogger.time(`renegotiation-offer-exchange`);
const answer = await this.signal.offer(offer, this.trackStates);
this.callbacks.delete(RENEGOTIATION_CALLBACK_ID);
HMSLogger.timeEnd(`renegotiation-offer-exchange`);
await this.publishConnection.setRemoteDescription(answer);
callback.promise.resolve(true);
this.eventEmitter.emit(RENEGOTIATION_CALLBACK_ID);
HMSLogger.d(TAG, `[role=PUBLISH] onRenegotiationNeeded DONE ✅`);
} catch (err) {
let ex: HMSException;
Expand All @@ -907,7 +855,7 @@ export default class HMSTransport {
} else {
ex = ErrorFactory.GenericErrors.Unknown(HMSAction.PUBLISH, (err as Error).message);
}
callback!.promise.reject(ex);
this.eventEmitter.emit(RENEGOTIATION_CALLBACK_ID, ex);
HMSLogger.d(TAG, `[role=PUBLISH] onRenegotiationNeeded FAILED ❌`);
}
}
Expand Down Expand Up @@ -1096,36 +1044,23 @@ export default class HMSTransport {
* Do iceRestart only if not connected
*/
if (this.publishConnection) {
const p = new Promise<boolean>((resolve, reject) => {
this.callbacks.set(RENEGOTIATION_CALLBACK_ID, {
promise: { resolve, reject },
action: HMSAction.RESTART_ICE,
extra: {},
});
});
await this.performPublishRenegotiation({ iceRestart: this.publishConnection.connectionState !== 'connected' });
await p;
this.performPublishRenegotiation({ iceRestart: this.publishConnection.connectionState !== 'connected' });
// @ts-ignore
await this.eventEmitter.waitFor(RENEGOTIATION_CALLBACK_ID, { handleError: true });
}

return true;
};

private retrySubscribeIceFailedTask = async () => {
if (this.subscribeConnection && this.subscribeConnection.connectionState !== 'connected') {
const p = new Promise<boolean>((resolve, reject) => {
// Use subscribe constant string
this.callbacks.set(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID, {
promise: { resolve, reject },
action: HMSAction.RESTART_ICE,
extra: {},
});
});
const p = this.eventEmitter.waitFor(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID);

const timeout = new Promise(resolve => {
setTimeout(resolve, SUBSCRIBE_TIMEOUT, false);
const timeout = new Promise<boolean>(resolve => {
setTimeout(() => resolve(false), SUBSCRIBE_TIMEOUT);
});

return Promise.race([p, timeout]) as Promise<boolean>;
return Promise.race([p.then(value => value[0] as boolean), timeout]);
}

return true;
Expand Down Expand Up @@ -1155,12 +1090,7 @@ export default class HMSTransport {

private handleSubscribeConnectionConnected() {
this.subscribeConnection?.handleSelectedIceCandidatePairs();
const callback = this.callbacks.get(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID);
this.callbacks.delete(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID);

if (callback) {
callback.promise.resolve(true);
}
this.eventEmitter.emit(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID, true);
}

private setTransportStateForConnect() {
Expand Down

0 comments on commit 2a68b89

Please sign in to comment.