@@ -11,6 +11,7 @@ namespace Discord.Audio | |||
public override bool CanSeek => false; | |||
public override bool CanWrite => false; | |||
public virtual void WriteHeader(ushort seq, uint timestamp, bool missed) { } | |||
public override void Write(byte[] buffer, int offset, int count) | |||
{ | |||
WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); | |||
@@ -5,12 +5,14 @@ namespace Discord.Audio | |||
public readonly ushort Sequence; | |||
public readonly uint Timestamp; | |||
public readonly byte[] Payload; | |||
public readonly bool Missed; | |||
public RTPFrame(ushort sequence, uint timestamp, byte[] payload) | |||
public RTPFrame(ushort sequence, uint timestamp, byte[] payload, bool missed) | |||
{ | |||
Sequence = sequence; | |||
Timestamp = timestamp; | |||
Payload = payload; | |||
Missed = missed; | |||
} | |||
} | |||
} |
@@ -8,6 +8,8 @@ namespace Discord.Net.Udp | |||
{ | |||
event Func<byte[], int, int, Task> ReceivedDatagram; | |||
ushort Port { get; } | |||
void SetCancelToken(CancellationToken cancelToken); | |||
void SetDestination(string ip, int port); | |||
@@ -18,6 +18,8 @@ namespace Discord.Net.Providers.UDPClient | |||
private CancellationToken _cancelToken, _parentToken; | |||
private Task _task; | |||
private bool _isDisposed; | |||
public ushort Port => (ushort)((_udp?.Client.LocalEndPoint as IPEndPoint)?.Port ?? 0); | |||
public UDPClient() | |||
{ | |||
@@ -107,6 +107,7 @@ namespace Discord.Audio | |||
{ | |||
await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false); | |||
await ApiClient.ConnectAsync("wss://" + _url).ConfigureAwait(false); | |||
await _audioLogger.DebugAsync("Listening on port " + ApiClient.UdpPort).ConfigureAwait(false); | |||
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false); | |||
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false); | |||
@@ -175,7 +176,8 @@ namespace Discord.Audio | |||
{ | |||
var readerStream = new InputStream(); | |||
var opusDecoder = new OpusDecodeStream(readerStream); | |||
var rtpReader = new RTPReadStream(readerStream, opusDecoder); | |||
//var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger); | |||
var rtpReader = new RTPReadStream(opusDecoder); | |||
var decryptStream = new SodiumDecryptStream(rtpReader, this); | |||
_streams.TryAdd(userId, new StreamPair(readerStream, decryptStream)); | |||
await _streamCreatedEvent.InvokeAsync(userId, readerStream); | |||
@@ -14,7 +14,7 @@ namespace Discord.Audio | |||
public const int FrameSamplesPerChannel = SamplingRate / 1000 * FrameMillis; | |||
public const int FrameSamples = FrameSamplesPerChannel * Channels; | |||
public const int FrameBytes = FrameSamples * SampleBytes; | |||
public const int FrameBytes = FrameSamplesPerChannel * SampleBytes; | |||
protected bool _isDisposed = false; | |||
@@ -20,12 +20,12 @@ namespace Discord.Audio | |||
CheckError(error); | |||
} | |||
public unsafe int DecodeFrame(byte[] input, int inputOffset, int inputCount, byte[] output, int outputOffset) | |||
public unsafe int DecodeFrame(byte[] input, int inputOffset, int inputCount, byte[] output, int outputOffset, bool decodeFEC) | |||
{ | |||
int result = 0; | |||
fixed (byte* inPtr = input) | |||
fixed (byte* outPtr = output) | |||
result = Decode(_ptr, inPtr + inputOffset, inputCount, outPtr + outputOffset, FrameSamplesPerChannel, 1); | |||
result = Decode(_ptr, inPtr + inputOffset, inputCount, outPtr + outputOffset, FrameSamplesPerChannel, decodeFEC ? 1 : 0); | |||
CheckError(result); | |||
return result * SampleBytes; | |||
} | |||
@@ -35,7 +35,7 @@ namespace Discord.Audio.Streams | |||
private readonly SemaphoreSlim _queueLock; | |||
private readonly Logger _logger; | |||
private readonly int _ticksPerFrame, _queueLength; | |||
private bool _isPreloaded; | |||
private bool _isPreloaded, _isSpeaking; | |||
private int _silenceFrames; | |||
public BufferedWriteStream(AudioStream next, IAudioClient client, int bufferMillis, CancellationToken cancelToken, int maxFrameSize = 1500) | |||
@@ -45,7 +45,7 @@ namespace Discord.Audio.Streams | |||
//maxFrameSize = 1275 was too limiting at 128kbps,2ch,60ms | |||
_next = next; | |||
_client = client; | |||
_ticksPerFrame = OpusEncoder.FrameSamples / 48; | |||
_ticksPerFrame = OpusEncoder.FrameMillis; | |||
_logger = logger; | |||
_queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up | |||
@@ -60,6 +60,12 @@ namespace Discord.Audio.Streams | |||
_task = Run(); | |||
} | |||
protected override void Dispose(bool disposing) | |||
{ | |||
if (disposing) | |||
_cancelTokenSource.Cancel(); | |||
base.Dispose(disposing); | |||
} | |||
private Task Run() | |||
{ | |||
@@ -71,6 +77,8 @@ namespace Discord.Audio.Streams | |||
await Task.Delay(1).ConfigureAwait(false); | |||
long nextTick = Environment.TickCount; | |||
ushort seq = 0; | |||
uint timestamp = 0; | |||
while (!_cancelToken.IsCancellationRequested) | |||
{ | |||
long tick = Environment.TickCount; | |||
@@ -80,14 +88,20 @@ namespace Discord.Audio.Streams | |||
Frame frame; | |||
if (_queuedFrames.TryDequeue(out frame)) | |||
{ | |||
await _client.ApiClient.SendSetSpeaking(true).ConfigureAwait(false); | |||
if (!_isSpeaking) | |||
{ | |||
await _client.ApiClient.SendSetSpeaking(true).ConfigureAwait(false); | |||
_isSpeaking = true; | |||
} | |||
_next.WriteHeader(seq++, timestamp, false); | |||
await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false); | |||
_bufferPool.Enqueue(frame.Buffer); | |||
_queueLock.Release(); | |||
nextTick += _ticksPerFrame; | |||
timestamp += OpusEncoder.FrameSamplesPerChannel; | |||
_silenceFrames = 0; | |||
#if DEBUG | |||
var _ = _logger.DebugAsync($"Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)"); | |||
var _ = _logger?.DebugAsync($"Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)"); | |||
#endif | |||
} | |||
else | |||
@@ -95,13 +109,20 @@ namespace Discord.Audio.Streams | |||
while ((nextTick - tick) <= 0) | |||
{ | |||
if (_silenceFrames++ < MaxSilenceFrames) | |||
{ | |||
_next.WriteHeader(seq++, timestamp, false); | |||
await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false); | |||
else | |||
} | |||
else if (_isSpeaking) | |||
{ | |||
await _client.ApiClient.SendSetSpeaking(false).ConfigureAwait(false); | |||
_isSpeaking = false; | |||
} | |||
nextTick += _ticksPerFrame; | |||
timestamp += OpusEncoder.FrameSamplesPerChannel; | |||
} | |||
#if DEBUG | |||
var _ = _logger.DebugAsync($"Buffer underrun"); | |||
var _ = _logger?.DebugAsync($"Buffer underrun"); | |||
#endif | |||
} | |||
} | |||
@@ -125,19 +146,16 @@ namespace Discord.Audio.Streams | |||
if (!_bufferPool.TryDequeue(out buffer)) | |||
{ | |||
#if DEBUG | |||
var _ = _logger.DebugAsync($"Buffer overflow"); //Should never happen because of the queueLock | |||
var _ = _logger?.DebugAsync($"Buffer overflow"); //Should never happen because of the queueLock | |||
#endif | |||
return; | |||
} | |||
Buffer.BlockCopy(data, offset, buffer, 0, count); | |||
_queuedFrames.Enqueue(new Frame(buffer, count)); | |||
#if DEBUG | |||
//var _ await _logger.DebugAsync($"Queued {count} bytes ({_queuedFrames.Count} frames buffered)"); | |||
#endif | |||
if (!_isPreloaded && _queuedFrames.Count == _queueLength) | |||
{ | |||
#if DEBUG | |||
var _ = _logger.DebugAsync($"Preloaded"); | |||
var _ = _logger?.DebugAsync($"Preloaded"); | |||
#endif | |||
_isPreloaded = true; | |||
} | |||
@@ -161,10 +179,5 @@ namespace Discord.Audio.Streams | |||
while (_queuedFrames.TryDequeue(out ignored)); | |||
return Task.Delay(0); | |||
} | |||
protected override void Dispose(bool disposing) | |||
{ | |||
if (disposing) | |||
_cancelTokenSource.Cancel(); | |||
} | |||
} | |||
} |
@@ -14,6 +14,7 @@ namespace Discord.Audio.Streams | |||
private SemaphoreSlim _signal; | |||
private ushort _nextSeq; | |||
private uint _nextTimestamp; | |||
private bool _nextMissed; | |||
private bool _hasHeader; | |||
private bool _isDisposed; | |||
@@ -60,13 +61,14 @@ namespace Discord.Audio.Streams | |||
return frame; | |||
} | |||
public void WriteHeader(ushort seq, uint timestamp) | |||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) | |||
{ | |||
if (_hasHeader) | |||
throw new InvalidOperationException("Header received with no payload"); | |||
_hasHeader = true; | |||
_nextSeq = seq; | |||
_nextTimestamp = timestamp; | |||
_nextMissed = missed; | |||
} | |||
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | |||
{ | |||
@@ -79,16 +81,17 @@ namespace Discord.Audio.Streams | |||
} | |||
if (!_hasHeader) | |||
throw new InvalidOperationException("Received payload without an RTP header"); | |||
_hasHeader = false; | |||
byte[] payload = new byte[count]; | |||
Buffer.BlockCopy(buffer, offset, payload, 0, count); | |||
_frames.Enqueue(new RTPFrame( | |||
sequence: _nextSeq, | |||
timestamp: _nextTimestamp, | |||
missed: _nextMissed, | |||
payload: payload | |||
)); | |||
_signal.Release(); | |||
_hasHeader = false; | |||
return Task.Delay(0); | |||
} | |||
@@ -1,4 +1,5 @@ | |||
using System.Threading; | |||
using System; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace Discord.Audio.Streams | |||
@@ -11,6 +12,8 @@ namespace Discord.Audio.Streams | |||
private readonly AudioStream _next; | |||
private readonly OpusDecoder _decoder; | |||
private readonly byte[] _buffer; | |||
private bool _nextMissed; | |||
private bool _hasHeader; | |||
public OpusDecodeStream(AudioStream next) | |||
{ | |||
@@ -19,10 +22,35 @@ namespace Discord.Audio.Streams | |||
_decoder = new OpusDecoder(); | |||
} | |||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) | |||
{ | |||
if (_hasHeader) | |||
throw new InvalidOperationException("Header received with no payload"); | |||
_nextMissed = missed; | |||
_hasHeader = true; | |||
_next.WriteHeader(seq, timestamp, missed); | |||
} | |||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||
{ | |||
count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0); | |||
await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); | |||
if (!_hasHeader) | |||
throw new InvalidOperationException("Received payload without an RTP header"); | |||
_hasHeader = false; | |||
if (!_nextMissed) | |||
{ | |||
count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, false); | |||
await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); | |||
} | |||
else if (count > 0) | |||
{ | |||
count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, true); | |||
await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); | |||
} | |||
else | |||
{ | |||
count = _decoder.DecodeFrame(null, 0, 0, _buffer, 0, true); | |||
await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); | |||
} | |||
} | |||
public override async Task FlushAsync(CancellationToken cancelToken) | |||
@@ -1,5 +1,4 @@ | |||
using System.IO; | |||
using System.Threading; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace Discord.Audio.Streams | |||
@@ -7,7 +6,6 @@ namespace Discord.Audio.Streams | |||
///<summary> Reads the payload from an RTP frame </summary> | |||
public class RTPReadStream : AudioOutStream | |||
{ | |||
private readonly InputStream _queue; | |||
private readonly AudioStream _next; | |||
private readonly byte[] _buffer, _nonce; | |||
@@ -15,11 +13,8 @@ namespace Discord.Audio.Streams | |||
public override bool CanSeek => false; | |||
public override bool CanWrite => true; | |||
public RTPReadStream(InputStream queue, int bufferSize = 4000) | |||
: this(queue, null, bufferSize) { } | |||
public RTPReadStream(InputStream queue, AudioStream next, int bufferSize = 4000) | |||
public RTPReadStream(AudioStream next, int bufferSize = 4000) | |||
{ | |||
_queue = queue; | |||
_next = next; | |||
_buffer = new byte[bufferSize]; | |||
_nonce = new byte[24]; | |||
@@ -36,11 +31,11 @@ namespace Discord.Audio.Streams | |||
uint timestamp = (uint)((buffer[offset + 4] << 24) | | |||
(buffer[offset + 5] << 16) | | |||
(buffer[offset + 6] << 16) | | |||
(buffer[offset + 6] << 8) | | |||
(buffer[offset + 7] << 0)); | |||
_queue.WriteHeader(seq, timestamp); | |||
await (_next ?? _queue as Stream).WriteAsync(buffer, offset + headerSize, count - headerSize, cancelToken).ConfigureAwait(false); | |||
_next.WriteHeader(seq, timestamp, false); | |||
await _next.WriteAsync(buffer, offset + headerSize, count - headerSize, cancelToken).ConfigureAwait(false); | |||
} | |||
public static bool TryReadSsrc(byte[] buffer, int offset, out uint ssrc) | |||
@@ -58,7 +53,7 @@ namespace Discord.Audio.Streams | |||
ssrc = (uint)((buffer[offset + 8] << 24) | | |||
(buffer[offset + 9] << 16) | | |||
(buffer[offset + 10] << 16) | | |||
(buffer[offset + 10] << 8) | | |||
(buffer[offset + 11] << 0)); | |||
return true; | |||
} | |||
@@ -10,7 +10,10 @@ namespace Discord.Audio.Streams | |||
private readonly AudioStream _next; | |||
private readonly byte[] _header; | |||
protected readonly byte[] _buffer; | |||
private uint _ssrc, _timestamp = 0; | |||
private uint _ssrc; | |||
private ushort _nextSeq; | |||
private uint _nextTimestamp; | |||
private bool _hasHeader; | |||
public RTPWriteStream(AudioStream next, uint ssrc, int bufferSize = 4000) | |||
{ | |||
@@ -26,20 +29,30 @@ namespace Discord.Audio.Streams | |||
_header[11] = (byte)(_ssrc >> 0); | |||
} | |||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) | |||
{ | |||
if (_hasHeader) | |||
throw new InvalidOperationException("Header received with no payload"); | |||
_hasHeader = true; | |||
_nextSeq = seq; | |||
_nextTimestamp = timestamp; | |||
} | |||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||
{ | |||
cancellationToken.ThrowIfCancellationRequested(); | |||
if (!_hasHeader) | |||
throw new InvalidOperationException("Received payload without an RTP header"); | |||
_hasHeader = false; | |||
unchecked | |||
{ | |||
if (_header[3]++ == byte.MaxValue) | |||
_header[2]++; | |||
_timestamp += (uint)OpusEncoder.FrameSamples; | |||
_header[4] = (byte)(_timestamp >> 24); | |||
_header[5] = (byte)(_timestamp >> 16); | |||
_header[6] = (byte)(_timestamp >> 8); | |||
_header[7] = (byte)(_timestamp >> 0); | |||
_header[2] = (byte)(_nextSeq >> 8); | |||
_header[3] = (byte)(_nextSeq >> 0); | |||
_header[4] = (byte)(_nextTimestamp >> 24); | |||
_header[5] = (byte)(_nextTimestamp >> 16); | |||
_header[6] = (byte)(_nextTimestamp >> 8); | |||
_header[7] = (byte)(_nextTimestamp >> 0); | |||
} | |||
Buffer.BlockCopy(_header, 0, _buffer, 0, 12); //Copy RTP header from to the buffer | |||
Buffer.BlockCopy(buffer, offset, _buffer, 12, count); | |||
@@ -48,6 +48,8 @@ namespace Discord.Audio | |||
internal IWebSocketClient WebSocketClient { get; } | |||
public ConnectionState ConnectionState { get; private set; } | |||
public ushort UdpPort => _udp.Port; | |||
internal DiscordVoiceAPIClient(ulong guildId, WebSocketProvider webSocketProvider, UdpSocketProvider udpSocketProvider, JsonSerializer serializer = null) | |||
{ | |||
GuildId = guildId; | |||
@@ -18,6 +18,8 @@ namespace Discord.Net.Udp | |||
private CancellationToken _cancelToken, _parentToken; | |||
private Task _task; | |||
private bool _isDisposed; | |||
public ushort Port => (ushort)((_udp?.Client.LocalEndPoint as IPEndPoint)?.Port ?? 0); | |||
public DefaultUdpSocket() | |||
{ | |||