Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(WIP): Add option to record audio in mka. #2220

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ class AudioLevelReader(

if (!silence) stats.nonSilence(AudioLevelHeaderExtension.getVad(ext))
if (silence && forwardedSilencePackets > forwardedSilencePacketsLimit) {
packetInfo.shouldDiscard = true
// packetInfo.shouldDiscard = true
stats.discardedSilence()
} else if (this@AudioLevelReader.forceMute) {
packetInfo.shouldDiscard = true
// packetInfo.shouldDiscard = true
stats.discardedForceMute()
} else {
forwardedSilencePackets = if (silence) forwardedSilencePackets + 1 else 0
audioLevelListener?.let { listener ->
if (listener.onLevelReceived(audioRtpPacket.ssrc, (127 - level).toPositiveLong())) {
packetInfo.shouldDiscard = true
// packetInfo.shouldDiscard = true
stats.discardedRanking()
}
}
Expand Down
4 changes: 4 additions & 0 deletions jvb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
</profiles>

<dependencies>
<dependency>
<groupId>com.github.kokorin</groupId>
<artifactId>jebml</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
Expand Down
19 changes: 18 additions & 1 deletion jvb/src/main/java/org/jitsi/videobridge/Conference.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.jitsi.utils.logging2.*;
import org.jitsi.utils.queue.*;
import org.jitsi.videobridge.colibri2.*;
import org.jitsi.videobridge.export.*;
import org.jitsi.videobridge.message.*;
import org.jitsi.videobridge.metrics.*;
import org.jitsi.videobridge.relay.*;
Expand All @@ -40,7 +41,6 @@
import org.json.simple.*;
import org.jxmpp.jid.*;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
Expand Down Expand Up @@ -180,6 +180,9 @@ public long getLocalVideoSsrc()
@Nullable
private final String meetingId;

@NotNull
private final Exporter exporter = new Exporter();

/**
* A regex pattern to trim UUIDs to just their first 8 hex characters.
*/
Expand Down Expand Up @@ -599,6 +602,7 @@ void expire()
logger.debug(() -> "Expiring endpoints.");
getEndpoints().forEach(AbstractEndpoint::expire);
getRelays().forEach(Relay::expire);
exporter.stop();
speechActivity.expire();

updateStatisticsOnExpire();
Expand Down Expand Up @@ -1118,6 +1122,14 @@ private void sendOut(PacketInfo packetInfo)
prevHandler = relay;
}
}
if (exporter.wants(packetInfo))
{
if (prevHandler != null)
{
prevHandler.send(packetInfo.clone());
}
prevHandler = exporter;
}

if (prevHandler != null)
{
Expand All @@ -1130,6 +1142,11 @@ private void sendOut(PacketInfo packetInfo)
}
}

public void setExports(List<Export> exports)
{
exporter.setExports(exports);
}

public boolean hasRelays()
{
return !relaysById.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.jitsi.videobridge;

import org.jetbrains.annotations.*;
import org.jitsi.nlj.*;

public interface PotentialPacketHandler
Expand All @@ -26,11 +27,11 @@ public interface PotentialPacketHandler
* @param packet the RTP/RTCP packet
* @return true if this handler wants the given packet, false otherwise
*/
boolean wants(PacketInfo packet);
boolean wants(@NotNull PacketInfo packet);

/**
* Send the given RTP/RTCP 'packet' (which came from 'source')
* @param packet the RTP/RTCP packet
*/
void send(PacketInfo packet);
void send(@NotNull PacketInfo packet);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Colibri2ConferenceHandler(
for (e in conferenceModifyIQ.endpoints) {
responseBuilder.addEndpoint(handleColibri2Endpoint(e, ignoreUnknownEndpoints))
}
conferenceModifyIQ.exports?.let { conference.setExports(it.getExports()) }
for (r in conferenceModifyIQ.relays) {
if (!RelayConfig.config.enabled) {
throw IqProcessingException(Condition.feature_not_implemented, "Octo is disabled in configuration.")
Expand Down
49 changes: 49 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.jitsi.videobridge.export

import org.jitsi.nlj.PacketInfo
import org.jitsi.nlj.rtp.AudioRtpPacket
import org.jitsi.utils.logging2.createLogger
import org.jitsi.videobridge.PotentialPacketHandler
import org.jitsi.videobridge.colibri2.FeatureNotImplementedException
import org.jitsi.videobridge.exporter.MediaJsonEncoder
import org.jitsi.videobridge.recorder.MediaJsonRecorder
import org.jitsi.videobridge.util.ByteBufferPool
import org.jitsi.xmpp.extensions.colibri2.Export

class Exporter : PotentialPacketHandler {
val logger = createLogger()
var started = false

private val encoder = MediaJsonEncoder { recorder.handleEvent(it) }
private val recorder = MediaJsonRecorder()

fun setExports(exports: List<Export>) {
when {
started && exports.isNotEmpty() -> throw FeatureNotImplementedException("Changing exports once enabled.")
exports.isEmpty() -> stop()
exports.size > 1 -> throw FeatureNotImplementedException("Multiple exports")
exports[0].video -> throw FeatureNotImplementedException("Video")
else -> start(exports[0])

}
}

override fun wants(packet: PacketInfo): Boolean = started && packet.packet is AudioRtpPacket

override fun send(packet: PacketInfo) {
if (started) {
encoder.encode(packet.packetAs(), packet.endpointId!!)
}
ByteBufferPool.returnBuffer(packet.packet.buffer)
}

fun stop() {
started = false
logger.info("Stopping.")
}

fun start(export: Export) {
logger.info("Starting with url=${export.url}")
started = true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.jitsi.videobridge.exporter

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.jitsi.mediajson.Event
import org.jitsi.mediajson.Media
import org.jitsi.mediajson.MediaEvent
import org.jitsi.mediajson.MediaFormat
import org.jitsi.mediajson.Start
import org.jitsi.mediajson.StartEvent
import org.jitsi.nlj.rtp.AudioRtpPacket
import org.jitsi.rtp.rtp.RtpPacket
import org.jitsi.utils.logging2.createLogger
import java.time.Clock
import java.time.Duration
import kotlin.io.encoding.Base64
import kotlin.io.encoding.ExperimentalEncodingApi

class MediaJsonEncoder(
val handleEvent: (Event) -> Unit
) {
val logger = createLogger()
val ref = Clock.systemUTC().instant()

private data class SsrcState(
val ssrc: Long,
val initialRtpTs: Long,
// Offset of this SSRC since the start time in RTP units
val offset: Long
)

private val ssrcsStarted = mutableSetOf<SsrcState>()
var seq = 0
val om = jacksonObjectMapper()

fun encode(p: AudioRtpPacket, epId: String) = synchronized(ssrcsStarted) {
if (ssrcsStarted.none { it.ssrc == p.ssrc } ) {
val offset: Long = ((Duration.between(ref, Clock.systemUTC().instant())).toNanos() * 48.0e-6).toLong()
val state = SsrcState(p.ssrc, p.timestamp, offset)
ssrcsStarted.add(state)
val e = StartEvent(
(++seq).toString(),
Start(
"$epId-${p.ssrc}",
MediaFormat(
"opus",
48000,
2
)
)
)
handleEvent(e)
}

seq++
handleEvent(p.encodeAsJson(epId))
}

@OptIn(ExperimentalEncodingApi::class)
private fun RtpPacket.encodeAsJson(epId: String): Event {
val ssrcState = ssrcsStarted.find { it.ssrc == this.ssrc }!!
val elapsedRtpTime = this.timestamp - ssrcState.initialRtpTs
val ts = elapsedRtpTime + ssrcState.offset
val p = MediaEvent(
seq.toString(),
media = Media(
"$epId-${this.ssrc}",
this.sequenceNumber.toString(),
ts.toString(),
Base64.encode(this.buffer, this.payloadOffset, this.payloadOffset + this.payloadLength)
)
)
return p
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.jitsi.videobridge.recorder

import org.bouncycastle.util.encoders.Base64
import org.jitsi.mediajson.Event
import org.jitsi.mediajson.MediaEvent
import org.jitsi.mediajson.StartEvent
import org.jitsi.utils.logging2.createLogger

class MediaJsonRecorder {
val mkaRecorder = MkaRecorder()
val logger = createLogger()

fun handleEvent(event: Event) {
when(event) {
is StartEvent -> {
logger.info("Start new stream: $event")
mkaRecorder.startTrack(event.start.tag)
}
is MediaEvent -> {
mkaRecorder.addFrame(
event.media.tag,
event.media.timestamp.toLong(),
Base64.decode(event.media.payload)
)
}
}
}

fun stop() {
logger.info("Stopping.")
mkaRecorder.close()
}
}
112 changes: 112 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/recorder/MkaRecorder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package org.jitsi.videobridge.recorder

//import org.ebml.EBMLReader
//import org.ebml.Element
//import org.ebml.MasterElement
//import org.ebml.io.DataSource
//import org.ebml.io.FileDataSource
import org.ebml.io.FileDataWriter
import org.ebml.matroska.MatroskaFileFrame
import org.ebml.matroska.MatroskaFileTrack
import org.ebml.matroska.MatroskaFileTrack.TrackType
import org.ebml.matroska.MatroskaFileWriter
//import org.jitsi.rtp.extensions.get3Bytes
import org.jitsi.utils.logging2.createLogger
import java.io.File
import java.nio.ByteBuffer

class MkaRecorder {
private val logger = createLogger()
private val destination: File = File.createTempFile("test", ".mkv").apply {
logger.warn("Writing to $this")
}

private val ioDW = FileDataWriter(destination.path)
private val writer: MatroskaFileWriter = MatroskaFileWriter(ioDW)
private val tracks = mutableMapOf<String, MatroskaFileTrack>()

private var f = 0
fun startTrack(name: String) {
val track = MatroskaFileTrack().apply {
trackNo = tracks.size + 1
trackType = TrackType.AUDIO
codecID = "A_OPUS"
defaultDuration = 20000000
audio = MatroskaFileTrack.MatroskaAudioTrack().apply {
channels = 2
samplingFrequency = 48000F
}
}
tracks[name] = track
writer.addTrack(track)
}

fun addFrame(trackName: String, timecode: Long, payload: ByteArray) {
val track = tracks[trackName] ?: throw Exception("Track not started")
val frame = MatroskaFileFrame()
frame.data = ByteBuffer.wrap(payload)
frame.trackNo = track.trackNo
//frame.timecode = timecode / 48
logger.warn("Add to $trackName timecode=${timecode/48}")
writer.addFrame(frame)
}

fun close() {
writer.close()
ioDW.close()
//testDocTraversal()
//destination.delete()
}
//
// fun testDocTraversal() {
// val ioDS = FileDataSource(destination.path)
// val reader = EBMLReader(ioDS)
// var level0 = reader.readNextElement()
// while (level0 != null) {
// traverseElement(level0, ioDS, reader, 0)
// level0.skipData(ioDS)
// level0 = reader.readNextElement()
// }
// }
//
// var x = 0
// private fun traverseElement(levelN: Element?, ioDS: DataSource, reader: EBMLReader, level: Int) {
// if (levelN == null) {
// return
// }
//
// logger.info("Found element: ${".".repeat(level*2)} ${levelN.elementType.name}")
//// if (levelN.elementType.name == "TimecodeScale") {
//// levelN.readData(ioDS)
//// println("oops: "+ levelN.data.get3Bytes())
//// return
//// }
// if (levelN.elementType.name == "Timecode") {
// if (x == 0) { x++} else {
// levelN.readData(ioDS)
// if (levelN.data.capacity() == 1)
// println("oops: " + levelN.data.get().toInt())
// else if (levelN.data.capacity() == 2)
// println("oops: " + levelN.data.getShort().toInt())
// else if (levelN.data.capacity() == 3)
// println("oops: " + levelN.data.get3Bytes().toInt())
// else if (levelN.data.capacity() == 4)
// println("oops: " + levelN.data.getInt())
// return
// }
// }
//
// val elemLevel = levelN.elementType.level
// if (elemLevel != -1) {
// check(level.toLong() == elemLevel.toLong())
// }
// if (levelN is MasterElement) {
// var levelNPlusOne = levelN.readNextChild(reader)
// while (levelNPlusOne != null) {
// traverseElement(levelNPlusOne, ioDS, reader, level + 1)
// levelNPlusOne.skipData(ioDS)
// levelNPlusOne = levelN.readNextChild(reader)
// }
// }
// }
}
Loading
Loading