@@ -17,7 +17,35 @@ namespace Discord.Audio | |||
Task DisconnectAsync(); | |||
/// <summary> | |||
/// Creates a new outgoing stream accepting Opus-encoded data. | |||
/// </summary> | |||
/// <param name="samplesPerFrame">Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively.</param> | |||
/// <param name="bufferSize">The size of the internal buffer used for encryption.</param> | |||
/// <returns></returns> | |||
Stream CreateOpusStream(int samplesPerFrame, int bufferSize = 4000); | |||
/// <summary> | |||
/// Creates a new outgoing stream accepting Opus-encoded data. This is a direct stream with no internal timer. | |||
/// </summary> | |||
/// <param name="samplesPerFrame">Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively.</param> | |||
/// <param name="bufferSize">The size of the internal buffer used for encryption.</param> | |||
/// <returns></returns> | |||
Stream CreateDirectOpusStream(int samplesPerFrame, int bufferSize = 4000); | |||
/// <summary> | |||
/// Creates a new outgoing stream accepting PCM (raw) data. | |||
/// </summary> | |||
/// <param name="samplesPerFrame">Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively.</param> | |||
/// <param name="bitrate"></param> | |||
/// <param name="bufferSize">The size of the internal buffer used for encoding and encryption.</param> | |||
/// <returns></returns> | |||
Stream CreatePCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000); | |||
/// <summary> | |||
/// Creates a new direct outgoing stream accepting PCM (raw) data. This is a direct stream with no internal timer. | |||
/// </summary> | |||
/// <param name="samplesPerFrame">Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively.</param> | |||
/// <param name="bitrate"></param> | |||
/// <param name="bufferSize">The size of the internal buffer used for encoding and encryption.</param> | |||
/// <returns></returns> | |||
Stream CreateDirectPCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000); | |||
} | |||
} |
@@ -39,7 +39,7 @@ namespace Discord.Audio | |||
private readonly JsonSerializer _serializer; | |||
private TaskCompletionSource<bool> _connectTask; | |||
private CancellationTokenSource _cancelToken; | |||
private CancellationTokenSource _cancelTokenSource; | |||
private Task _heartbeatTask; | |||
private long _heartbeatTime; | |||
private string _url; | |||
@@ -110,7 +110,7 @@ namespace Discord.Audio | |||
{ | |||
_url = url; | |||
_connectTask = new TaskCompletionSource<bool>(); | |||
_cancelToken = new CancellationTokenSource(); | |||
_cancelTokenSource = new CancellationTokenSource(); | |||
await ApiClient.ConnectAsync("wss://" + url).ConfigureAwait(false); | |||
await ApiClient.SendIdentityAsync(userId, sessionId, token).ConfigureAwait(false); | |||
@@ -152,7 +152,7 @@ namespace Discord.Audio | |||
await _audioLogger.InfoAsync("Disconnecting").ConfigureAwait(false); | |||
//Signal tasks to complete | |||
try { _cancelToken.Cancel(); } catch { } | |||
try { _cancelTokenSource.Cancel(); } catch { } | |||
//Disconnect from server | |||
await ApiClient.DisconnectAsync().ConfigureAwait(false); | |||
@@ -169,19 +169,35 @@ namespace Discord.Audio | |||
await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); | |||
} | |||
public void Send(byte[] data, int count) | |||
public Stream CreateOpusStream(int samplesPerFrame, int bufferSize = 4000) | |||
{ | |||
//TODO: Queue these? | |||
ApiClient.SendAsync(data, count).ConfigureAwait(false); | |||
CheckSamplesPerFrame(samplesPerFrame); | |||
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token); | |||
return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc, bufferSize = 4000); | |||
} | |||
public Stream CreateOpusStream(int samplesPerFrame, int bufferSize = 4000) | |||
public Stream CreateDirectOpusStream(int samplesPerFrame, int bufferSize = 4000) | |||
{ | |||
return new RTPWriteStream(this, _secretKey, samplesPerFrame, _ssrc, bufferSize = 4000); | |||
CheckSamplesPerFrame(samplesPerFrame); | |||
var target = new DirectAudioTarget(ApiClient); | |||
return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc, bufferSize = 4000); | |||
} | |||
public Stream CreatePCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000) | |||
{ | |||
return new OpusEncodeStream(this, _secretKey, samplesPerFrame, _ssrc, bitrate, bufferSize); | |||
CheckSamplesPerFrame(samplesPerFrame); | |||
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token); | |||
return new OpusEncodeStream(target, _secretKey, samplesPerFrame, _ssrc, bitrate, bufferSize); | |||
} | |||
public Stream CreateDirectPCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000) | |||
{ | |||
CheckSamplesPerFrame(samplesPerFrame); | |||
var target = new DirectAudioTarget(ApiClient); | |||
return new OpusEncodeStream(target, _secretKey, samplesPerFrame, _ssrc, bitrate, bufferSize); | |||
} | |||
private void CheckSamplesPerFrame(int samplesPerFrame) | |||
{ | |||
if (samplesPerFrame != 120 && samplesPerFrame != 240 && samplesPerFrame != 480 && | |||
samplesPerFrame != 960 && samplesPerFrame != 1920 && samplesPerFrame != 2880) | |||
throw new ArgumentException("Value must be 120, 240, 480, 960, 1920 or 2880", nameof(samplesPerFrame)); | |||
} | |||
private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload) | |||
@@ -201,7 +217,7 @@ namespace Discord.Audio | |||
throw new InvalidOperationException($"Discord does not support {DiscordVoiceAPIClient.Mode}"); | |||
_heartbeatTime = 0; | |||
_heartbeatTask = RunHeartbeatAsync(data.HeartbeatInterval, _cancelToken.Token); | |||
_heartbeatTask = RunHeartbeatAsync(data.HeartbeatInterval, _cancelTokenSource.Token); | |||
ApiClient.SetUdpEndpoint(_url, data.Port); | |||
await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false); | |||
@@ -52,7 +52,7 @@ namespace Discord.Audio | |||
throw new Exception($"Opus Error: {(OpusError)result}"); | |||
} | |||
/// <summary> Gets or sets whether Forward Error Correction is enabled. </summary> | |||
/// <summary> Gets or sets the encoder's bitrate. </summary> | |||
public void SetBitrate(int value) | |||
{ | |||
if (value < 1 || value > DiscordVoiceAPIClient.MaxBitrate) | |||
@@ -7,8 +7,8 @@ | |||
private readonly OpusEncoder _encoder; | |||
internal OpusEncodeStream(AudioClient audioClient, byte[] secretKey, int samplesPerFrame, uint ssrc, int? bitrate = null, int bufferSize = 4000) | |||
: base(audioClient, secretKey, samplesPerFrame, ssrc, bufferSize) | |||
internal OpusEncodeStream(IAudioTarget target, byte[] secretKey, int samplesPerFrame, uint ssrc, int? bitrate = null, int bufferSize = 4000) | |||
: base(target, secretKey, samplesPerFrame, ssrc, bufferSize) | |||
{ | |||
_encoder = new OpusEncoder(SampleRate, Channels); | |||
@@ -5,7 +5,7 @@ namespace Discord.Audio | |||
{ | |||
internal class RTPWriteStream : Stream | |||
{ | |||
private readonly AudioClient _audioClient; | |||
private readonly IAudioTarget _target; | |||
private readonly byte[] _nonce, _secretKey; | |||
private int _samplesPerFrame; | |||
private uint _ssrc, _timestamp = 0; | |||
@@ -16,9 +16,9 @@ namespace Discord.Audio | |||
public override bool CanSeek => false; | |||
public override bool CanWrite => true; | |||
internal RTPWriteStream(AudioClient audioClient, byte[] secretKey, int samplesPerFrame, uint ssrc, int bufferSize = 4000) | |||
internal RTPWriteStream(IAudioTarget target, byte[] secretKey, int samplesPerFrame, uint ssrc, int bufferSize = 4000) | |||
{ | |||
_audioClient = audioClient; | |||
_target = target; | |||
_secretKey = secretKey; | |||
_samplesPerFrame = samplesPerFrame; | |||
_ssrc = ssrc; | |||
@@ -48,7 +48,7 @@ namespace Discord.Audio | |||
count = SecretBox.Encrypt(buffer, offset, count, _buffer, 12, _nonce, _secretKey); | |||
Buffer.BlockCopy(_nonce, 0, _buffer, 0, 12); //Copy the RTP header from nonce to buffer | |||
_audioClient.Send(_buffer, count + 12); | |||
_target.SendAsync(_buffer, count + 12).GetAwaiter().GetResult(); | |||
} | |||
public override void Flush() { } | |||
@@ -0,0 +1,79 @@ | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Diagnostics; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace Discord.Audio | |||
{ | |||
internal class BufferedAudioTarget : IAudioTarget, IDisposable | |||
{ | |||
private static readonly byte[] _silencePacket = new byte[] { 0xF8, 0xFF, 0xFE }; | |||
private double _ticksPerFrame; | |||
private Task _task; | |||
private DiscordVoiceAPIClient _client; | |||
private CancellationTokenSource _cancelTokenSource; | |||
private ConcurrentQueue<byte[]> _queue; | |||
internal BufferedAudioTarget(DiscordVoiceAPIClient client, int samplesPerFrame, CancellationToken cancelToken) | |||
{ | |||
_client = client; | |||
double milliseconds = samplesPerFrame / 48.0; | |||
double ticksPerFrame = Stopwatch.Frequency / 1000.0 * milliseconds; | |||
_cancelTokenSource = new CancellationTokenSource(); | |||
cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token; | |||
_queue = new ConcurrentQueue<byte[]>(); //TODO: We need a better queue | |||
_task = Run(ticksPerFrame, cancelToken); | |||
} | |||
private Task Run(double ticksPerFrame, CancellationToken cancelToken) | |||
{ | |||
return Task.Run(async () => | |||
{ | |||
var stopwatch = Stopwatch.StartNew(); | |||
long lastTick = stopwatch.ElapsedTicks; | |||
double ticksPerMilli = Stopwatch.Frequency / 1000.0; | |||
while (!cancelToken.IsCancellationRequested) | |||
{ | |||
long thisTick = stopwatch.ElapsedTicks; | |||
double remaining = ticksPerFrame - (thisTick - lastTick); | |||
if (remaining <= 0) | |||
{ | |||
byte[] buffer; | |||
if (_queue.TryDequeue(out buffer)) | |||
await _client.SendAsync(buffer, buffer.Length).ConfigureAwait(false); | |||
else | |||
await _client.SendAsync(_silencePacket, _silencePacket.Length).ConfigureAwait(false); | |||
lastTick = thisTick; | |||
} | |||
else if (remaining > 1) | |||
{ | |||
int millis = (int)Math.Floor(remaining / ticksPerMilli); | |||
await Task.Delay(millis).ConfigureAwait(false); | |||
} | |||
} | |||
}); | |||
} | |||
public Task SendAsync(byte[] buffer, int count) | |||
{ | |||
byte[] newBuffer = new byte[count]; | |||
Buffer.BlockCopy(buffer, 0, newBuffer, 0, count); | |||
_queue.Enqueue(newBuffer); | |||
return Task.Delay(0); | |||
} | |||
protected void Dispose(bool disposing) | |||
{ | |||
if (disposing) | |||
_cancelTokenSource.Cancel(); | |||
} | |||
public void Dispose() | |||
{ | |||
Dispose(true); | |||
} | |||
} | |||
} |
@@ -0,0 +1,16 @@ | |||
using System.Threading.Tasks; | |||
namespace Discord.Audio | |||
{ | |||
internal class DirectAudioTarget : IAudioTarget | |||
{ | |||
private readonly DiscordVoiceAPIClient _client; | |||
public DirectAudioTarget(DiscordVoiceAPIClient client) | |||
{ | |||
_client = client; | |||
} | |||
public Task SendAsync(byte[] buffer, int count) | |||
=> _client.SendAsync(buffer, count); | |||
} | |||
} |
@@ -0,0 +1,9 @@ | |||
using System.Threading.Tasks; | |||
namespace Discord.Audio | |||
{ | |||
internal interface IAudioTarget | |||
{ | |||
Task SendAsync(byte[] buffer, int count); | |||
} | |||
} |
@@ -1,4 +1,4 @@ | |||
<Project ToolsVersion="15.0" Sdk="Microsoft.NET.Sdk" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> | |||
<Project ToolsVersion="15.0" Sdk="Microsoft.NET.Sdk" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> | |||
<PropertyGroup> | |||
<Description>A core Discord.Net library containing the WebSocket client and models.</Description> | |||
<VersionPrefix>1.0.0-beta2</VersionPrefix> | |||
@@ -1563,10 +1563,10 @@ namespace Discord.WebSocket | |||
{ | |||
before = guild.GetVoiceState(data.UserId)?.Clone() ?? SocketVoiceState.Default; | |||
after = guild.AddOrUpdateVoiceState(State, data); | |||
if (data.UserId == CurrentUser.Id) | |||
/*if (data.UserId == CurrentUser.Id) | |||
{ | |||
var _ = guild.FinishJoinAudioChannel().ConfigureAwait(false); | |||
} | |||
}*/ | |||
} | |||
else | |||
{ | |||
@@ -1,6 +1,10 @@ | |||
namespace Discord.WebSocket | |||
using Discord.Audio; | |||
using System.Threading.Tasks; | |||
namespace Discord.WebSocket | |||
{ | |||
public interface ISocketAudioChannel : IAudioChannel | |||
{ | |||
Task<IAudioClient> ConnectAsync(); | |||
} | |||
} |
@@ -1,4 +1,5 @@ | |||
using Discord.Rest; | |||
using Discord.Audio; | |||
using Discord.Rest; | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
@@ -64,6 +65,11 @@ namespace Discord.WebSocket | |||
public Task LeaveAsync(RequestOptions options = null) | |||
=> ChannelHelper.DeleteAsync(this, Discord, options); | |||
public Task<IAudioClient> ConnectAsync() | |||
{ | |||
throw new NotSupportedException("Voice is not yet supported for group channels."); | |||
} | |||
//Messages | |||
public SocketMessage GetCachedMessage(ulong id) | |||
=> _messages?.Get(id); | |||
@@ -41,6 +41,17 @@ namespace Discord.WebSocket | |||
public Task ModifyAsync(Action<VoiceChannelProperties> func, RequestOptions options = null) | |||
=> ChannelHelper.ModifyAsync(this, Discord, func, options); | |||
public async Task<IAudioClient> ConnectAsync() | |||
{ | |||
var audioMode = Discord.AudioMode; | |||
if (audioMode == AudioMode.Disabled) | |||
throw new InvalidOperationException($"Audio is not enabled on this client, {nameof(DiscordSocketConfig.AudioMode)} in {nameof(DiscordSocketConfig)} must be set."); | |||
return await Guild.ConnectAudioAsync(Id, | |||
(audioMode & AudioMode.Incoming) == 0, | |||
(audioMode & AudioMode.Outgoing) == 0).ConfigureAwait(false); | |||
} | |||
public override SocketGuildUser GetUser(ulong id) | |||
{ | |||
var user = Guild.GetUser(id); | |||
@@ -52,9 +63,6 @@ namespace Discord.WebSocket | |||
private string DebuggerDisplay => $"{Name} ({Id}, Voice)"; | |||
internal new SocketVoiceChannel Clone() => MemberwiseClone() as SocketVoiceChannel; | |||
//IVoiceChannel | |||
Task<IAudioClient> IVoiceChannel.ConnectAsync() { throw new NotSupportedException(); } | |||
//IGuildChannel | |||
Task<IGuildUser> IGuildChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options) | |||
=> Task.FromResult<IGuildUser>(GetUser(id)); | |||
@@ -421,34 +421,64 @@ namespace Discord.WebSocket | |||
} | |||
//Audio | |||
public async Task DisconnectAudioAsync(AudioClient client = null) | |||
internal async Task<IAudioClient> ConnectAudioAsync(ulong channelId, bool selfDeaf, bool selfMute) | |||
{ | |||
selfDeaf = false; | |||
selfMute = false; | |||
TaskCompletionSource<AudioClient> promise; | |||
await _audioLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await DisconnectAudioInternalAsync(client).ConfigureAwait(false); | |||
await DisconnectAudioInternalAsync().ConfigureAwait(false); | |||
promise = new TaskCompletionSource<AudioClient>(); | |||
_audioConnectPromise = promise; | |||
await Discord.ApiClient.SendVoiceStateUpdateAsync(Id, channelId, selfDeaf, selfMute).ConfigureAwait(false); | |||
} | |||
catch (Exception) | |||
{ | |||
await DisconnectAudioInternalAsync().ConfigureAwait(false); | |||
throw; | |||
} | |||
finally | |||
{ | |||
_audioLock.Release(); | |||
} | |||
try | |||
{ | |||
var timeoutTask = Task.Delay(15000); | |||
if (await Task.WhenAny(promise.Task, timeoutTask) == timeoutTask) | |||
throw new TimeoutException(); | |||
return await promise.Task.ConfigureAwait(false); | |||
} | |||
catch (Exception) | |||
{ | |||
await DisconnectAudioAsync().ConfigureAwait(false); | |||
throw; | |||
} | |||
} | |||
private async Task DisconnectAudioInternalAsync(AudioClient client = null) | |||
internal async Task DisconnectAudioAsync() | |||
{ | |||
var oldClient = AudioClient; | |||
if (oldClient != null) | |||
await _audioLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
if (client == null || oldClient == client) | |||
{ | |||
_audioConnectPromise?.TrySetCanceledAsync(); //Cancel any previous audio connection | |||
_audioConnectPromise = null; | |||
} | |||
if (oldClient == client) | |||
{ | |||
AudioClient = null; | |||
await oldClient.DisconnectAsync().ConfigureAwait(false); | |||
} | |||
await DisconnectAudioInternalAsync().ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
_audioLock.Release(); | |||
} | |||
} | |||
private async Task DisconnectAudioInternalAsync() | |||
{ | |||
_audioConnectPromise?.TrySetCanceledAsync(); //Cancel any previous audio connection | |||
_audioConnectPromise = null; | |||
if (AudioClient != null) | |||
await AudioClient.DisconnectAsync().ConfigureAwait(false); | |||
AudioClient = null; | |||
} | |||
internal async Task FinishConnectAudio(int id, string url, string token) | |||
{ | |||
@@ -462,6 +492,14 @@ namespace Discord.WebSocket | |||
var audioClient = new AudioClient(this, id); | |||
audioClient.Disconnected += async ex => | |||
{ | |||
//If the initial connection hasn't been made yet, reconnecting will lead to deadlocks | |||
if (!_audioConnectPromise.Task.IsCompleted) | |||
{ | |||
try { audioClient.Dispose(); } catch { } | |||
AudioClient = null; | |||
return; | |||
} | |||
await _audioLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
@@ -476,14 +514,14 @@ namespace Discord.WebSocket | |||
{ | |||
var voiceChannelId = voiceState2.Value.VoiceChannel?.Id; | |||
if (voiceChannelId != null) | |||
{ | |||
await Discord.ApiClient.SendVoiceStateUpdateAsync(Id, voiceChannelId, voiceState2.Value.IsSelfDeafened, voiceState2.Value.IsSelfMuted); | |||
return; | |||
} | |||
} | |||
} | |||
else | |||
{ | |||
try { AudioClient.Dispose(); } catch { } | |||
AudioClient = null; | |||
} | |||
try { audioClient.Dispose(); } catch { } | |||
AudioClient = null; | |||
} | |||
} | |||
finally | |||
@@ -498,25 +536,12 @@ namespace Discord.WebSocket | |||
} | |||
catch (OperationCanceledException) | |||
{ | |||
await DisconnectAudioAsync().ConfigureAwait(false); | |||
await DisconnectAudioInternalAsync().ConfigureAwait(false); | |||
} | |||
catch (Exception e) | |||
{ | |||
await _audioConnectPromise.SetExceptionAsync(e).ConfigureAwait(false); | |||
await DisconnectAudioAsync().ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
_audioLock.Release(); | |||
} | |||
} | |||
internal async Task FinishJoinAudioChannel() | |||
{ | |||
await _audioLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
if (AudioClient != null) | |||
await _audioConnectPromise.TrySetResultAsync(AudioClient).ConfigureAwait(false); | |||
await DisconnectAudioInternalAsync().ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
@@ -22,6 +22,7 @@ namespace Discord.Net.Udp | |||
public DefaultUdpSocket() | |||
{ | |||
_lock = new SemaphoreSlim(1, 1); | |||
_cancelTokenSource = new CancellationTokenSource(); | |||
} | |||
private void Dispose(bool disposing) | |||
{ | |||
@@ -57,7 +58,7 @@ namespace Discord.Net.Udp | |||
_cancelTokenSource = new CancellationTokenSource(); | |||
_cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token; | |||
_udp = new UdpClient(); | |||
_udp = new UdpClient(0); | |||
_task = RunAsync(_cancelToken); | |||
} | |||