@@ -11,7 +11,10 @@ namespace Discord.Audio | |||||
public override bool CanSeek => false; | public override bool CanSeek => false; | ||||
public override bool CanWrite => false; | public override bool CanWrite => false; | ||||
public virtual void WriteHeader(ushort seq, uint timestamp, bool missed) { } | |||||
public virtual void WriteHeader(ushort seq, uint timestamp, bool missed) | |||||
{ | |||||
throw new InvalidOperationException("This stream does not accept headers"); | |||||
} | |||||
public override void Write(byte[] buffer, int offset, int count) | public override void Write(byte[] buffer, int offset, int count) | ||||
{ | { | ||||
WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); | WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); | ||||
@@ -142,31 +142,31 @@ namespace Discord.Audio | |||||
public AudioOutStream CreateOpusStream(int bufferMillis) | public AudioOutStream CreateOpusStream(int bufferMillis) | ||||
{ | { | ||||
var outputStream = new OutputStream(ApiClient); | |||||
var sodiumEncrypter = new SodiumEncryptStream( outputStream, this); | |||||
var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); | |||||
return new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); | |||||
var outputStream = new OutputStream(ApiClient); //Ignores header | |||||
var sodiumEncrypter = new SodiumEncryptStream( outputStream, this); //Passes header | |||||
var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes | |||||
return new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); //Generates header | |||||
} | } | ||||
public AudioOutStream CreateDirectOpusStream() | public AudioOutStream CreateDirectOpusStream() | ||||
{ | { | ||||
var outputStream = new OutputStream(ApiClient); | |||||
var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); | |||||
return new RTPWriteStream(sodiumEncrypter, _ssrc); | |||||
var outputStream = new OutputStream(ApiClient); //Ignores header | |||||
var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header | |||||
return new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header (external input), passes | |||||
} | } | ||||
public AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate, int bufferMillis) | public AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate, int bufferMillis) | ||||
{ | { | ||||
var outputStream = new OutputStream(ApiClient); | |||||
var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); | |||||
var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); | |||||
var bufferedStream = new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); | |||||
return new OpusEncodeStream(bufferedStream, bitrate ?? (96 * 1024), application); | |||||
var outputStream = new OutputStream(ApiClient); //Ignores header | |||||
var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header | |||||
var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes | |||||
var bufferedStream = new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); //Ignores header, generates header | |||||
return new OpusEncodeStream(bufferedStream, bitrate ?? (96 * 1024), application); //Generates header | |||||
} | } | ||||
public AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate) | public AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate) | ||||
{ | { | ||||
var outputStream = new OutputStream(ApiClient); | |||||
var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); | |||||
var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); | |||||
return new OpusEncodeStream(rtpWriter, bitrate ?? (96 * 1024), application); | |||||
var outputStream = new OutputStream(ApiClient); //Ignores header | |||||
var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header | |||||
var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes | |||||
return new OpusEncodeStream(rtpWriter, bitrate ?? (96 * 1024), application); //Generates header | |||||
} | } | ||||
internal async Task CreateInputStreamAsync(ulong userId) | internal async Task CreateInputStreamAsync(ulong userId) | ||||
@@ -174,11 +174,11 @@ namespace Discord.Audio | |||||
//Assume Thread-safe | //Assume Thread-safe | ||||
if (!_streams.ContainsKey(userId)) | if (!_streams.ContainsKey(userId)) | ||||
{ | { | ||||
var readerStream = new InputStream(); | |||||
var opusDecoder = new OpusDecodeStream(readerStream); | |||||
var readerStream = new InputStream(); //Consumes header | |||||
var opusDecoder = new OpusDecodeStream(readerStream); //Passes header | |||||
//var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger); | //var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger); | ||||
var rtpReader = new RTPReadStream(opusDecoder); | |||||
var decryptStream = new SodiumDecryptStream(rtpReader, this); | |||||
var rtpReader = new RTPReadStream(opusDecoder); //Generates header | |||||
var decryptStream = new SodiumDecryptStream(rtpReader, this); //No header | |||||
_streams.TryAdd(userId, new StreamPair(readerStream, decryptStream)); | _streams.TryAdd(userId, new StreamPair(readerStream, decryptStream)); | ||||
await _streamCreatedEvent.InvokeAsync(userId, readerStream); | await _streamCreatedEvent.InvokeAsync(userId, readerStream); | ||||
} | } | ||||
@@ -88,11 +88,12 @@ namespace Discord.Audio.Streams | |||||
if (_queuedFrames.TryDequeue(out Frame frame)) | if (_queuedFrames.TryDequeue(out Frame frame)) | ||||
{ | { | ||||
await _client.SetSpeakingAsync(true).ConfigureAwait(false); | await _client.SetSpeakingAsync(true).ConfigureAwait(false); | ||||
_next.WriteHeader(seq++, timestamp, false); | |||||
_next.WriteHeader(seq, timestamp, false); | |||||
await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false); | await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false); | ||||
_bufferPool.Enqueue(frame.Buffer); | _bufferPool.Enqueue(frame.Buffer); | ||||
_queueLock.Release(); | _queueLock.Release(); | ||||
nextTick += _ticksPerFrame; | nextTick += _ticksPerFrame; | ||||
seq++; | |||||
timestamp += OpusEncoder.FrameSamplesPerChannel; | timestamp += OpusEncoder.FrameSamplesPerChannel; | ||||
_silenceFrames = 0; | _silenceFrames = 0; | ||||
#if DEBUG | #if DEBUG | ||||
@@ -105,12 +106,13 @@ namespace Discord.Audio.Streams | |||||
{ | { | ||||
if (_silenceFrames++ < MaxSilenceFrames) | if (_silenceFrames++ < MaxSilenceFrames) | ||||
{ | { | ||||
_next.WriteHeader(seq++, timestamp, false); | |||||
_next.WriteHeader(seq, timestamp, false); | |||||
await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false); | await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false); | ||||
} | } | ||||
else | else | ||||
await _client.SetSpeakingAsync(false).ConfigureAwait(false); | await _client.SetSpeakingAsync(false).ConfigureAwait(false); | ||||
nextTick += _ticksPerFrame; | nextTick += _ticksPerFrame; | ||||
seq++; | |||||
timestamp += OpusEncoder.FrameSamplesPerChannel; | timestamp += OpusEncoder.FrameSamplesPerChannel; | ||||
} | } | ||||
#if DEBUG | #if DEBUG | ||||
@@ -126,6 +128,7 @@ namespace Discord.Audio.Streams | |||||
}); | }); | ||||
} | } | ||||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) { } //Ignore, we use our own timing | |||||
public override async Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancelToken) | public override async Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancelToken) | ||||
{ | { | ||||
if (cancelToken.CanBeCanceled) | if (cancelToken.CanBeCanceled) | ||||
@@ -1,4 +1,4 @@ | |||||
using Discord.Logging; | |||||
/*using Discord.Logging; | |||||
using System; | using System; | ||||
using System.Collections.Concurrent; | using System.Collections.Concurrent; | ||||
using System.Threading; | using System.Threading; | ||||
@@ -243,4 +243,4 @@ namespace Discord.Audio.Streams | |||||
return Task.Delay(0); | return Task.Delay(0); | ||||
} | } | ||||
} | } | ||||
} | |||||
}*/ |
@@ -25,12 +25,13 @@ namespace Discord.Audio.Streams | |||||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) | public override void WriteHeader(ushort seq, uint timestamp, bool missed) | ||||
{ | { | ||||
if (_hasHeader) | if (_hasHeader) | ||||
throw new InvalidOperationException("Header received with no payload"); | |||||
_nextMissed = missed; | |||||
throw new InvalidOperationException("Header received with no payload"); | |||||
_hasHeader = true; | _hasHeader = true; | ||||
_nextMissed = missed; | |||||
_next.WriteHeader(seq, timestamp, missed); | _next.WriteHeader(seq, timestamp, missed); | ||||
} | } | ||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | |||||
{ | { | ||||
if (!_hasHeader) | if (!_hasHeader) | ||||
throw new InvalidOperationException("Received payload without an RTP header"); | throw new InvalidOperationException("Received payload without an RTP header"); | ||||
@@ -39,17 +40,17 @@ namespace Discord.Audio.Streams | |||||
if (!_nextMissed) | if (!_nextMissed) | ||||
{ | { | ||||
count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, false); | count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, false); | ||||
await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); | |||||
await _next.WriteAsync(_buffer, 0, count, cancelToken).ConfigureAwait(false); | |||||
} | } | ||||
else if (count > 0) | else if (count > 0) | ||||
{ | { | ||||
count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, true); | |||||
await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); | |||||
count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, true); | |||||
await _next.WriteAsync(_buffer, 0, count, cancelToken).ConfigureAwait(false); | |||||
} | } | ||||
else | else | ||||
{ | { | ||||
count = _decoder.DecodeFrame(null, 0, 0, _buffer, 0, true); | |||||
await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); | |||||
count = _decoder.DecodeFrame(null, 0, 0, _buffer, 0, true); | |||||
await _next.WriteAsync(_buffer, 0, count, cancelToken).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
@@ -13,6 +13,8 @@ namespace Discord.Audio.Streams | |||||
private readonly OpusEncoder _encoder; | private readonly OpusEncoder _encoder; | ||||
private readonly byte[] _buffer; | private readonly byte[] _buffer; | ||||
private int _partialFramePos; | private int _partialFramePos; | ||||
private ushort _seq; | |||||
private uint _timestamp; | |||||
public OpusEncodeStream(AudioStream next, int bitrate, AudioApplication application) | public OpusEncodeStream(AudioStream next, int bitrate, AudioApplication application) | ||||
{ | { | ||||
@@ -21,7 +23,7 @@ namespace Discord.Audio.Streams | |||||
_buffer = new byte[OpusConverter.FrameBytes]; | _buffer = new byte[OpusConverter.FrameBytes]; | ||||
} | } | ||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | |||||
{ | { | ||||
//Assume threadsafe | //Assume threadsafe | ||||
while (count > 0) | while (count > 0) | ||||
@@ -30,10 +32,13 @@ namespace Discord.Audio.Streams | |||||
{ | { | ||||
//We have enough data and no partial frames. Pass the buffer directly to the encoder | //We have enough data and no partial frames. Pass the buffer directly to the encoder | ||||
int encFrameSize = _encoder.EncodeFrame(buffer, offset, _buffer, 0); | int encFrameSize = _encoder.EncodeFrame(buffer, offset, _buffer, 0); | ||||
await _next.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false); | |||||
_next.WriteHeader(_seq, _timestamp, false); | |||||
await _next.WriteAsync(_buffer, 0, encFrameSize, cancelToken).ConfigureAwait(false); | |||||
offset += OpusConverter.FrameBytes; | offset += OpusConverter.FrameBytes; | ||||
count -= OpusConverter.FrameBytes; | count -= OpusConverter.FrameBytes; | ||||
_seq++; | |||||
_timestamp += OpusConverter.FrameBytes; | |||||
} | } | ||||
else if (_partialFramePos + count >= OpusConverter.FrameBytes) | else if (_partialFramePos + count >= OpusConverter.FrameBytes) | ||||
{ | { | ||||
@@ -41,11 +46,14 @@ namespace Discord.Audio.Streams | |||||
int partialSize = OpusConverter.FrameBytes - _partialFramePos; | int partialSize = OpusConverter.FrameBytes - _partialFramePos; | ||||
Buffer.BlockCopy(buffer, offset, _buffer, _partialFramePos, partialSize); | Buffer.BlockCopy(buffer, offset, _buffer, _partialFramePos, partialSize); | ||||
int encFrameSize = _encoder.EncodeFrame(_buffer, 0, _buffer, 0); | int encFrameSize = _encoder.EncodeFrame(_buffer, 0, _buffer, 0); | ||||
await _next.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false); | |||||
_next.WriteHeader(_seq, _timestamp, false); | |||||
await _next.WriteAsync(_buffer, 0, encFrameSize, cancelToken).ConfigureAwait(false); | |||||
offset += partialSize; | offset += partialSize; | ||||
count -= partialSize; | count -= partialSize; | ||||
_partialFramePos = 0; | _partialFramePos = 0; | ||||
_seq++; | |||||
_timestamp += OpusConverter.FrameBytes; | |||||
} | } | ||||
else | else | ||||
{ | { | ||||
@@ -57,8 +65,8 @@ namespace Discord.Audio.Streams | |||||
} | } | ||||
} | } | ||||
/* | |||||
public override async Task FlushAsync(CancellationToken cancellationToken) | |||||
/* //Opus throws memory errors on bad frames | |||||
public override async Task FlushAsync(CancellationToken cancelToken) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
@@ -67,7 +75,7 @@ namespace Discord.Audio.Streams | |||||
} | } | ||||
catch (Exception) { } //Incomplete frame | catch (Exception) { } //Incomplete frame | ||||
_partialFramePos = 0; | _partialFramePos = 0; | ||||
await base.FlushAsync(cancellationToken).ConfigureAwait(false); | |||||
await base.FlushAsync(cancelToken).ConfigureAwait(false); | |||||
}*/ | }*/ | ||||
public override async Task FlushAsync(CancellationToken cancelToken) | public override async Task FlushAsync(CancellationToken cancelToken) | ||||
@@ -13,7 +13,8 @@ namespace Discord.Audio.Streams | |||||
{ | { | ||||
_client = client; | _client = client; | ||||
} | } | ||||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) { } //Ignore | |||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | ||||
{ | { | ||||
cancelToken.ThrowIfCancellationRequested(); | cancelToken.ThrowIfCancellationRequested(); | ||||
@@ -33,14 +33,14 @@ namespace Discord.Audio.Streams | |||||
{ | { | ||||
if (_hasHeader) | if (_hasHeader) | ||||
throw new InvalidOperationException("Header received with no payload"); | throw new InvalidOperationException("Header received with no payload"); | ||||
_hasHeader = true; | _hasHeader = true; | ||||
_nextSeq = seq; | _nextSeq = seq; | ||||
_nextTimestamp = timestamp; | _nextTimestamp = timestamp; | ||||
} | } | ||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | |||||
{ | { | ||||
cancellationToken.ThrowIfCancellationRequested(); | |||||
cancelToken.ThrowIfCancellationRequested(); | |||||
if (!_hasHeader) | if (!_hasHeader) | ||||
throw new InvalidOperationException("Received payload without an RTP header"); | throw new InvalidOperationException("Received payload without an RTP header"); | ||||
_hasHeader = false; | _hasHeader = false; | ||||
@@ -57,6 +57,7 @@ namespace Discord.Audio.Streams | |||||
Buffer.BlockCopy(_header, 0, _buffer, 0, 12); //Copy RTP header from to the buffer | Buffer.BlockCopy(_header, 0, _buffer, 0, 12); //Copy RTP header from to the buffer | ||||
Buffer.BlockCopy(buffer, offset, _buffer, 12, count); | Buffer.BlockCopy(buffer, offset, _buffer, 12, count); | ||||
_next.WriteHeader(_nextSeq, _nextTimestamp, false); | |||||
await _next.WriteAsync(_buffer, 0, count + 12).ConfigureAwait(false); | await _next.WriteAsync(_buffer, 0, count + 12).ConfigureAwait(false); | ||||
} | } | ||||
@@ -10,6 +10,9 @@ namespace Discord.Audio.Streams | |||||
private readonly AudioClient _client; | private readonly AudioClient _client; | ||||
private readonly AudioStream _next; | private readonly AudioStream _next; | ||||
private readonly byte[] _nonce; | private readonly byte[] _nonce; | ||||
private bool _hasHeader; | |||||
private ushort _nextSeq; | |||||
private uint _nextTimestamp; | |||||
public SodiumEncryptStream(AudioStream next, IAudioClient client) | public SodiumEncryptStream(AudioStream next, IAudioClient client) | ||||
{ | { | ||||
@@ -17,16 +20,28 @@ namespace Discord.Audio.Streams | |||||
_client = (AudioClient)client; | _client = (AudioClient)client; | ||||
_nonce = new byte[24]; | _nonce = new byte[24]; | ||||
} | } | ||||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) | |||||
{ | |||||
if (_hasHeader) | |||||
throw new InvalidOperationException("Header received with no payload"); | |||||
_nextSeq = seq; | |||||
_nextTimestamp = timestamp; | |||||
} | |||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | ||||
{ | { | ||||
cancelToken.ThrowIfCancellationRequested(); | cancelToken.ThrowIfCancellationRequested(); | ||||
if (!_hasHeader) | |||||
throw new InvalidOperationException("Received payload without an RTP header"); | |||||
_hasHeader = false; | |||||
if (_client.SecretKey == null) | if (_client.SecretKey == null) | ||||
return; | return; | ||||
Buffer.BlockCopy(buffer, offset, _nonce, 0, 12); //Copy nonce from RTP header | Buffer.BlockCopy(buffer, offset, _nonce, 0, 12); //Copy nonce from RTP header | ||||
count = SecretBox.Encrypt(buffer, offset + 12, count - 12, buffer, 12, _nonce, _client.SecretKey); | count = SecretBox.Encrypt(buffer, offset + 12, count - 12, buffer, 12, _nonce, _client.SecretKey); | ||||
_next.WriteHeader(_nextSeq, _nextTimestamp, false); | |||||
await _next.WriteAsync(buffer, 0, count + 12, cancelToken).ConfigureAwait(false); | await _next.WriteAsync(buffer, 0, count + 12, cancelToken).ConfigureAwait(false); | ||||
} | } | ||||