You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1937 lines
46 KiB
1937 lines
46 KiB
const EventEmitter = require('events').EventEmitter;
|
|
const mediasoup = require('mediasoup');
|
|
const protoo = require('protoo-server');
|
|
// const rtp = require('rtp.js');
|
|
const throttle = require('@sitespeed.io/throttle');
|
|
const Logger = require('./Logger');
|
|
const utils = require('./utils');
|
|
const config = require('../config');
|
|
const Bot = require('./Bot');
|
|
|
|
const logger = new Logger('Room');
|
|
|
|
/**
|
|
* Room class.
|
|
*
|
|
* This is not a "mediasoup Room" by itself, by a custom class that holds
|
|
* a protoo Room (for signaling with WebSocket clients) and a mediasoup Router
|
|
* (for sending and receiving media to/from those WebSocket peers).
|
|
*/
|
|
class Room extends EventEmitter
|
|
{
|
|
/**
|
|
* Factory function that creates and returns Room instance.
|
|
*
|
|
* @async
|
|
*
|
|
* @param {mediasoup.Worker} mediasoupWorker - The mediasoup Worker in which a new
|
|
* mediasoup Router must be created.
|
|
* @param {String} roomId - Id of the Room instance.
|
|
*/
|
|
static async create({ mediasoupWorker, roomId, consumerReplicas })
|
|
{
|
|
logger.info('create() [roomId:%s]', roomId);
|
|
|
|
// Create a protoo Room instance.
|
|
const protooRoom = new protoo.Room();
|
|
|
|
// Router media codecs.
|
|
const { mediaCodecs } = config.mediasoup.routerOptions;
|
|
|
|
// Create a mediasoup Router.
|
|
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
|
|
|
|
// Create a mediasoup AudioLevelObserver.
|
|
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(
|
|
{
|
|
maxEntries : 1,
|
|
threshold : -80,
|
|
interval : 800
|
|
});
|
|
|
|
// Create a mediasoup ActiveSpeakerObserver.
|
|
const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver();
|
|
|
|
const bot = await Bot.create({ mediasoupRouter });
|
|
|
|
return new Room(
|
|
{
|
|
roomId,
|
|
protooRoom,
|
|
webRtcServer : mediasoupWorker.appData.webRtcServer,
|
|
mediasoupRouter,
|
|
audioLevelObserver,
|
|
activeSpeakerObserver,
|
|
consumerReplicas,
|
|
bot
|
|
});
|
|
}
|
|
|
|
constructor(
|
|
{
|
|
roomId,
|
|
protooRoom,
|
|
webRtcServer,
|
|
mediasoupRouter,
|
|
audioLevelObserver,
|
|
activeSpeakerObserver,
|
|
consumerReplicas,
|
|
bot
|
|
})
|
|
{
|
|
super();
|
|
|
|
this.setMaxListeners(Infinity);
|
|
|
|
// Room id.
|
|
// @type {String}
|
|
this._roomId = roomId;
|
|
|
|
// Closed flag.
|
|
// @type {Boolean}
|
|
this._closed = false;
|
|
|
|
// protoo Room instance.
|
|
// @type {protoo.Room}
|
|
this._protooRoom = protooRoom;
|
|
|
|
// Map of broadcasters indexed by id. Each Object has:
|
|
// - {String} id
|
|
// - {Object} data
|
|
// - {String} displayName
|
|
// - {Object} device
|
|
// - {RTCRtpCapabilities} rtpCapabilities
|
|
// - {Map<String, mediasoup.Transport>} transports
|
|
// - {Map<String, mediasoup.Producer>} producers
|
|
// - {Map<String, mediasoup.Consumers>} consumers
|
|
// - {Map<String, mediasoup.DataProducer>} dataProducers
|
|
// - {Map<String, mediasoup.DataConsumers>} dataConsumers
|
|
// @type {Map<String, Object>}
|
|
this._broadcasters = new Map();
|
|
|
|
// mediasoup WebRtcServer instance.
|
|
// @type {mediasoup.WebRtcServer}
|
|
this._webRtcServer = webRtcServer;
|
|
|
|
// mediasoup Router instance.
|
|
// @type {mediasoup.Router}
|
|
this._mediasoupRouter = mediasoupRouter;
|
|
|
|
// mediasoup AudioLevelObserver.
|
|
// @type {mediasoup.AudioLevelObserver}
|
|
this._audioLevelObserver = audioLevelObserver;
|
|
|
|
// mediasoup ActiveSpeakerObserver.
|
|
// @type {mediasoup.ActiveSpeakerObserver}
|
|
this._activeSpeakerObserver = activeSpeakerObserver;
|
|
|
|
// DataChannel bot.
|
|
// @type {Bot}
|
|
this._bot = bot;
|
|
|
|
// Consumer replicas.
|
|
// @type {Number}
|
|
this._consumerReplicas = consumerReplicas || 0;
|
|
|
|
// Network throttled.
|
|
// @type {Boolean}
|
|
this._networkThrottled = false;
|
|
|
|
// Handle audioLevelObserver.
|
|
this._handleAudioLevelObserver();
|
|
|
|
// Handle activeSpeakerObserver.
|
|
this._handleActiveSpeakerObserver();
|
|
|
|
// For debugging.
|
|
global.audioLevelObserver = this._audioLevelObserver;
|
|
global.activeSpeakerObserver = this._activeSpeakerObserver;
|
|
global.bot = this._bot;
|
|
}
|
|
|
|
/**
|
|
* Closes the Room instance by closing the protoo Room and the mediasoup Router.
|
|
*/
|
|
close()
|
|
{
|
|
logger.debug('close()');
|
|
|
|
this._closed = true;
|
|
|
|
// Close the protoo Room.
|
|
this._protooRoom.close();
|
|
|
|
// Close the mediasoup Router.
|
|
this._mediasoupRouter.close();
|
|
|
|
// Close the Bot.
|
|
this._bot.close();
|
|
|
|
// Emit 'close' event.
|
|
this.emit('close');
|
|
|
|
// Stop network throttling.
|
|
if (this._networkThrottled)
|
|
{
|
|
logger.debug('close() | stopping network throttle');
|
|
|
|
throttle.stop({})
|
|
.catch((error) =>
|
|
{
|
|
logger.error(`close() | failed to stop network throttle:${error}`);
|
|
});
|
|
}
|
|
}
|
|
|
|
logStatus()
|
|
{
|
|
logger.info(
|
|
'logStatus() [roomId:%s, protoo Peers:%s]',
|
|
this._roomId,
|
|
this._protooRoom.peers.length);
|
|
}
|
|
|
|
/**
|
|
* Called from server.js upon a protoo WebSocket connection request from a
|
|
* browser.
|
|
*
|
|
* @param {String} peerId - The id of the protoo peer to be created.
|
|
* @param {Boolean} consume - Whether this peer wants to consume from others.
|
|
* @param {protoo.WebSocketTransport} protooWebSocketTransport - The associated
|
|
* protoo WebSocket transport.
|
|
*/
|
|
handleProtooConnection({ peerId, consume, protooWebSocketTransport })
|
|
{
|
|
const existingPeer = this._protooRoom.getPeer(peerId);
|
|
|
|
if (existingPeer)
|
|
{
|
|
logger.warn(
|
|
'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
|
|
peerId);
|
|
|
|
existingPeer.close();
|
|
}
|
|
|
|
let peer;
|
|
|
|
// Create a new protoo Peer with the given peerId.
|
|
try
|
|
{
|
|
peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.error('protooRoom.createPeer() failed:%o', error);
|
|
}
|
|
|
|
// Notify mediasoup version to the peer.
|
|
peer.notify('mediasoup-version', { version: mediasoup.version })
|
|
.catch(() => {});
|
|
|
|
// Use the peer.data object to store mediasoup related objects.
|
|
|
|
// Not joined after a custom protoo 'join' request is later received.
|
|
peer.data.consume = consume;
|
|
peer.data.joined = false;
|
|
peer.data.displayName = undefined;
|
|
peer.data.device = undefined;
|
|
peer.data.rtpCapabilities = undefined;
|
|
peer.data.sctpCapabilities = undefined;
|
|
|
|
// Have mediasoup related maps ready even before the Peer joins since we
|
|
// allow creating Transports before joining.
|
|
peer.data.transports = new Map();
|
|
peer.data.producers = new Map();
|
|
peer.data.consumers = new Map();
|
|
peer.data.dataProducers = new Map();
|
|
peer.data.dataConsumers = new Map();
|
|
|
|
peer.on('request', (request, accept, reject) =>
|
|
{
|
|
logger.debug(
|
|
'protoo Peer "request" event [method:%s, peerId:%s]',
|
|
request.method, peer.id);
|
|
|
|
this._handleProtooRequest(peer, request, accept, reject)
|
|
.catch((error) =>
|
|
{
|
|
logger.error('request failed:%o', error);
|
|
|
|
reject(error);
|
|
});
|
|
});
|
|
|
|
peer.on('close', () =>
|
|
{
|
|
if (this._closed)
|
|
return;
|
|
|
|
logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);
|
|
|
|
// If the Peer was joined, notify all Peers.
|
|
if (peer.data.joined)
|
|
{
|
|
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
|
|
{
|
|
otherPeer.notify('peerClosed', { peerId: peer.id })
|
|
.catch(() => {});
|
|
}
|
|
}
|
|
|
|
// Iterate and close all mediasoup Transport associated to this Peer, so all
|
|
// its Producers and Consumers will also be closed.
|
|
for (const transport of peer.data.transports.values())
|
|
{
|
|
transport.close();
|
|
}
|
|
|
|
// If this is the latest Peer in the room, close the room.
|
|
if (this._protooRoom.peers.length === 0)
|
|
{
|
|
logger.info(
|
|
'last Peer in the room left, closing the room [roomId:%s]',
|
|
this._roomId);
|
|
|
|
this.close();
|
|
}
|
|
});
|
|
}
|
|
|
|
getRouterRtpCapabilities()
|
|
{
|
|
return this._mediasoupRouter.rtpCapabilities;
|
|
}
|
|
|
|
/**
|
|
* Create a Broadcaster. This is for HTTP API requests (see server.js).
|
|
*
|
|
* @async
|
|
*
|
|
* @type {String} id - Broadcaster id.
|
|
* @type {String} displayName - Descriptive name.
|
|
* @type {Object} [device] - Additional info with name, version and flags fields.
|
|
* @type {RTCRtpCapabilities} [rtpCapabilities] - Device RTP capabilities.
|
|
*/
|
|
async createBroadcaster({ id, displayName, device = {}, rtpCapabilities })
|
|
{
|
|
if (typeof id !== 'string' || !id)
|
|
throw new TypeError('missing body.id');
|
|
else if (typeof displayName !== 'string' || !displayName)
|
|
throw new TypeError('missing body.displayName');
|
|
else if (typeof device.name !== 'string' || !device.name)
|
|
throw new TypeError('missing body.device.name');
|
|
else if (rtpCapabilities && typeof rtpCapabilities !== 'object')
|
|
throw new TypeError('wrong body.rtpCapabilities');
|
|
|
|
if (this._broadcasters.has(id))
|
|
throw new Error(`broadcaster with id "${id}" already exists`);
|
|
|
|
const broadcaster =
|
|
{
|
|
id,
|
|
data :
|
|
{
|
|
displayName,
|
|
device :
|
|
{
|
|
flag : 'broadcaster',
|
|
name : device.name || 'Unknown device',
|
|
version : device.version
|
|
},
|
|
rtpCapabilities,
|
|
transports : new Map(),
|
|
producers : new Map(),
|
|
consumers : new Map(),
|
|
dataProducers : new Map(),
|
|
dataConsumers : new Map()
|
|
}
|
|
};
|
|
|
|
// Store the Broadcaster into the map.
|
|
this._broadcasters.set(broadcaster.id, broadcaster);
|
|
|
|
// Notify the new Broadcaster to all Peers.
|
|
for (const otherPeer of this._getJoinedPeers())
|
|
{
|
|
otherPeer.notify(
|
|
'newPeer',
|
|
{
|
|
id : broadcaster.id,
|
|
displayName : broadcaster.data.displayName,
|
|
device : broadcaster.data.device
|
|
})
|
|
.catch(() => {});
|
|
}
|
|
|
|
// Reply with the list of Peers and their Producers.
|
|
const peerInfos = [];
|
|
const joinedPeers = this._getJoinedPeers();
|
|
|
|
// Just fill the list of Peers if the Broadcaster provided its rtpCapabilities.
|
|
if (rtpCapabilities)
|
|
{
|
|
for (const joinedPeer of joinedPeers)
|
|
{
|
|
const peerInfo =
|
|
{
|
|
id : joinedPeer.id,
|
|
displayName : joinedPeer.data.displayName,
|
|
device : joinedPeer.data.device,
|
|
producers : []
|
|
};
|
|
|
|
for (const producer of joinedPeer.data.producers.values())
|
|
{
|
|
// Ignore Producers that the Broadcaster cannot consume.
|
|
if (
|
|
!this._mediasoupRouter.canConsume(
|
|
{
|
|
producerId : producer.id,
|
|
rtpCapabilities
|
|
})
|
|
)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
peerInfo.producers.push(
|
|
{
|
|
id : producer.id,
|
|
kind : producer.kind
|
|
});
|
|
}
|
|
|
|
peerInfos.push(peerInfo);
|
|
}
|
|
}
|
|
|
|
return { peers: peerInfos };
|
|
}
|
|
|
|
/**
|
|
* Delete a Broadcaster.
|
|
*
|
|
* @type {String} broadcasterId
|
|
*/
|
|
deleteBroadcaster({ broadcasterId })
|
|
{
|
|
const broadcaster = this._broadcasters.get(broadcasterId);
|
|
|
|
if (!broadcaster)
|
|
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
|
|
|
|
for (const transport of broadcaster.data.transports.values())
|
|
{
|
|
transport.close();
|
|
}
|
|
|
|
this._broadcasters.delete(broadcasterId);
|
|
|
|
for (const peer of this._getJoinedPeers())
|
|
{
|
|
peer.notify('peerClosed', { peerId: broadcasterId })
|
|
.catch(() => {});
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create a mediasoup Transport associated to a Broadcaster. It can be a
|
|
* PlainTransport or a WebRtcTransport.
|
|
*
|
|
* @async
|
|
*
|
|
* @type {String} broadcasterId
|
|
* @type {String} type - Can be 'plain' (PlainTransport) or 'webrtc'
|
|
* (WebRtcTransport).
|
|
* @type {Boolean} [rtcpMux=false] - Just for PlainTransport, use RTCP mux.
|
|
* @type {Boolean} [comedia=true] - Just for PlainTransport, enable remote IP:port
|
|
* autodetection.
|
|
* @type {Object} [sctpCapabilities] - SCTP capabilities
|
|
*/
|
|
async createBroadcasterTransport(
|
|
{
|
|
broadcasterId,
|
|
type,
|
|
rtcpMux = false,
|
|
comedia = true,
|
|
sctpCapabilities
|
|
})
|
|
{
|
|
const broadcaster = this._broadcasters.get(broadcasterId);
|
|
|
|
if (!broadcaster)
|
|
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
|
|
|
|
switch (type)
|
|
{
|
|
case 'webrtc':
|
|
{
|
|
const webRtcTransportOptions =
|
|
{
|
|
...utils.clone(config.mediasoup.webRtcTransportOptions),
|
|
webRtcServer : this._webRtcServer,
|
|
iceConsentTimeout : 20,
|
|
enableSctp : Boolean(sctpCapabilities),
|
|
numSctpStreams : (sctpCapabilities || {}).numStreams
|
|
};
|
|
|
|
const transport =
|
|
await this._mediasoupRouter.createWebRtcTransport(webRtcTransportOptions);
|
|
|
|
// Store it.
|
|
broadcaster.data.transports.set(transport.id, transport);
|
|
|
|
return {
|
|
id : transport.id,
|
|
iceParameters : transport.iceParameters,
|
|
iceCandidates : transport.iceCandidates,
|
|
dtlsParameters : transport.dtlsParameters,
|
|
sctpParameters : transport.sctpParameters
|
|
};
|
|
}
|
|
|
|
case 'plain':
|
|
{
|
|
const plainTransportOptions =
|
|
{
|
|
...utils.clone(config.mediasoup.plainTransportOptions),
|
|
rtcpMux : rtcpMux,
|
|
comedia : comedia
|
|
};
|
|
|
|
const transport = await this._mediasoupRouter.createPlainTransport(
|
|
plainTransportOptions);
|
|
|
|
// Store it.
|
|
broadcaster.data.transports.set(transport.id, transport);
|
|
|
|
return {
|
|
id : transport.id,
|
|
ip : transport.tuple.localIp,
|
|
port : transport.tuple.localPort,
|
|
rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined
|
|
};
|
|
}
|
|
|
|
default:
|
|
{
|
|
throw new TypeError('invalid type');
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Connect a Broadcaster mediasoup WebRtcTransport.
|
|
*
|
|
* @async
|
|
*
|
|
* @type {String} broadcasterId
|
|
* @type {String} transportId
|
|
* @type {RTCDtlsParameters} dtlsParameters - Remote DTLS parameters.
|
|
*/
|
|
async connectBroadcasterTransport(
|
|
{
|
|
broadcasterId,
|
|
transportId,
|
|
dtlsParameters
|
|
}
|
|
)
|
|
{
|
|
const broadcaster = this._broadcasters.get(broadcasterId);
|
|
|
|
if (!broadcaster)
|
|
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
|
|
|
|
const transport = broadcaster.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" does not exist`);
|
|
|
|
if (transport.constructor.name !== 'WebRtcTransport')
|
|
{
|
|
throw new Error(
|
|
`transport with id "${transportId}" is not a WebRtcTransport`);
|
|
}
|
|
|
|
await transport.connect({ dtlsParameters });
|
|
}
|
|
|
|
/**
|
|
* Create a mediasoup Producer associated to a Broadcaster.
|
|
*
|
|
* @async
|
|
*
|
|
* @type {String} broadcasterId
|
|
* @type {String} transportId
|
|
* @type {String} kind - 'audio' or 'video' kind for the Producer.
|
|
* @type {RTCRtpParameters} rtpParameters - RTP parameters for the Producer.
|
|
*/
|
|
async createBroadcasterProducer(
|
|
{
|
|
broadcasterId,
|
|
transportId,
|
|
kind,
|
|
rtpParameters
|
|
}
|
|
)
|
|
{
|
|
const broadcaster = this._broadcasters.get(broadcasterId);
|
|
|
|
if (!broadcaster)
|
|
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
|
|
|
|
const transport = broadcaster.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" does not exist`);
|
|
|
|
const producer =
|
|
await transport.produce({ kind, rtpParameters });
|
|
|
|
// Store it.
|
|
broadcaster.data.producers.set(producer.id, producer);
|
|
|
|
// Set Producer events.
|
|
// producer.on('score', (score) =>
|
|
// {
|
|
// logger.debug(
|
|
// 'broadcaster producer "score" event [producerId:%s, score:%o]',
|
|
// producer.id, score);
|
|
// });
|
|
|
|
producer.on('videoorientationchange', (videoOrientation) =>
|
|
{
|
|
logger.debug(
|
|
'broadcaster producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
|
|
producer.id, videoOrientation);
|
|
});
|
|
|
|
// Optimization: Create a server-side Consumer for each Peer.
|
|
for (const peer of this._getJoinedPeers())
|
|
{
|
|
this._createConsumer(
|
|
{
|
|
consumerPeer : peer,
|
|
producerPeer : broadcaster,
|
|
producer
|
|
});
|
|
}
|
|
|
|
// Add into the AudioLevelObserver and ActiveSpeakerObserver.
|
|
if (producer.kind === 'audio')
|
|
{
|
|
this._audioLevelObserver.addProducer({ producerId: producer.id })
|
|
.catch(() => {});
|
|
|
|
this._activeSpeakerObserver.addProducer({ producerId: producer.id })
|
|
.catch(() => {});
|
|
}
|
|
|
|
return { id: producer.id };
|
|
}
|
|
|
|
/**
|
|
* Create a mediasoup Consumer associated to a Broadcaster.
|
|
*
|
|
* @async
|
|
*
|
|
* @type {String} broadcasterId
|
|
* @type {String} transportId
|
|
* @type {String} producerId
|
|
*/
|
|
async createBroadcasterConsumer(
|
|
{
|
|
broadcasterId,
|
|
transportId,
|
|
producerId
|
|
}
|
|
)
|
|
{
|
|
const broadcaster = this._broadcasters.get(broadcasterId);
|
|
|
|
if (!broadcaster)
|
|
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
|
|
|
|
if (!broadcaster.data.rtpCapabilities)
|
|
throw new Error('broadcaster does not have rtpCapabilities');
|
|
|
|
const transport = broadcaster.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" does not exist`);
|
|
|
|
const consumer = await transport.consume(
|
|
{
|
|
producerId,
|
|
rtpCapabilities : broadcaster.data.rtpCapabilities
|
|
});
|
|
|
|
// Store it.
|
|
broadcaster.data.consumers.set(consumer.id, consumer);
|
|
|
|
// Set Consumer events.
|
|
consumer.on('transportclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
broadcaster.data.consumers.delete(consumer.id);
|
|
});
|
|
|
|
consumer.on('producerclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
broadcaster.data.consumers.delete(consumer.id);
|
|
});
|
|
|
|
return {
|
|
id : consumer.id,
|
|
producerId,
|
|
kind : consumer.kind,
|
|
rtpParameters : consumer.rtpParameters,
|
|
type : consumer.type
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Create a mediasoup DataConsumer associated to a Broadcaster.
|
|
*
|
|
* @async
|
|
*
|
|
* @type {String} broadcasterId
|
|
* @type {String} transportId
|
|
* @type {String} dataProducerId
|
|
*/
|
|
async createBroadcasterDataConsumer(
|
|
{
|
|
broadcasterId,
|
|
transportId,
|
|
dataProducerId
|
|
}
|
|
)
|
|
{
|
|
const broadcaster = this._broadcasters.get(broadcasterId);
|
|
|
|
if (!broadcaster)
|
|
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
|
|
|
|
if (!broadcaster.data.rtpCapabilities)
|
|
throw new Error('broadcaster does not have rtpCapabilities');
|
|
|
|
const transport = broadcaster.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" does not exist`);
|
|
|
|
const dataConsumer = await transport.consumeData(
|
|
{
|
|
dataProducerId
|
|
});
|
|
|
|
// Store it.
|
|
broadcaster.data.dataConsumers.set(dataConsumer.id, dataConsumer);
|
|
|
|
// Set Consumer events.
|
|
dataConsumer.on('transportclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
broadcaster.data.dataConsumers.delete(dataConsumer.id);
|
|
});
|
|
|
|
dataConsumer.on('dataproducerclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
broadcaster.data.dataConsumers.delete(dataConsumer.id);
|
|
});
|
|
|
|
return {
|
|
id : dataConsumer.id,
|
|
streamId : dataConsumer.sctpStreamParameters.streamId
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Create a mediasoup DataProducer associated to a Broadcaster.
|
|
*
|
|
* @async
|
|
*
|
|
* @type {String} broadcasterId
|
|
* @type {String} transportId
|
|
*/
|
|
async createBroadcasterDataProducer(
|
|
{
|
|
broadcasterId,
|
|
transportId,
|
|
label,
|
|
protocol,
|
|
sctpStreamParameters,
|
|
appData
|
|
}
|
|
)
|
|
{
|
|
const broadcaster = this._broadcasters.get(broadcasterId);
|
|
|
|
if (!broadcaster)
|
|
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
|
|
|
|
// if (!broadcaster.data.sctpCapabilities)
|
|
// throw new Error('broadcaster does not have sctpCapabilities');
|
|
|
|
const transport = broadcaster.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" does not exist`);
|
|
|
|
const dataProducer = await transport.produceData(
|
|
{
|
|
sctpStreamParameters,
|
|
label,
|
|
protocol,
|
|
appData
|
|
});
|
|
|
|
// Store it.
|
|
broadcaster.data.dataProducers.set(dataProducer.id, dataProducer);
|
|
|
|
// Set Consumer events.
|
|
dataProducer.on('transportclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
broadcaster.data.dataProducers.delete(dataProducer.id);
|
|
});
|
|
|
|
// // Optimization: Create a server-side Consumer for each Peer.
|
|
// for (const peer of this._getJoinedPeers())
|
|
// {
|
|
// this._createDataConsumer(
|
|
// {
|
|
// dataConsumerPeer : peer,
|
|
// dataProducerPeer : broadcaster,
|
|
// dataProducer: dataProducer
|
|
// });
|
|
// }
|
|
|
|
return {
|
|
id : dataProducer.id
|
|
};
|
|
}
|
|
|
|
_handleAudioLevelObserver()
|
|
{
|
|
this._audioLevelObserver.on('volumes', (volumes) =>
|
|
{
|
|
const { producer, volume } = volumes[0];
|
|
|
|
logger.debug(
|
|
'audioLevelObserver "volumes" event [producerId:%s, volume:%s]',
|
|
producer.id, volume);
|
|
|
|
// Notify all Peers.
|
|
for (const peer of this._getJoinedPeers())
|
|
{
|
|
peer.notify(
|
|
'activeSpeaker',
|
|
{
|
|
peerId : producer.appData.peerId,
|
|
volume : volume
|
|
})
|
|
.catch(() => {});
|
|
}
|
|
});
|
|
|
|
this._audioLevelObserver.on('silence', () =>
|
|
{
|
|
logger.debug('audioLevelObserver "silence" event');
|
|
|
|
// Notify all Peers.
|
|
for (const peer of this._getJoinedPeers())
|
|
{
|
|
peer.notify('activeSpeaker', { peerId: null })
|
|
.catch(() => {});
|
|
}
|
|
});
|
|
}
|
|
|
|
_handleActiveSpeakerObserver()
|
|
{
|
|
this._activeSpeakerObserver.on('dominantspeaker', (dominantSpeaker) =>
|
|
{
|
|
logger.debug(
|
|
'activeSpeakerObserver "dominantspeaker" event [producerId:%s]',
|
|
dominantSpeaker.producer.id);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handle protoo requests from browsers.
|
|
*
|
|
* @async
|
|
*/
|
|
async _handleProtooRequest(peer, request, accept, reject)
|
|
{
|
|
switch (request.method)
|
|
{
|
|
case 'getRouterRtpCapabilities':
|
|
{
|
|
accept(this._mediasoupRouter.rtpCapabilities);
|
|
|
|
break;
|
|
}
|
|
|
|
case 'join':
|
|
{
|
|
// Ensure the Peer is not already joined.
|
|
if (peer.data.joined)
|
|
throw new Error('Peer already joined');
|
|
|
|
const {
|
|
displayName,
|
|
device,
|
|
rtpCapabilities,
|
|
sctpCapabilities
|
|
} = request.data;
|
|
|
|
// Store client data into the protoo Peer data object.
|
|
peer.data.joined = true;
|
|
peer.data.displayName = displayName;
|
|
peer.data.device = device;
|
|
peer.data.rtpCapabilities = rtpCapabilities;
|
|
peer.data.sctpCapabilities = sctpCapabilities;
|
|
|
|
// Tell the new Peer about already joined Peers.
|
|
// And also create Consumers for existing Producers.
|
|
|
|
const joinedPeers =
|
|
[
|
|
...this._getJoinedPeers(),
|
|
...this._broadcasters.values()
|
|
];
|
|
|
|
// Reply now the request with the list of joined peers (all but the new one).
|
|
const peerInfos = joinedPeers
|
|
.filter((joinedPeer) => joinedPeer.id !== peer.id)
|
|
.map((joinedPeer) => ({
|
|
id : joinedPeer.id,
|
|
displayName : joinedPeer.data.displayName,
|
|
device : joinedPeer.data.device
|
|
}));
|
|
|
|
accept({ peers: peerInfos });
|
|
|
|
// Mark the new Peer as joined.
|
|
peer.data.joined = true;
|
|
|
|
for (const joinedPeer of joinedPeers)
|
|
{
|
|
// Create Consumers for existing Producers.
|
|
for (const producer of joinedPeer.data.producers.values())
|
|
{
|
|
this._createConsumer(
|
|
{
|
|
consumerPeer : peer,
|
|
producerPeer : joinedPeer,
|
|
producer
|
|
});
|
|
}
|
|
|
|
// Create DataConsumers for existing DataProducers.
|
|
for (const dataProducer of joinedPeer.data.dataProducers.values())
|
|
{
|
|
if (dataProducer.label === 'bot')
|
|
continue;
|
|
|
|
this._createDataConsumer(
|
|
{
|
|
dataConsumerPeer : peer,
|
|
dataProducerPeer : joinedPeer,
|
|
dataProducer
|
|
});
|
|
}
|
|
}
|
|
|
|
// Create DataConsumers for bot DataProducer.
|
|
this._createDataConsumer(
|
|
{
|
|
dataConsumerPeer : peer,
|
|
dataProducerPeer : null,
|
|
dataProducer : this._bot.dataProducer
|
|
});
|
|
|
|
// Notify the new Peer to all other Peers.
|
|
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
|
|
{
|
|
otherPeer.notify(
|
|
'newPeer',
|
|
{
|
|
id : peer.id,
|
|
displayName : peer.data.displayName,
|
|
device : peer.data.device
|
|
})
|
|
.catch(() => {});
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case 'createWebRtcTransport':
|
|
{
|
|
// NOTE: Don't require that the Peer is joined here, so the client can
|
|
// initiate mediasoup Transports and be ready when he later joins.
|
|
|
|
const {
|
|
forceTcp,
|
|
producing,
|
|
consuming,
|
|
sctpCapabilities
|
|
} = request.data;
|
|
|
|
const webRtcTransportOptions =
|
|
{
|
|
...utils.clone(config.mediasoup.webRtcTransportOptions),
|
|
webRtcServer : this._webRtcServer,
|
|
iceConsentTimeout : 20,
|
|
enableSctp : Boolean(sctpCapabilities),
|
|
numSctpStreams : (sctpCapabilities || {}).numStreams,
|
|
appData : { producing, consuming }
|
|
};
|
|
|
|
if (forceTcp)
|
|
{
|
|
webRtcTransportOptions.listenInfos = webRtcTransportOptions.listenInfos
|
|
.filter((listenInfo) => listenInfo.protocol === 'tcp');
|
|
|
|
webRtcTransportOptions.enableUdp = false;
|
|
webRtcTransportOptions.enableTcp = true;
|
|
}
|
|
|
|
const transport =
|
|
await this._mediasoupRouter.createWebRtcTransport(webRtcTransportOptions);
|
|
|
|
transport.on('icestatechange', (iceState) =>
|
|
{
|
|
if (iceState === 'disconnected' || iceState === 'closed')
|
|
{
|
|
logger.warn('WebRtcTransport "icestatechange" event [iceState:%s], closing peer', iceState);
|
|
|
|
peer.close();
|
|
}
|
|
});
|
|
|
|
transport.on('sctpstatechange', (sctpState) =>
|
|
{
|
|
logger.debug('WebRtcTransport "sctpstatechange" event [sctpState:%s]', sctpState);
|
|
});
|
|
|
|
transport.on('dtlsstatechange', (dtlsState) =>
|
|
{
|
|
if (dtlsState === 'failed' || dtlsState === 'closed')
|
|
{
|
|
logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s], closing peer', dtlsState);
|
|
|
|
peer.close();
|
|
}
|
|
});
|
|
|
|
// NOTE: For testing.
|
|
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
|
|
await transport.enableTraceEvent([ 'bwe' ]);
|
|
|
|
transport.on('trace', (trace) =>
|
|
{
|
|
logger.debug(
|
|
'transport "trace" event [transportId:%s, trace.type:%s, trace:%o]',
|
|
transport.id, trace.type, trace);
|
|
|
|
if (trace.type === 'bwe' && trace.direction === 'out')
|
|
{
|
|
peer.notify(
|
|
'downlinkBwe',
|
|
{
|
|
desiredBitrate : trace.info.desiredBitrate,
|
|
effectiveDesiredBitrate : trace.info.effectiveDesiredBitrate,
|
|
availableBitrate : trace.info.availableBitrate
|
|
})
|
|
.catch(() => {});
|
|
}
|
|
});
|
|
|
|
// Store the WebRtcTransport into the protoo Peer data Object.
|
|
peer.data.transports.set(transport.id, transport);
|
|
|
|
accept(
|
|
{
|
|
id : transport.id,
|
|
iceParameters : transport.iceParameters,
|
|
iceCandidates : transport.iceCandidates,
|
|
dtlsParameters : transport.dtlsParameters,
|
|
sctpParameters : transport.sctpParameters
|
|
});
|
|
|
|
const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
|
|
|
|
// If set, apply max incoming bitrate limit.
|
|
if (maxIncomingBitrate)
|
|
{
|
|
try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
|
|
catch (error) {}
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case 'connectWebRtcTransport':
|
|
{
|
|
const { transportId, dtlsParameters } = request.data;
|
|
const transport = peer.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" not found`);
|
|
|
|
await transport.connect({ dtlsParameters });
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'restartIce':
|
|
{
|
|
const { transportId } = request.data;
|
|
const transport = peer.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" not found`);
|
|
|
|
const iceParameters = await transport.restartIce();
|
|
|
|
accept(iceParameters);
|
|
|
|
break;
|
|
}
|
|
|
|
case 'produce':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { transportId, kind, rtpParameters } = request.data;
|
|
let { appData } = request.data;
|
|
const transport = peer.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" not found`);
|
|
|
|
// Add peerId into appData to later get the associated Peer during
|
|
// the 'loudest' event of the audioLevelObserver.
|
|
appData = { ...appData, peerId: peer.id };
|
|
|
|
const producer = await transport.produce(
|
|
{
|
|
kind,
|
|
rtpParameters,
|
|
appData
|
|
// keyFrameRequestDelay: 5000
|
|
});
|
|
|
|
// Store the Producer into the protoo Peer data Object.
|
|
peer.data.producers.set(producer.id, producer);
|
|
|
|
// Set Producer events.
|
|
producer.on('score', (score) =>
|
|
{
|
|
// logger.debug(
|
|
// 'producer "score" event [producerId:%s, score:%o]',
|
|
// producer.id, score);
|
|
|
|
peer.notify('producerScore', { producerId: producer.id, score })
|
|
.catch(() => {});
|
|
});
|
|
|
|
producer.on('videoorientationchange', (videoOrientation) =>
|
|
{
|
|
logger.debug(
|
|
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
|
|
producer.id, videoOrientation);
|
|
});
|
|
|
|
// NOTE: For testing.
|
|
// await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
|
|
// await producer.enableTraceEvent([ 'pli', 'fir' ]);
|
|
// await producer.enableTraceEvent([ 'keyframe' ]);
|
|
|
|
producer.on('trace', (trace) =>
|
|
{
|
|
logger.debug(
|
|
'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
|
|
producer.id, trace.type, trace);
|
|
});
|
|
|
|
accept({ id: producer.id });
|
|
|
|
// Optimization: Create a server-side Consumer for each Peer.
|
|
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
|
|
{
|
|
this._createConsumer(
|
|
{
|
|
consumerPeer : otherPeer,
|
|
producerPeer : peer,
|
|
producer
|
|
});
|
|
}
|
|
|
|
/* Test rtpjs lib. */
|
|
|
|
// const directTransport = await this._mediasoupRouter.createDirectTransport();
|
|
|
|
// directTransport.on('rtcp', (buffer) =>
|
|
// {
|
|
// const rtcpPacket =
|
|
// new rtp.packets.CompoundPacket(rtp.utils.nodeBufferToDataView(buffer));
|
|
|
|
// logger.info('RTCP packet');
|
|
// logger.info(rtcpPacket.dump());
|
|
// });
|
|
|
|
// const directConsumer = await directTransport.consume(
|
|
// {
|
|
// producerId : producer.id,
|
|
// rtpCapabilities : this._mediasoupRouter.rtpCapabilities
|
|
// }
|
|
// );
|
|
|
|
// const directProducer = await directTransport.produce(
|
|
// {
|
|
// kind : directConsumer.kind,
|
|
// rtpParameters : directConsumer.rtpParameters
|
|
// });
|
|
|
|
// directConsumer.on('rtp', (buffer) =>
|
|
// {
|
|
// const rtpPacket =
|
|
// new rtp.packets.RtpPacket(rtp.utils.nodeBufferToDataView(buffer));
|
|
|
|
// // logger.info('RTP packet');
|
|
// // logger.info(rtpPacket.dump());
|
|
|
|
// directProducer.send(buffer);
|
|
// });
|
|
|
|
// Add into the AudioLevelObserver and ActiveSpeakerObserver.
|
|
if (producer.kind === 'audio')
|
|
{
|
|
this._audioLevelObserver.addProducer({ producerId: producer.id })
|
|
.catch(() => {});
|
|
|
|
this._activeSpeakerObserver.addProducer({ producerId: producer.id })
|
|
.catch(() => {});
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case 'closeProducer':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { producerId } = request.data;
|
|
const producer = peer.data.producers.get(producerId);
|
|
|
|
if (!producer)
|
|
throw new Error(`producer with id "${producerId}" not found`);
|
|
|
|
producer.close();
|
|
|
|
// Remove from its map.
|
|
peer.data.producers.delete(producer.id);
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'pauseProducer':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { producerId } = request.data;
|
|
const producer = peer.data.producers.get(producerId);
|
|
|
|
if (!producer)
|
|
throw new Error(`producer with id "${producerId}" not found`);
|
|
|
|
await producer.pause();
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'resumeProducer':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { producerId } = request.data;
|
|
const producer = peer.data.producers.get(producerId);
|
|
|
|
if (!producer)
|
|
throw new Error(`producer with id "${producerId}" not found`);
|
|
|
|
await producer.resume();
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'pauseConsumer':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { consumerId } = request.data;
|
|
const consumer = peer.data.consumers.get(consumerId);
|
|
|
|
if (!consumer)
|
|
throw new Error(`consumer with id "${consumerId}" not found`);
|
|
|
|
await consumer.pause();
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'resumeConsumer':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { consumerId } = request.data;
|
|
const consumer = peer.data.consumers.get(consumerId);
|
|
|
|
if (!consumer)
|
|
throw new Error(`consumer with id "${consumerId}" not found`);
|
|
|
|
await consumer.resume();
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'setConsumerPreferredLayers':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { consumerId, spatialLayer, temporalLayer } = request.data;
|
|
const consumer = peer.data.consumers.get(consumerId);
|
|
|
|
if (!consumer)
|
|
throw new Error(`consumer with id "${consumerId}" not found`);
|
|
|
|
await consumer.setPreferredLayers({ spatialLayer, temporalLayer });
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'setConsumerPriority':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { consumerId, priority } = request.data;
|
|
const consumer = peer.data.consumers.get(consumerId);
|
|
|
|
if (!consumer)
|
|
throw new Error(`consumer with id "${consumerId}" not found`);
|
|
|
|
await consumer.setPriority(priority);
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'requestConsumerKeyFrame':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { consumerId } = request.data;
|
|
const consumer = peer.data.consumers.get(consumerId);
|
|
|
|
if (!consumer)
|
|
throw new Error(`consumer with id "${consumerId}" not found`);
|
|
|
|
await consumer.requestKeyFrame();
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'produceData':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const {
|
|
transportId,
|
|
sctpStreamParameters,
|
|
label,
|
|
protocol,
|
|
appData
|
|
} = request.data;
|
|
|
|
const transport = peer.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" not found`);
|
|
|
|
const dataProducer = await transport.produceData(
|
|
{
|
|
sctpStreamParameters,
|
|
label,
|
|
protocol,
|
|
appData
|
|
});
|
|
|
|
// Store the Producer into the protoo Peer data Object.
|
|
peer.data.dataProducers.set(dataProducer.id, dataProducer);
|
|
|
|
accept({ id: dataProducer.id });
|
|
|
|
switch (dataProducer.label)
|
|
{
|
|
case 'chat':
|
|
{
|
|
// Create a server-side DataConsumer for each Peer.
|
|
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
|
|
{
|
|
this._createDataConsumer(
|
|
{
|
|
dataConsumerPeer : otherPeer,
|
|
dataProducerPeer : peer,
|
|
dataProducer
|
|
});
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case 'bot':
|
|
{
|
|
// Pass it to the bot.
|
|
this._bot.handlePeerDataProducer(
|
|
{
|
|
dataProducerId : dataProducer.id,
|
|
peer
|
|
});
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case 'changeDisplayName':
|
|
{
|
|
// Ensure the Peer is joined.
|
|
if (!peer.data.joined)
|
|
throw new Error('Peer not yet joined');
|
|
|
|
const { displayName } = request.data;
|
|
const oldDisplayName = peer.data.displayName;
|
|
|
|
// Store the display name into the custom data Object of the protoo
|
|
// Peer.
|
|
peer.data.displayName = displayName;
|
|
|
|
// Notify other joined Peers.
|
|
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
|
|
{
|
|
otherPeer.notify(
|
|
'peerDisplayNameChanged',
|
|
{
|
|
peerId : peer.id,
|
|
displayName,
|
|
oldDisplayName
|
|
})
|
|
.catch(() => {});
|
|
}
|
|
|
|
accept();
|
|
|
|
break;
|
|
}
|
|
|
|
case 'getTransportStats':
|
|
{
|
|
const { transportId } = request.data;
|
|
const transport = peer.data.transports.get(transportId);
|
|
|
|
if (!transport)
|
|
throw new Error(`transport with id "${transportId}" not found`);
|
|
|
|
const stats = await transport.getStats();
|
|
|
|
accept(stats);
|
|
|
|
break;
|
|
}
|
|
|
|
case 'getProducerStats':
|
|
{
|
|
const { producerId } = request.data;
|
|
const producer = peer.data.producers.get(producerId);
|
|
|
|
if (!producer)
|
|
throw new Error(`producer with id "${producerId}" not found`);
|
|
|
|
const stats = await producer.getStats();
|
|
|
|
accept(stats);
|
|
|
|
break;
|
|
}
|
|
|
|
case 'getConsumerStats':
|
|
{
|
|
const { consumerId } = request.data;
|
|
const consumer = peer.data.consumers.get(consumerId);
|
|
|
|
if (!consumer)
|
|
throw new Error(`consumer with id "${consumerId}" not found`);
|
|
|
|
const stats = await consumer.getStats();
|
|
|
|
accept(stats);
|
|
|
|
break;
|
|
}
|
|
|
|
case 'getDataProducerStats':
|
|
{
|
|
const { dataProducerId } = request.data;
|
|
const dataProducer = peer.data.dataProducers.get(dataProducerId);
|
|
|
|
if (!dataProducer)
|
|
throw new Error(`dataProducer with id "${dataProducerId}" not found`);
|
|
|
|
const stats = await dataProducer.getStats();
|
|
|
|
accept(stats);
|
|
|
|
break;
|
|
}
|
|
|
|
case 'getDataConsumerStats':
|
|
{
|
|
const { dataConsumerId } = request.data;
|
|
const dataConsumer = peer.data.dataConsumers.get(dataConsumerId);
|
|
|
|
if (!dataConsumer)
|
|
throw new Error(`dataConsumer with id "${dataConsumerId}" not found`);
|
|
|
|
const stats = await dataConsumer.getStats();
|
|
|
|
accept(stats);
|
|
|
|
break;
|
|
}
|
|
|
|
case 'applyNetworkThrottle':
|
|
{
|
|
const DefaultUplink = 1000000;
|
|
const DefaultDownlink = 1000000;
|
|
const DefaultRtt = 0;
|
|
const DefaultPacketLoss = 0;
|
|
|
|
const { secret, uplink, downlink, rtt, packetLoss } = request.data;
|
|
|
|
if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
|
|
{
|
|
reject(403, 'operation NOT allowed, modda fuckaa');
|
|
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
this._networkThrottled = true;
|
|
|
|
await throttle.start(
|
|
{
|
|
up : uplink || DefaultUplink,
|
|
down : downlink || DefaultDownlink,
|
|
rtt : rtt || DefaultRtt,
|
|
packetLoss : packetLoss || DefaultPacketLoss
|
|
});
|
|
|
|
logger.warn(
|
|
'network throttle set [uplink:%s, downlink:%s, rtt:%s, packetLoss:%s]',
|
|
uplink || DefaultUplink,
|
|
downlink || DefaultDownlink,
|
|
rtt || DefaultRtt,
|
|
packetLoss || DefaultPacketLoss);
|
|
|
|
accept();
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.error('network throttle apply failed: %o', error);
|
|
|
|
reject(500, error.toString());
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case 'resetNetworkThrottle':
|
|
{
|
|
const { secret } = request.data;
|
|
|
|
if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
|
|
{
|
|
reject(403, 'operation NOT allowed, modda fuckaa');
|
|
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await throttle.stop({});
|
|
|
|
logger.warn('network throttle stopped');
|
|
|
|
accept();
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.error('network throttle stop failed: %o', error);
|
|
|
|
reject(500, error.toString());
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
logger.error('unknown request.method "%s"', request.method);
|
|
|
|
reject(500, `unknown request.method "${request.method}"`);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Helper to get the list of joined protoo peers.
|
|
*/
|
|
_getJoinedPeers({ excludePeer = undefined } = {})
|
|
{
|
|
return this._protooRoom.peers
|
|
.filter((peer) => peer.data.joined && peer !== excludePeer);
|
|
}
|
|
|
|
/**
|
|
* Creates a mediasoup Consumer for the given mediasoup Producer.
|
|
*
|
|
* @async
|
|
*/
|
|
async _createConsumer({ consumerPeer, producerPeer, producer })
|
|
{
|
|
// Optimization:
|
|
// - Create the server-side Consumer in paused mode.
|
|
// - Tell its Peer about it and wait for its response.
|
|
// - Upon receipt of the response, resume the server-side Consumer.
|
|
// - If video, this will mean a single key frame requested by the
|
|
// server-side Consumer (when resuming it).
|
|
// - If audio (or video), it will avoid that RTP packets are received by the
|
|
// remote endpoint *before* the Consumer is locally created in the endpoint
|
|
// (and before the local SDP O/A procedure ends). If that happens (RTP
|
|
// packets are received before the SDP O/A is done) the PeerConnection may
|
|
// fail to associate the RTP stream.
|
|
|
|
// NOTE: Don't create the Consumer if the remote Peer cannot consume it.
|
|
if (
|
|
!consumerPeer.data.rtpCapabilities ||
|
|
!this._mediasoupRouter.canConsume(
|
|
{
|
|
producerId : producer.id,
|
|
rtpCapabilities : consumerPeer.data.rtpCapabilities
|
|
})
|
|
)
|
|
{
|
|
return;
|
|
}
|
|
|
|
// Must take the Transport the remote Peer is using for consuming.
|
|
const transport = Array.from(consumerPeer.data.transports.values())
|
|
.find((t) => t.appData.consuming);
|
|
|
|
// This should not happen.
|
|
if (!transport)
|
|
{
|
|
logger.warn('_createConsumer() | Transport for consuming not found');
|
|
|
|
return;
|
|
}
|
|
|
|
const promises = [];
|
|
|
|
const consumerCount = 1 + this._consumerReplicas;
|
|
|
|
for (let i=0; i<consumerCount; i++)
|
|
{
|
|
promises.push(
|
|
// eslint-disable-next-line no-async-promise-executor
|
|
new Promise(async (resolve) =>
|
|
{
|
|
// Create the Consumer in paused mode.
|
|
let consumer;
|
|
|
|
try
|
|
{
|
|
consumer = await transport.consume(
|
|
{
|
|
producerId : producer.id,
|
|
rtpCapabilities : consumerPeer.data.rtpCapabilities,
|
|
// Enable NACK for OPUS.
|
|
enableRtx : true,
|
|
paused : true,
|
|
ignoreDtx : true
|
|
});
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.warn('_createConsumer() | transport.consume():%o', error);
|
|
|
|
resolve();
|
|
|
|
return;
|
|
}
|
|
|
|
// Store the Consumer into the protoo consumerPeer data Object.
|
|
consumerPeer.data.consumers.set(consumer.id, consumer);
|
|
|
|
// Set Consumer events.
|
|
consumer.on('transportclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
consumerPeer.data.consumers.delete(consumer.id);
|
|
});
|
|
|
|
consumer.on('producerclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
consumerPeer.data.consumers.delete(consumer.id);
|
|
|
|
consumerPeer.notify('consumerClosed', { consumerId: consumer.id })
|
|
.catch(() => {});
|
|
});
|
|
|
|
consumer.on('producerpause', () =>
|
|
{
|
|
consumerPeer.notify('consumerPaused', { consumerId: consumer.id })
|
|
.catch(() => {});
|
|
});
|
|
|
|
consumer.on('producerresume', () =>
|
|
{
|
|
consumerPeer.notify('consumerResumed', { consumerId: consumer.id })
|
|
.catch(() => {});
|
|
});
|
|
|
|
consumer.on('score', (score) =>
|
|
{
|
|
// logger.debug(
|
|
// 'consumer "score" event [consumerId:%s, score:%o]',
|
|
// consumer.id, score);
|
|
|
|
consumerPeer.notify('consumerScore', { consumerId: consumer.id, score })
|
|
.catch(() => {});
|
|
});
|
|
|
|
consumer.on('layerschange', (layers) =>
|
|
{
|
|
consumerPeer.notify(
|
|
'consumerLayersChanged',
|
|
{
|
|
consumerId : consumer.id,
|
|
spatialLayer : layers ? layers.spatialLayer : null,
|
|
temporalLayer : layers ? layers.temporalLayer : null
|
|
})
|
|
.catch(() => {});
|
|
});
|
|
|
|
// NOTE: For testing.
|
|
// await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
|
|
// await consumer.enableTraceEvent([ 'pli', 'fir' ]);
|
|
// await consumer.enableTraceEvent([ 'keyframe' ]);
|
|
|
|
consumer.on('trace', (trace) =>
|
|
{
|
|
logger.debug(
|
|
'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
|
|
consumer.id, trace.type, trace);
|
|
});
|
|
|
|
// Send a protoo request to the remote Peer with Consumer parameters.
|
|
try
|
|
{
|
|
await consumerPeer.request(
|
|
'newConsumer',
|
|
{
|
|
peerId : producerPeer.id,
|
|
producerId : producer.id,
|
|
id : consumer.id,
|
|
kind : consumer.kind,
|
|
rtpParameters : consumer.rtpParameters,
|
|
type : consumer.type,
|
|
appData : producer.appData,
|
|
producerPaused : consumer.producerPaused
|
|
});
|
|
|
|
// Now that we got the positive response from the remote endpoint, resume
|
|
// the Consumer so the remote endpoint will receive the a first RTP packet
|
|
// of this new stream once its PeerConnection is already ready to process
|
|
// and associate it.
|
|
await consumer.resume();
|
|
|
|
consumerPeer.notify(
|
|
'consumerScore',
|
|
{
|
|
consumerId : consumer.id,
|
|
score : consumer.score
|
|
})
|
|
.catch(() => {});
|
|
|
|
resolve();
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.warn('_createConsumer() | failed:%o', error);
|
|
|
|
resolve();
|
|
}
|
|
})
|
|
);
|
|
}
|
|
|
|
try
|
|
{
|
|
await Promise.all(promises);
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.warn('_createConsumer() | failed:%o', error);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Creates a mediasoup DataConsumer for the given mediasoup DataProducer.
|
|
*
|
|
* @async
|
|
*/
|
|
async _createDataConsumer(
|
|
{
|
|
dataConsumerPeer,
|
|
dataProducerPeer = null, // This is null for the bot DataProducer.
|
|
dataProducer
|
|
})
|
|
{
|
|
// NOTE: Don't create the DataConsumer if the remote Peer cannot consume it.
|
|
if (!dataConsumerPeer.data.sctpCapabilities)
|
|
return;
|
|
|
|
// Must take the Transport the remote Peer is using for consuming.
|
|
const transport = Array.from(dataConsumerPeer.data.transports.values())
|
|
.find((t) => t.appData.consuming);
|
|
|
|
// This should not happen.
|
|
if (!transport)
|
|
{
|
|
logger.warn('_createDataConsumer() | Transport for consuming not found');
|
|
|
|
return;
|
|
}
|
|
|
|
// Create the DataConsumer.
|
|
let dataConsumer;
|
|
|
|
try
|
|
{
|
|
dataConsumer = await transport.consumeData(
|
|
{
|
|
dataProducerId : dataProducer.id
|
|
});
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.warn('_createDataConsumer() | transport.consumeData():%o', error);
|
|
|
|
return;
|
|
}
|
|
|
|
// Store the DataConsumer into the protoo dataConsumerPeer data Object.
|
|
dataConsumerPeer.data.dataConsumers.set(dataConsumer.id, dataConsumer);
|
|
|
|
// Set DataConsumer events.
|
|
dataConsumer.on('transportclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
|
|
});
|
|
|
|
dataConsumer.on('dataproducerclose', () =>
|
|
{
|
|
// Remove from its map.
|
|
dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
|
|
|
|
dataConsumerPeer.notify(
|
|
'dataConsumerClosed', { dataConsumerId: dataConsumer.id })
|
|
.catch(() => {});
|
|
});
|
|
|
|
// Send a protoo request to the remote Peer with Consumer parameters.
|
|
try
|
|
{
|
|
await dataConsumerPeer.request(
|
|
'newDataConsumer',
|
|
{
|
|
// This is null for bot DataProducer.
|
|
peerId : dataProducerPeer ? dataProducerPeer.id : null,
|
|
dataProducerId : dataProducer.id,
|
|
id : dataConsumer.id,
|
|
sctpStreamParameters : dataConsumer.sctpStreamParameters,
|
|
label : dataConsumer.label,
|
|
protocol : dataConsumer.protocol,
|
|
appData : dataProducer.appData
|
|
});
|
|
}
|
|
catch (error)
|
|
{
|
|
logger.warn('_createDataConsumer() | failed:%o', error);
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = Room;
|