|
@@ -8,7 +8,10 @@ namespace Discord.Audio.Streams |
|
|
///<summary> Reads the payload from an RTP frame </summary> |
|
|
///<summary> Reads the payload from an RTP frame </summary> |
|
|
public class InputStream : AudioInStream |
|
|
public class InputStream : AudioInStream |
|
|
{ |
|
|
{ |
|
|
|
|
|
private const int MaxFrames = 100; |
|
|
|
|
|
|
|
|
private ConcurrentQueue<RTPFrame> _frames; |
|
|
private ConcurrentQueue<RTPFrame> _frames; |
|
|
|
|
|
private SemaphoreSlim _signal; |
|
|
private ushort _nextSeq; |
|
|
private ushort _nextSeq; |
|
|
private uint _nextTimestamp; |
|
|
private uint _nextTimestamp; |
|
|
private bool _hasHeader; |
|
|
private bool _hasHeader; |
|
@@ -21,28 +24,27 @@ namespace Discord.Audio.Streams |
|
|
public InputStream() |
|
|
public InputStream() |
|
|
{ |
|
|
{ |
|
|
_frames = new ConcurrentQueue<RTPFrame>(); |
|
|
_frames = new ConcurrentQueue<RTPFrame>(); |
|
|
|
|
|
_signal = new SemaphoreSlim(0, MaxFrames); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public override Task<RTPFrame?> ReadFrameAsync(CancellationToken cancelToken) |
|
|
|
|
|
|
|
|
public override async Task<RTPFrame> ReadFrameAsync(CancellationToken cancelToken) |
|
|
{ |
|
|
{ |
|
|
cancelToken.ThrowIfCancellationRequested(); |
|
|
cancelToken.ThrowIfCancellationRequested(); |
|
|
|
|
|
|
|
|
if (_frames.TryDequeue(out var frame)) |
|
|
|
|
|
return Task.FromResult<RTPFrame?>(frame); |
|
|
|
|
|
return Task.FromResult<RTPFrame?>(null); |
|
|
|
|
|
|
|
|
RTPFrame frame; |
|
|
|
|
|
await _signal.WaitAsync(cancelToken).ConfigureAwait(false); |
|
|
|
|
|
_frames.TryDequeue(out frame); |
|
|
|
|
|
return frame; |
|
|
} |
|
|
} |
|
|
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) |
|
|
|
|
|
|
|
|
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) |
|
|
{ |
|
|
{ |
|
|
cancelToken.ThrowIfCancellationRequested(); |
|
|
cancelToken.ThrowIfCancellationRequested(); |
|
|
|
|
|
|
|
|
if (_frames.TryDequeue(out var frame)) |
|
|
|
|
|
{ |
|
|
|
|
|
if (count < frame.Payload.Length) |
|
|
|
|
|
throw new InvalidOperationException("Buffer is too small."); |
|
|
|
|
|
Buffer.BlockCopy(frame.Payload, 0, buffer, offset, frame.Payload.Length); |
|
|
|
|
|
return Task.FromResult(frame.Payload.Length); |
|
|
|
|
|
} |
|
|
|
|
|
return Task.FromResult(0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var frame = await ReadFrameAsync(cancelToken).ConfigureAwait(false); |
|
|
|
|
|
if (count < frame.Payload.Length) |
|
|
|
|
|
throw new InvalidOperationException("Buffer is too small."); |
|
|
|
|
|
Buffer.BlockCopy(frame.Payload, 0, buffer, offset, frame.Payload.Length); |
|
|
|
|
|
return frame.Payload.Length; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void WriteHeader(ushort seq, uint timestamp) |
|
|
public void WriteHeader(ushort seq, uint timestamp) |
|
@@ -57,7 +59,7 @@ namespace Discord.Audio.Streams |
|
|
{ |
|
|
{ |
|
|
cancelToken.ThrowIfCancellationRequested(); |
|
|
cancelToken.ThrowIfCancellationRequested(); |
|
|
|
|
|
|
|
|
if (_frames.Count > 100) //1-2 seconds |
|
|
|
|
|
|
|
|
if (_frames.Count > MaxFrames) //1-2 seconds |
|
|
{ |
|
|
{ |
|
|
_hasHeader = false; |
|
|
_hasHeader = false; |
|
|
return Task.Delay(0); //Buffer overloaded |
|
|
return Task.Delay(0); //Buffer overloaded |
|
@@ -72,6 +74,7 @@ namespace Discord.Audio.Streams |
|
|
timestamp: _nextTimestamp, |
|
|
timestamp: _nextTimestamp, |
|
|
payload: payload |
|
|
payload: payload |
|
|
)); |
|
|
)); |
|
|
|
|
|
_signal.Release(); |
|
|
_hasHeader = false; |
|
|
_hasHeader = false; |
|
|
return Task.Delay(0); |
|
|
return Task.Delay(0); |
|
|
} |
|
|
} |
|
|