@@ -21,91 +21,48 @@ using System.Threading.Tasks; | |||
namespace Discord.API | |||
{ | |||
public class DiscordApiClient : IDisposable | |||
public class DiscordRestApiClient : IDisposable | |||
{ | |||
private object _eventLock = new object(); | |||
public event Func<string, string, double, Task> SentRequest { add { _sentRequestEvent.Add(value); } remove { _sentRequestEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<string, string, double, Task>> _sentRequestEvent = new AsyncEvent<Func<string, string, double, Task>>(); | |||
public event Func<GatewayOpCode, Task> SentGatewayMessage { add { _sentGatewayMessageEvent.Add(value); } remove { _sentGatewayMessageEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<GatewayOpCode, Task>> _sentGatewayMessageEvent = new AsyncEvent<Func<GatewayOpCode, Task>>(); | |||
public event Func<GatewayOpCode, int?, string, object, Task> ReceivedGatewayEvent { add { _receivedGatewayEvent.Add(value); } remove { _receivedGatewayEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<GatewayOpCode, int?, string, object, Task>> _receivedGatewayEvent = new AsyncEvent<Func<GatewayOpCode, int?, string, object, Task>>(); | |||
public event Func<Exception, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Exception, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, Task>>(); | |||
private readonly RequestQueue _requestQueue; | |||
private readonly JsonSerializer _serializer; | |||
private readonly IRestClient _restClient; | |||
private readonly IWebSocketClient _gatewayClient; | |||
private readonly SemaphoreSlim _connectionLock; | |||
private CancellationTokenSource _loginCancelToken, _connectCancelToken; | |||
private string _authToken; | |||
private string _gatewayUrl; | |||
private bool _isDisposed; | |||
protected readonly JsonSerializer _serializer; | |||
protected readonly SemaphoreSlim _stateLock; | |||
private readonly RestClientProvider _restClientProvider; | |||
protected string _authToken; | |||
protected bool _isDisposed; | |||
private CancellationTokenSource _loginCancelToken; | |||
private IRestClient _restClient; | |||
public LoginState LoginState { get; private set; } | |||
public ConnectionState ConnectionState { get; private set; } | |||
public TokenType AuthTokenType { get; private set; } | |||
internal RequestQueue RequestQueue { get; private set; } | |||
public DiscordApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider = null, JsonSerializer serializer = null, RequestQueue requestQueue = null) | |||
public DiscordRestApiClient(RestClientProvider restClientProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) | |||
{ | |||
_connectionLock = new SemaphoreSlim(1, 1); | |||
_restClientProvider = restClientProvider; | |||
_serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() }; | |||
RequestQueue = requestQueue; | |||
_requestQueue = requestQueue ?? new RequestQueue(); | |||
_stateLock = new SemaphoreSlim(1, 1); | |||
_restClient = restClientProvider(DiscordRestConfig.ClientAPIUrl); | |||
SetBaseUrl(DiscordConfig.ClientAPIUrl); | |||
} | |||
internal void SetBaseUrl(string baseUrl) | |||
{ | |||
_restClient = _restClientProvider(baseUrl); | |||
_restClient.SetHeader("accept", "*/*"); | |||
_restClient.SetHeader("user-agent", DiscordRestConfig.UserAgent); | |||
if (webSocketProvider != null) | |||
{ | |||
_gatewayClient = webSocketProvider(); | |||
//_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) | |||
_gatewayClient.BinaryMessage += async (data, index, count) => | |||
{ | |||
using (var compressed = new MemoryStream(data, index + 2, count - 2)) | |||
using (var decompressed = new MemoryStream()) | |||
{ | |||
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) | |||
zlib.CopyTo(decompressed); | |||
decompressed.Position = 0; | |||
using (var reader = new StreamReader(decompressed)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<WebSocketMessage>(jsonReader); | |||
await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); | |||
} | |||
} | |||
}; | |||
_gatewayClient.TextMessage += async text => | |||
{ | |||
using (var reader = new StringReader(text)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<WebSocketMessage>(jsonReader); | |||
await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); | |||
} | |||
}; | |||
_gatewayClient.Closed += async ex => | |||
{ | |||
await DisconnectAsync().ConfigureAwait(false); | |||
await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); | |||
}; | |||
} | |||
_serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() }; | |||
} | |||
private void Dispose(bool disposing) | |||
internal virtual void Dispose(bool disposing) | |||
{ | |||
if (!_isDisposed) | |||
{ | |||
if (disposing) | |||
{ | |||
_loginCancelToken?.Dispose(); | |||
_connectCancelToken?.Dispose(); | |||
(_restClient as IDisposable)?.Dispose(); | |||
(_gatewayClient as IDisposable)?.Dispose(); | |||
} | |||
_isDisposed = true; | |||
} | |||
@@ -114,12 +71,12 @@ namespace Discord.API | |||
public async Task LoginAsync(TokenType tokenType, string token, RequestOptions options = null) | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
await _stateLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await LoginInternalAsync(tokenType, token, options).ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
finally { _stateLock.Release(); } | |||
} | |||
private async Task LoginInternalAsync(TokenType tokenType, string token, RequestOptions options = null) | |||
{ | |||
@@ -134,7 +91,7 @@ namespace Discord.API | |||
AuthTokenType = TokenType.User; | |||
_authToken = null; | |||
_restClient.SetHeader("authorization", null); | |||
await _requestQueue.SetCancelTokenAsync(_loginCancelToken.Token).ConfigureAwait(false); | |||
await RequestQueue.SetCancelTokenAsync(_loginCancelToken.Token).ConfigureAwait(false); | |||
_restClient.SetCancelToken(_loginCancelToken.Token); | |||
AuthTokenType = tokenType; | |||
@@ -165,12 +122,12 @@ namespace Discord.API | |||
public async Task LogoutAsync() | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
await _stateLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await LogoutInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
finally { _stateLock.Release(); } | |||
} | |||
private async Task LogoutInternalAsync() | |||
{ | |||
@@ -182,87 +139,16 @@ namespace Discord.API | |||
catch { } | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
await _requestQueue.ClearAsync().ConfigureAwait(false); | |||
await RequestQueue.ClearAsync().ConfigureAwait(false); | |||
await _requestQueue.SetCancelTokenAsync(CancellationToken.None).ConfigureAwait(false); | |||
await RequestQueue.SetCancelTokenAsync(CancellationToken.None).ConfigureAwait(false); | |||
_restClient.SetCancelToken(CancellationToken.None); | |||
LoginState = LoginState.LoggedOut; | |||
} | |||
public async Task ConnectAsync() | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await ConnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task ConnectInternalAsync() | |||
{ | |||
if (LoginState != LoginState.LoggedIn) | |||
throw new InvalidOperationException("You must log in before connecting."); | |||
if (_gatewayClient == null) | |||
throw new NotSupportedException("This client is not configured with websocket support."); | |||
ConnectionState = ConnectionState.Connecting; | |||
try | |||
{ | |||
_connectCancelToken = new CancellationTokenSource(); | |||
if (_gatewayClient != null) | |||
_gatewayClient.SetCancelToken(_connectCancelToken.Token); | |||
if (_gatewayUrl == null) | |||
{ | |||
var gatewayResponse = await GetGatewayAsync().ConfigureAwait(false); | |||
_gatewayUrl = $"{gatewayResponse.Url}?v={DiscordConfig.APIVersion}&encoding={DiscordSocketConfig.GatewayEncoding}"; | |||
} | |||
await _gatewayClient.ConnectAsync(_gatewayUrl).ConfigureAwait(false); | |||
ConnectionState = ConnectionState.Connected; | |||
} | |||
catch (Exception) | |||
{ | |||
_gatewayUrl = null; //Uncache in case the gateway url changed | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
throw; | |||
} | |||
} | |||
public async Task DisconnectAsync() | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
public async Task DisconnectAsync(Exception ex) | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task DisconnectInternalAsync() | |||
{ | |||
if (_gatewayClient == null) | |||
throw new NotSupportedException("This client is not configured with websocket support."); | |||
if (ConnectionState == ConnectionState.Disconnected) return; | |||
ConnectionState = ConnectionState.Disconnecting; | |||
try { _connectCancelToken?.Cancel(false); } | |||
catch { } | |||
await _gatewayClient.DisconnectAsync().ConfigureAwait(false); | |||
ConnectionState = ConnectionState.Disconnected; | |||
} | |||
internal virtual Task ConnectInternalAsync() => Task.CompletedTask; | |||
internal virtual Task DisconnectInternalAsync() => Task.CompletedTask; | |||
//REST | |||
public Task SendAsync(string method, string endpoint, | |||
@@ -306,15 +192,6 @@ namespace Discord.API | |||
GuildBucket bucket, ulong guildId, RequestOptions options = null) where TResponse : class | |||
=> DeserializeJson<TResponse>(await SendMultipartInternalAsync(method, endpoint, multipartArgs, false, BucketGroup.Guild, (int)bucket, guildId, options).ConfigureAwait(false)); | |||
//Gateway | |||
public Task SendGatewayAsync(GatewayOpCode opCode, object payload, | |||
GlobalBucket bucket = GlobalBucket.GeneralGateway, RequestOptions options = null) | |||
=> SendGatewayInternalAsync(opCode, payload, BucketGroup.Global, (int)bucket, 0, options); | |||
public Task SendGatewayAsync(GatewayOpCode opCode, object payload, | |||
GuildBucket bucket, ulong guildId, RequestOptions options = null) | |||
=> SendGatewayInternalAsync(opCode, payload, BucketGroup.Guild, (int)bucket, guildId, options); | |||
//Core | |||
private async Task<Stream> SendInternalAsync(string method, string endpoint, object payload, bool headerOnly, | |||
BucketGroup group, int bucketId, ulong guildId, RequestOptions options = null) | |||
@@ -323,7 +200,7 @@ namespace Discord.API | |||
string json = null; | |||
if (payload != null) | |||
json = SerializeJson(payload); | |||
var responseStream = await _requestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, json, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); | |||
var responseStream = await RequestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, json, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); | |||
stopwatch.Stop(); | |||
double milliseconds = ToMilliseconds(stopwatch); | |||
@@ -335,7 +212,7 @@ namespace Discord.API | |||
BucketGroup group, int bucketId, ulong guildId, RequestOptions options = null) | |||
{ | |||
var stopwatch = Stopwatch.StartNew(); | |||
var responseStream = await _requestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, multipartArgs, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); | |||
var responseStream = await RequestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, multipartArgs, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); | |||
int bytes = headerOnly ? 0 : (int)responseStream.Length; | |||
stopwatch.Stop(); | |||
@@ -344,17 +221,6 @@ namespace Discord.API | |||
return responseStream; | |||
} | |||
private async Task SendGatewayInternalAsync(GatewayOpCode opCode, object payload, | |||
BucketGroup group, int bucketId, ulong guildId, RequestOptions options) | |||
{ | |||
//TODO: Add ETF | |||
byte[] bytes = null; | |||
payload = new WebSocketMessage { Operation = (int)opCode, Payload = payload }; | |||
if (payload != null) | |||
bytes = Encoding.UTF8.GetBytes(SerializeJson(payload)); | |||
await _requestQueue.SendAsync(new WebSocketRequest(_gatewayClient, bytes, true, options), group, bucketId, guildId).ConfigureAwait(false); | |||
await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); | |||
} | |||
//Auth | |||
public async Task ValidateTokenAsync(RequestOptions options = null) | |||
@@ -362,69 +228,6 @@ namespace Discord.API | |||
await SendAsync("GET", "auth/login", options: options).ConfigureAwait(false); | |||
} | |||
//Gateway | |||
public async Task<GetGatewayResponse> GetGatewayAsync(RequestOptions options = null) | |||
{ | |||
return await SendAsync<GetGatewayResponse>("GET", "gateway", options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendIdentifyAsync(int largeThreshold = 100, bool useCompression = true, RequestOptions options = null) | |||
{ | |||
var props = new Dictionary<string, string> | |||
{ | |||
["$device"] = "Discord.Net" | |||
}; | |||
var msg = new IdentifyParams() | |||
{ | |||
Token = _authToken, | |||
Properties = props, | |||
LargeThreshold = largeThreshold, | |||
UseCompression = useCompression | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.Identify, msg, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendResumeAsync(string sessionId, int lastSeq, RequestOptions options = null) | |||
{ | |||
var msg = new ResumeParams() | |||
{ | |||
Token = _authToken, | |||
SessionId = sessionId, | |||
Sequence = lastSeq | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.Resume, msg, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendHeartbeatAsync(int lastSeq, RequestOptions options = null) | |||
{ | |||
await SendGatewayAsync(GatewayOpCode.Heartbeat, lastSeq, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendStatusUpdateAsync(long? idleSince, Game game, RequestOptions options = null) | |||
{ | |||
var args = new StatusUpdateParams | |||
{ | |||
IdleSince = idleSince, | |||
Game = game | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.StatusUpdate, args, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendRequestMembersAsync(IEnumerable<ulong> guildIds, RequestOptions options = null) | |||
{ | |||
await SendGatewayAsync(GatewayOpCode.RequestGuildMembers, new RequestMembersParams { GuildIds = guildIds, Query = "", Limit = 0 }, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendVoiceStateUpdateAsync(ulong guildId, ulong? channelId, bool selfDeaf, bool selfMute, RequestOptions options = null) | |||
{ | |||
var payload = new VoiceStateUpdateParams | |||
{ | |||
GuildId = guildId, | |||
ChannelId = channelId, | |||
SelfDeaf = selfDeaf, | |||
SelfMute = selfMute | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.VoiceStateUpdate, payload, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendGuildSyncAsync(IEnumerable<ulong> guildIds, RequestOptions options = null) | |||
{ | |||
await SendGatewayAsync(GatewayOpCode.GuildSync, guildIds, options: options).ConfigureAwait(false); | |||
} | |||
//Channels | |||
public async Task<Channel> GetChannelAsync(ulong channelId, RequestOptions options = null) | |||
{ | |||
@@ -1230,8 +1033,8 @@ namespace Discord.API | |||
} | |||
//Helpers | |||
private static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2); | |||
private string SerializeJson(object value) | |||
protected static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2); | |||
protected string SerializeJson(object value) | |||
{ | |||
var sb = new StringBuilder(256); | |||
using (TextWriter text = new StringWriter(sb, CultureInfo.InvariantCulture)) | |||
@@ -1239,7 +1042,7 @@ namespace Discord.API | |||
_serializer.Serialize(writer, value); | |||
return sb.ToString(); | |||
} | |||
private T DeserializeJson<T>(Stream jsonStream) | |||
protected T DeserializeJson<T>(Stream jsonStream) | |||
{ | |||
using (TextReader text = new StreamReader(jsonStream)) | |||
using (JsonReader reader = new JsonTextReader(text)) |
@@ -2,6 +2,7 @@ | |||
using Discord.Logging; | |||
using Discord.Net.Converters; | |||
using Discord.Net.Queue; | |||
using Discord.Net.Rest; | |||
using Discord.Net.WebSockets; | |||
using Newtonsoft.Json; | |||
using Newtonsoft.Json.Linq; | |||
@@ -17,7 +18,7 @@ using System.Threading.Tasks; | |||
namespace Discord.API | |||
{ | |||
public class DiscordRpcApiClient : IDisposable | |||
public class DiscordRpcApiClient : DiscordRestApiClient, IDisposable | |||
{ | |||
private abstract class RpcRequest | |||
{ | |||
@@ -60,19 +61,16 @@ namespace Discord.API | |||
private readonly ConcurrentDictionary<Guid, RpcRequest> _requests; | |||
private readonly RequestQueue _requestQueue; | |||
private readonly JsonSerializer _serializer; | |||
private readonly IWebSocketClient _webSocketClient; | |||
private readonly SemaphoreSlim _connectionLock; | |||
private readonly string _clientId; | |||
private CancellationTokenSource _loginCancelToken, _connectCancelToken; | |||
private string _authToken; | |||
private string _origin; | |||
private bool _isDisposed; | |||
public LoginState LoginState { get; private set; } | |||
public ConnectionState ConnectionState { get; private set; } | |||
public DiscordRpcApiClient(string clientId, string origin, WebSocketProvider webSocketProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) | |||
public DiscordRpcApiClient(string clientId, string origin, RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) | |||
: base(restClientProvider, serializer, requestQueue) | |||
{ | |||
_connectionLock = new SemaphoreSlim(1, 1); | |||
_clientId = clientId; | |||
@@ -80,33 +78,19 @@ namespace Discord.API | |||
_requestQueue = requestQueue ?? new RequestQueue(); | |||
_requests = new ConcurrentDictionary<Guid, RpcRequest>(); | |||
if (webSocketProvider != null) | |||
_webSocketClient = webSocketProvider(); | |||
//_webSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) | |||
_webSocketClient.SetHeader("origin", _origin); | |||
_webSocketClient.BinaryMessage += async (data, index, count) => | |||
{ | |||
_webSocketClient = webSocketProvider(); | |||
//_webSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) | |||
_webSocketClient.SetHeader("origin", _origin); | |||
_webSocketClient.BinaryMessage += async (data, index, count) => | |||
using (var compressed = new MemoryStream(data, index + 2, count - 2)) | |||
using (var decompressed = new MemoryStream()) | |||
{ | |||
using (var compressed = new MemoryStream(data, index + 2, count - 2)) | |||
using (var decompressed = new MemoryStream()) | |||
{ | |||
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) | |||
zlib.CopyTo(decompressed); | |||
decompressed.Position = 0; | |||
using (var reader = new StreamReader(decompressed)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<RpcMessage>(jsonReader); | |||
await _receivedRpcEvent.InvokeAsync(msg.Cmd, msg.Event, msg.Data).ConfigureAwait(false); | |||
if (msg.Nonce.IsSpecified && msg.Nonce.Value.HasValue) | |||
ProcessMessage(msg); | |||
} | |||
} | |||
}; | |||
_webSocketClient.TextMessage += async text => | |||
{ | |||
using (var reader = new StringReader(text)) | |||
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) | |||
zlib.CopyTo(decompressed); | |||
decompressed.Position = 0; | |||
using (var reader = new StreamReader(decompressed)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<RpcMessage>(jsonReader); | |||
@@ -114,17 +98,26 @@ namespace Discord.API | |||
if (msg.Nonce.IsSpecified && msg.Nonce.Value.HasValue) | |||
ProcessMessage(msg); | |||
} | |||
}; | |||
_webSocketClient.Closed += async ex => | |||
} | |||
}; | |||
_webSocketClient.TextMessage += async text => | |||
{ | |||
using (var reader = new StringReader(text)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
await DisconnectAsync().ConfigureAwait(false); | |||
await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); | |||
}; | |||
} | |||
_serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() }; | |||
var msg = _serializer.Deserialize<RpcMessage>(jsonReader); | |||
await _receivedRpcEvent.InvokeAsync(msg.Cmd, msg.Event, msg.Data).ConfigureAwait(false); | |||
if (msg.Nonce.IsSpecified && msg.Nonce.Value.HasValue) | |||
ProcessMessage(msg); | |||
} | |||
}; | |||
_webSocketClient.Closed += async ex => | |||
{ | |||
await DisconnectAsync().ConfigureAwait(false); | |||
await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); | |||
}; | |||
} | |||
private void Dispose(bool disposing) | |||
internal override void Dispose(bool disposing) | |||
{ | |||
if (!_isDisposed) | |||
{ | |||
@@ -136,67 +129,6 @@ namespace Discord.API | |||
_isDisposed = true; | |||
} | |||
} | |||
public void Dispose() => Dispose(true); | |||
public async Task LoginAsync(TokenType tokenType, string token, bool upgrade = false, RequestOptions options = null) | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await LoginInternalAsync(tokenType, token, upgrade, options).ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task LoginInternalAsync(TokenType tokenType, string token, bool upgrade = false, RequestOptions options = null) | |||
{ | |||
if (!upgrade && LoginState != LoginState.LoggedOut) | |||
await LogoutInternalAsync().ConfigureAwait(false); | |||
if (tokenType != TokenType.Bearer) | |||
throw new InvalidOperationException("RPC only supports bearer tokens"); | |||
LoginState = LoginState.LoggingIn; | |||
try | |||
{ | |||
_loginCancelToken = new CancellationTokenSource(); | |||
await _requestQueue.SetCancelTokenAsync(_loginCancelToken.Token).ConfigureAwait(false); | |||
_authToken = token; | |||
LoginState = LoginState.LoggedIn; | |||
} | |||
catch (Exception) | |||
{ | |||
await LogoutInternalAsync().ConfigureAwait(false); | |||
throw; | |||
} | |||
} | |||
public async Task LogoutAsync() | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await LogoutInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task LogoutInternalAsync() | |||
{ | |||
//An exception here will lock the client into the unusable LoggingOut state, but that's probably fine since our client is in an undefined state too. | |||
if (LoginState == LoginState.LoggedOut) return; | |||
LoginState = LoginState.LoggingOut; | |||
try { _loginCancelToken?.Cancel(false); } | |||
catch { } | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
await _requestQueue.ClearAsync().ConfigureAwait(false); | |||
await _requestQueue.SetCancelTokenAsync(CancellationToken.None).ConfigureAwait(false); | |||
LoginState = LoginState.LoggedOut; | |||
} | |||
public async Task ConnectAsync() | |||
{ | |||
@@ -207,7 +139,7 @@ namespace Discord.API | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task ConnectInternalAsync() | |||
internal override async Task ConnectInternalAsync() | |||
{ | |||
/*if (LoginState != LoginState.LoggedIn) | |||
throw new InvalidOperationException("You must log in before connecting.");*/ | |||
@@ -226,6 +158,7 @@ namespace Discord.API | |||
{ | |||
string url = $"wss://discordapp.io:{port}/?v={DiscordRpcConfig.RpcAPIVersion}&client_id={_clientId}"; | |||
await _webSocketClient.ConnectAsync(url).ConfigureAwait(false); | |||
SetBaseUrl($"https://discordapp.io:{port}"); | |||
success = true; | |||
break; | |||
} | |||
@@ -254,7 +187,7 @@ namespace Discord.API | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task DisconnectInternalAsync() | |||
internal override async Task DisconnectInternalAsync() | |||
{ | |||
if (_webSocketClient == null) | |||
throw new NotSupportedException("This client is not configured with websocket support."); | |||
@@ -421,22 +354,5 @@ namespace Discord.API | |||
else | |||
return false; | |||
} | |||
//Helpers | |||
private static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2); | |||
private string SerializeJson(object value) | |||
{ | |||
var sb = new StringBuilder(256); | |||
using (TextWriter text = new StringWriter(sb, CultureInfo.InvariantCulture)) | |||
using (JsonWriter writer = new JsonTextWriter(text)) | |||
_serializer.Serialize(writer, value); | |||
return sb.ToString(); | |||
} | |||
private T DeserializeJson<T>(Stream jsonStream) | |||
{ | |||
using (TextReader text = new StreamReader(jsonStream)) | |||
using (JsonReader reader = new JsonTextReader(text)) | |||
return _serializer.Deserialize<T>(reader); | |||
} | |||
} | |||
} |
@@ -0,0 +1,240 @@ | |||
using Discord.API.Gateway; | |||
using Discord.API.Rest; | |||
using Discord.Net.Queue; | |||
using Discord.Net.Rest; | |||
using Discord.Net.WebSockets; | |||
using Newtonsoft.Json; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.IO.Compression; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace Discord.API | |||
{ | |||
public class DiscordSocketApiClient : DiscordRestApiClient | |||
{ | |||
public event Func<GatewayOpCode, Task> SentGatewayMessage { add { _sentGatewayMessageEvent.Add(value); } remove { _sentGatewayMessageEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<GatewayOpCode, Task>> _sentGatewayMessageEvent = new AsyncEvent<Func<GatewayOpCode, Task>>(); | |||
public event Func<GatewayOpCode, int?, string, object, Task> ReceivedGatewayEvent { add { _receivedGatewayEvent.Add(value); } remove { _receivedGatewayEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<GatewayOpCode, int?, string, object, Task>> _receivedGatewayEvent = new AsyncEvent<Func<GatewayOpCode, int?, string, object, Task>>(); | |||
public event Func<Exception, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Exception, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, Task>>(); | |||
private readonly IWebSocketClient _gatewayClient; | |||
private CancellationTokenSource _connectCancelToken; | |||
private string _gatewayUrl; | |||
public ConnectionState ConnectionState { get; private set; } | |||
public DiscordSocketApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) | |||
: base(restClientProvider, serializer, requestQueue) | |||
{ | |||
_gatewayClient = webSocketProvider(); | |||
//_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) | |||
_gatewayClient.BinaryMessage += async (data, index, count) => | |||
{ | |||
using (var compressed = new MemoryStream(data, index + 2, count - 2)) | |||
using (var decompressed = new MemoryStream()) | |||
{ | |||
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) | |||
zlib.CopyTo(decompressed); | |||
decompressed.Position = 0; | |||
using (var reader = new StreamReader(decompressed)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<WebSocketMessage>(jsonReader); | |||
await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); | |||
} | |||
} | |||
}; | |||
_gatewayClient.TextMessage += async text => | |||
{ | |||
using (var reader = new StringReader(text)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<WebSocketMessage>(jsonReader); | |||
await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); | |||
} | |||
}; | |||
_gatewayClient.Closed += async ex => | |||
{ | |||
await DisconnectAsync().ConfigureAwait(false); | |||
await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); | |||
}; | |||
} | |||
internal override void Dispose(bool disposing) | |||
{ | |||
if (!_isDisposed) | |||
{ | |||
if (disposing) | |||
{ | |||
_connectCancelToken?.Dispose(); | |||
(_gatewayClient as IDisposable)?.Dispose(); | |||
} | |||
_isDisposed = true; | |||
} | |||
} | |||
public async Task ConnectAsync() | |||
{ | |||
await _stateLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await ConnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _stateLock.Release(); } | |||
} | |||
internal override async Task ConnectInternalAsync() | |||
{ | |||
if (LoginState != LoginState.LoggedIn) | |||
throw new InvalidOperationException("You must log in before connecting."); | |||
if (_gatewayClient == null) | |||
throw new NotSupportedException("This client is not configured with websocket support."); | |||
ConnectionState = ConnectionState.Connecting; | |||
try | |||
{ | |||
_connectCancelToken = new CancellationTokenSource(); | |||
if (_gatewayClient != null) | |||
_gatewayClient.SetCancelToken(_connectCancelToken.Token); | |||
if (_gatewayUrl == null) | |||
{ | |||
var gatewayResponse = await GetGatewayAsync().ConfigureAwait(false); | |||
_gatewayUrl = $"{gatewayResponse.Url}?v={DiscordConfig.APIVersion}&encoding={DiscordSocketConfig.GatewayEncoding}"; | |||
} | |||
await _gatewayClient.ConnectAsync(_gatewayUrl).ConfigureAwait(false); | |||
ConnectionState = ConnectionState.Connected; | |||
} | |||
catch (Exception) | |||
{ | |||
_gatewayUrl = null; //Uncache in case the gateway url changed | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
throw; | |||
} | |||
} | |||
public async Task DisconnectAsync() | |||
{ | |||
await _stateLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _stateLock.Release(); } | |||
} | |||
public async Task DisconnectAsync(Exception ex) | |||
{ | |||
await _stateLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _stateLock.Release(); } | |||
} | |||
internal override async Task DisconnectInternalAsync() | |||
{ | |||
if (_gatewayClient == null) | |||
throw new NotSupportedException("This client is not configured with websocket support."); | |||
if (ConnectionState == ConnectionState.Disconnected) return; | |||
ConnectionState = ConnectionState.Disconnecting; | |||
try { _connectCancelToken?.Cancel(false); } | |||
catch { } | |||
await _gatewayClient.DisconnectAsync().ConfigureAwait(false); | |||
ConnectionState = ConnectionState.Disconnected; | |||
} | |||
//Core | |||
private async Task SendGatewayInternalAsync(GatewayOpCode opCode, object payload, | |||
BucketGroup group, int bucketId, ulong guildId, RequestOptions options) | |||
{ | |||
//TODO: Add ETF | |||
byte[] bytes = null; | |||
payload = new WebSocketMessage { Operation = (int)opCode, Payload = payload }; | |||
if (payload != null) | |||
bytes = Encoding.UTF8.GetBytes(SerializeJson(payload)); | |||
await RequestQueue.SendAsync(new WebSocketRequest(_gatewayClient, bytes, true, options), group, bucketId, guildId).ConfigureAwait(false); | |||
await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); | |||
} | |||
//Gateway | |||
public Task SendGatewayAsync(GatewayOpCode opCode, object payload, | |||
GlobalBucket bucket = GlobalBucket.GeneralGateway, RequestOptions options = null) | |||
=> SendGatewayInternalAsync(opCode, payload, BucketGroup.Global, (int)bucket, 0, options); | |||
public Task SendGatewayAsync(GatewayOpCode opCode, object payload, | |||
GuildBucket bucket, ulong guildId, RequestOptions options = null) | |||
=> SendGatewayInternalAsync(opCode, payload, BucketGroup.Guild, (int)bucket, guildId, options); | |||
public async Task<GetGatewayResponse> GetGatewayAsync(RequestOptions options = null) | |||
{ | |||
return await SendAsync<GetGatewayResponse>("GET", "gateway", options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendIdentifyAsync(int largeThreshold = 100, bool useCompression = true, RequestOptions options = null) | |||
{ | |||
var props = new Dictionary<string, string> | |||
{ | |||
["$device"] = "Discord.Net" | |||
}; | |||
var msg = new IdentifyParams() | |||
{ | |||
Token = _authToken, | |||
Properties = props, | |||
LargeThreshold = largeThreshold, | |||
UseCompression = useCompression | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.Identify, msg, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendResumeAsync(string sessionId, int lastSeq, RequestOptions options = null) | |||
{ | |||
var msg = new ResumeParams() | |||
{ | |||
Token = _authToken, | |||
SessionId = sessionId, | |||
Sequence = lastSeq | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.Resume, msg, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendHeartbeatAsync(int lastSeq, RequestOptions options = null) | |||
{ | |||
await SendGatewayAsync(GatewayOpCode.Heartbeat, lastSeq, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendStatusUpdateAsync(long? idleSince, Game game, RequestOptions options = null) | |||
{ | |||
var args = new StatusUpdateParams | |||
{ | |||
IdleSince = idleSince, | |||
Game = game | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.StatusUpdate, args, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendRequestMembersAsync(IEnumerable<ulong> guildIds, RequestOptions options = null) | |||
{ | |||
await SendGatewayAsync(GatewayOpCode.RequestGuildMembers, new RequestMembersParams { GuildIds = guildIds, Query = "", Limit = 0 }, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendVoiceStateUpdateAsync(ulong guildId, ulong? channelId, bool selfDeaf, bool selfMute, RequestOptions options = null) | |||
{ | |||
var payload = new VoiceStateUpdateParams | |||
{ | |||
GuildId = guildId, | |||
ChannelId = channelId, | |||
SelfDeaf = selfDeaf, | |||
SelfMute = selfMute | |||
}; | |||
await SendGatewayAsync(GatewayOpCode.VoiceStateUpdate, payload, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendGuildSyncAsync(IEnumerable<ulong> guildIds, RequestOptions options = null) | |||
{ | |||
await SendGatewayAsync(GatewayOpCode.GuildSync, guildIds, options: options).ConfigureAwait(false); | |||
} | |||
} | |||
} |
@@ -27,20 +27,21 @@ namespace Discord | |||
internal readonly ILogger _clientLogger, _restLogger, _queueLogger; | |||
internal readonly SemaphoreSlim _connectionLock; | |||
internal readonly RequestQueue _requestQueue; | |||
internal SelfUser _currentUser; | |||
private bool _isFirstLogSub; | |||
internal bool _isDisposed; | |||
public API.DiscordApiClient ApiClient { get; } | |||
public API.DiscordRestApiClient ApiClient { get; } | |||
internal LogManager LogManager { get; } | |||
public LoginState LoginState { get; private set; } | |||
/// <summary> Creates a new REST-only discord client. </summary> | |||
public DiscordRestClient() : this(new DiscordRestConfig()) { } | |||
public DiscordRestClient(DiscordRestConfig config) : this(config, CreateApiClient(config)) { } | |||
/// <summary> Creates a new REST-only discord client. </summary> | |||
public DiscordRestClient(DiscordRestConfig config) | |||
internal DiscordRestClient(DiscordRestConfig config, API.DiscordRestApiClient client) | |||
{ | |||
ApiClient = client; | |||
LogManager = new LogManager(config.LogLevel); | |||
LogManager.Message += async msg => await _logEvent.InvokeAsync(msg).ConfigureAwait(false); | |||
_clientLogger = LogManager.CreateLogger("Client"); | |||
@@ -50,19 +51,16 @@ namespace Discord | |||
_connectionLock = new SemaphoreSlim(1, 1); | |||
_requestQueue = new RequestQueue(); | |||
_requestQueue.RateLimitTriggered += async (id, bucket, millis) => | |||
ApiClient.RequestQueue.RateLimitTriggered += async (id, bucket, millis) => | |||
{ | |||
await _queueLogger.WarningAsync($"Rate limit triggered (id = \"{id ?? "null"}\")").ConfigureAwait(false); | |||
if (bucket == null && id != null) | |||
await _queueLogger.WarningAsync($"Unknown rate limit bucket \"{id ?? "null"}\"").ConfigureAwait(false); | |||
}; | |||
var restProvider = config.RestClientProvider; | |||
var webSocketProvider = (this is DiscordSocketClient) ? (config as DiscordSocketConfig)?.WebSocketProvider : null; //TODO: Clean this check | |||
ApiClient = new API.DiscordApiClient(restProvider, webSocketProvider, requestQueue: _requestQueue); | |||
ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); | |||
} | |||
private static API.DiscordRestApiClient CreateApiClient(DiscordRestConfig config) | |||
=> new API.DiscordRestApiClient(config.RestClientProvider, requestQueue: new RequestQueue()); | |||
/// <inheritdoc /> | |||
public async Task LoginAsync(TokenType tokenType, string token, bool validateToken = true) | |||
@@ -89,27 +87,9 @@ namespace Discord | |||
try | |||
{ | |||
await ApiClient.LoginAsync(tokenType, token).ConfigureAwait(false); | |||
if (validateToken) | |||
{ | |||
try | |||
{ | |||
var user = await GetCurrentUserAsync().ConfigureAwait(false); | |||
if (user == null) //Is using a cached DiscordClient | |||
user = new SelfUser(this, await ApiClient.GetMyUserAsync().ConfigureAwait(false)); | |||
if (user.IsBot && tokenType == TokenType.User) | |||
throw new InvalidOperationException($"A bot token used provided with {nameof(TokenType)}.{nameof(TokenType.User)}"); | |||
else if (!user.IsBot && tokenType == TokenType.Bot) //Discord currently sends a 401 in this case | |||
throw new InvalidOperationException($"A user token used provided with {nameof(TokenType)}.{nameof(TokenType.Bot)}"); | |||
} | |||
catch (HttpException ex) | |||
{ | |||
throw new ArgumentException("Token validation failed", nameof(token), ex); | |||
} | |||
} | |||
await OnLoginAsync().ConfigureAwait(false); | |||
await ValidateTokenAsync(tokenType, token).ConfigureAwait(false); | |||
await OnLoginAsync(tokenType, token).ConfigureAwait(false); | |||
LoginState = LoginState.LoggedIn; | |||
} | |||
@@ -121,7 +101,26 @@ namespace Discord | |||
await _loggedInEvent.InvokeAsync().ConfigureAwait(false); | |||
} | |||
protected virtual Task OnLoginAsync() => Task.CompletedTask; | |||
protected virtual async Task ValidateTokenAsync(TokenType tokenType, string token) | |||
{ | |||
try | |||
{ | |||
var user = await GetCurrentUserAsync().ConfigureAwait(false); | |||
if (user == null) //Is using a cached DiscordClient | |||
user = new SelfUser(this, await ApiClient.GetMyUserAsync().ConfigureAwait(false)); | |||
if (user.IsBot && tokenType == TokenType.User) | |||
throw new InvalidOperationException($"A bot token used provided with {nameof(TokenType)}.{nameof(TokenType.User)}"); | |||
else if (!user.IsBot && tokenType == TokenType.Bot) //Discord currently sends a 401 in this case | |||
throw new InvalidOperationException($"A user token used provided with {nameof(TokenType)}.{nameof(TokenType.Bot)}"); | |||
} | |||
catch (HttpException ex) | |||
{ | |||
throw new ArgumentException("Token validation failed", nameof(token), ex); | |||
} | |||
} | |||
protected virtual Task OnLoginAsync(TokenType tokenType, string token) => Task.CompletedTask; | |||
/// <inheritdoc /> | |||
public async Task LogoutAsync() | |||
@@ -1,6 +1,7 @@ | |||
using Discord.API.Rpc; | |||
using Discord.Logging; | |||
using Discord.Net.Converters; | |||
using Discord.Net.Queue; | |||
using Newtonsoft.Json; | |||
using Newtonsoft.Json.Linq; | |||
using System; | |||
@@ -10,16 +11,8 @@ using System.Threading.Tasks; | |||
namespace Discord | |||
{ | |||
public class DiscordRpcClient | |||
public class DiscordRpcClient : DiscordRestClient | |||
{ | |||
public event Func<LogMessage, Task> Log { add { _logEvent.Add(value); } remove { _logEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<LogMessage, Task>> _logEvent = new AsyncEvent<Func<LogMessage, Task>>(); | |||
public event Func<Task> LoggedIn { add { _loggedInEvent.Add(value); } remove { _loggedInEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Task>> _loggedInEvent = new AsyncEvent<Func<Task>>(); | |||
public event Func<Task> LoggedOut { add { _loggedOutEvent.Add(value); } remove { _loggedOutEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Task>> _loggedOutEvent = new AsyncEvent<Func<Task>>(); | |||
public event Func<Task> Connected { add { _connectedEvent.Add(value); } remove { _connectedEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Task>> _connectedEvent = new AsyncEvent<Func<Task>>(); | |||
public event Func<Exception, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } | |||
@@ -28,44 +21,36 @@ namespace Discord | |||
public event Func<Task> Ready { add { _readyEvent.Add(value); } remove { _readyEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Task>> _readyEvent = new AsyncEvent<Func<Task>>(); | |||
private readonly ILogger _clientLogger, _rpcLogger; | |||
private readonly SemaphoreSlim _connectionLock; | |||
private readonly ILogger _rpcLogger; | |||
private readonly JsonSerializer _serializer; | |||
private TaskCompletionSource<bool> _connectTask; | |||
private CancellationTokenSource _cancelToken; | |||
internal SelfUser _currentUser; | |||
private CancellationTokenSource _cancelToken, _reconnectCancelToken; | |||
private Task _reconnectTask; | |||
private bool _isFirstLogSub; | |||
private bool _isReconnecting; | |||
private bool _isDisposed; | |||
private bool _canReconnect; | |||
public API.DiscordRpcApiClient ApiClient { get; } | |||
internal LogManager LogManager { get; } | |||
public LoginState LoginState { get; private set; } | |||
public ConnectionState ConnectionState { get; private set; } | |||
public new API.DiscordRpcApiClient ApiClient => base.ApiClient as API.DiscordRpcApiClient; | |||
/// <summary> Creates a new RPC discord client. </summary> | |||
public DiscordRpcClient(string clientId, string origin) : this(new DiscordRpcConfig(clientId, origin)) { } | |||
/// <summary> Creates a new RPC discord client. </summary> | |||
public DiscordRpcClient(DiscordRpcConfig config) | |||
: base(config, CreateApiClient(config)) | |||
{ | |||
LogManager = new LogManager(config.LogLevel); | |||
LogManager.Message += async msg => await _logEvent.InvokeAsync(msg).ConfigureAwait(false); | |||
_clientLogger = LogManager.CreateLogger("Client"); | |||
_rpcLogger = LogManager.CreateLogger("RPC"); | |||
_isFirstLogSub = true; | |||
_connectionLock = new SemaphoreSlim(1, 1); | |||
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; | |||
_serializer.Error += (s, e) => | |||
{ | |||
_rpcLogger.WarningAsync(e.ErrorContext.Error).GetAwaiter().GetResult(); | |||
e.ErrorContext.Handled = true; | |||
}; | |||
ApiClient = new API.DiscordRpcApiClient(config.ClientId, config.Origin, config.WebSocketProvider); | |||
ApiClient.SentRpcMessage += async opCode => await _rpcLogger.DebugAsync($"Sent {opCode}").ConfigureAwait(false); | |||
ApiClient.ReceivedRpcEvent += ProcessMessageAsync; | |||
ApiClient.Disconnected += async ex => | |||
@@ -79,91 +64,48 @@ namespace Discord | |||
await _rpcLogger.WarningAsync($"Connection Closed").ConfigureAwait(false); | |||
}; | |||
} | |||
private void Dispose(bool disposing) | |||
private static API.DiscordRpcApiClient CreateApiClient(DiscordRpcConfig config) | |||
=> new API.DiscordRpcApiClient(config.ClientId, config.Origin, config.RestClientProvider, config.WebSocketProvider, requestQueue: new RequestQueue()); | |||
internal override void Dispose(bool disposing) | |||
{ | |||
if (!_isDisposed) | |||
{ | |||
ApiClient.Dispose(); | |||
_isDisposed = true; | |||
} | |||
} | |||
public void Dispose() => Dispose(true); | |||
/// <inheritdoc /> | |||
public async Task LoginAsync(TokenType tokenType, string token, bool validateToken = true) | |||
protected override async Task OnLoginAsync(TokenType tokenType, string token) | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await LoginInternalAsync(tokenType, token, validateToken).ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
await ApiClient.LoginAsync(tokenType, token).ConfigureAwait(false); | |||
} | |||
private async Task LoginInternalAsync(TokenType tokenType, string token, bool validateToken) | |||
protected override async Task OnLogoutAsync() | |||
{ | |||
if (_isFirstLogSub) | |||
{ | |||
_isFirstLogSub = false; | |||
await WriteInitialLog().ConfigureAwait(false); | |||
} | |||
if (LoginState != LoginState.LoggedOut) | |||
await LogoutInternalAsync().ConfigureAwait(false); | |||
LoginState = LoginState.LoggingIn; | |||
try | |||
{ | |||
await ApiClient.LoginAsync(tokenType, token).ConfigureAwait(false); | |||
LoginState = LoginState.LoggedIn; | |||
} | |||
catch (Exception) | |||
{ | |||
await LogoutInternalAsync().ConfigureAwait(false); | |||
throw; | |||
} | |||
await _loggedInEvent.InvokeAsync().ConfigureAwait(false); | |||
await ApiClient.LogoutAsync().ConfigureAwait(false); | |||
} | |||
/// <inheritdoc /> | |||
public async Task LogoutAsync() | |||
protected override Task ValidateTokenAsync(TokenType tokenType, string token) | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await LogoutInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
return Task.CompletedTask; //Validation is done in DiscordRpcAPIClient | |||
} | |||
private async Task LogoutInternalAsync() | |||
{ | |||
if (LoginState == LoginState.LoggedOut) return; | |||
LoginState = LoginState.LoggingOut; | |||
await ApiClient.LogoutAsync().ConfigureAwait(false); | |||
_currentUser = null; | |||
LoginState = LoginState.LoggedOut; | |||
await _loggedOutEvent.InvokeAsync().ConfigureAwait(false); | |||
} | |||
public async Task ConnectAsync() | |||
/// <inheritdoc /> | |||
public Task ConnectAsync() => ConnectAsync(false); | |||
internal async Task ConnectAsync(bool ignoreLoginCheck) | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
_isReconnecting = false; | |||
await ConnectInternalAsync().ConfigureAwait(false); | |||
await ConnectInternalAsync(ignoreLoginCheck, false).ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task ConnectInternalAsync(bool ignoreLoginCheck = false) | |||
private async Task ConnectInternalAsync(bool ignoreLoginCheck, bool isReconnecting) | |||
{ | |||
if (LoginState != LoginState.LoggedIn) | |||
throw new InvalidOperationException("You must log in before connecting or call ConnectAndAuthorizeAsync."); | |||
if (!ignoreLoginCheck && LoginState != LoginState.LoggedIn) | |||
throw new InvalidOperationException("You must log in before connecting."); | |||
if (!isReconnecting && _reconnectCancelToken != null && !_reconnectCancelToken.IsCancellationRequested) | |||
_reconnectCancelToken.Cancel(); | |||
if (_isFirstLogSub) | |||
{ | |||
@@ -173,7 +115,7 @@ namespace Discord | |||
var state = ConnectionState; | |||
if (state == ConnectionState.Connecting || state == ConnectionState.Connected) | |||
await DisconnectInternalAsync(null).ConfigureAwait(false); | |||
await DisconnectInternalAsync(null, isReconnecting).ConfigureAwait(false); | |||
ConnectionState = ConnectionState.Connecting; | |||
await _rpcLogger.InfoAsync("Connecting").ConfigureAwait(false); | |||
@@ -185,13 +127,13 @@ namespace Discord | |||
await _connectedEvent.InvokeAsync().ConfigureAwait(false); | |||
await _connectTask.Task.ConfigureAwait(false); | |||
_canReconnect = true; | |||
ConnectionState = ConnectionState.Connected; | |||
await _rpcLogger.InfoAsync("Connected").ConfigureAwait(false); | |||
} | |||
catch (Exception) | |||
{ | |||
await DisconnectInternalAsync(null).ConfigureAwait(false); | |||
await DisconnectInternalAsync(null, isReconnecting).ConfigureAwait(false); | |||
throw; | |||
} | |||
} | |||
@@ -202,12 +144,20 @@ namespace Discord | |||
try | |||
{ | |||
_isReconnecting = false; | |||
await DisconnectInternalAsync(null).ConfigureAwait(false); | |||
await DisconnectInternalAsync(null, false).ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task DisconnectInternalAsync(Exception ex) | |||
private async Task DisconnectInternalAsync(Exception ex, bool isReconnecting) | |||
{ | |||
if (!isReconnecting) | |||
{ | |||
_canReconnect = false; | |||
if (_reconnectCancelToken != null && !_reconnectCancelToken.IsCancellationRequested) | |||
_reconnectCancelToken.Cancel(); | |||
} | |||
if (ConnectionState == ConnectionState.Disconnected) return; | |||
ConnectionState = ConnectionState.Disconnecting; | |||
await _rpcLogger.InfoAsync("Disconnecting").ConfigureAwait(false); | |||
@@ -228,53 +178,51 @@ namespace Discord | |||
private async Task StartReconnectAsync(Exception ex) | |||
{ | |||
//TODO: Is this thread-safe? | |||
if (_reconnectTask != null) return; | |||
_connectTask?.TrySetException(ex); | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await DisconnectInternalAsync(ex).ConfigureAwait(false); | |||
if (_reconnectTask != null) return; | |||
_isReconnecting = true; | |||
_reconnectTask = ReconnectInternalAsync(); | |||
if (!_canReconnect || _reconnectTask != null) return; | |||
await DisconnectInternalAsync(null, true).ConfigureAwait(false); | |||
_reconnectCancelToken = new CancellationTokenSource(); | |||
_reconnectTask = ReconnectInternalAsync(_reconnectCancelToken.Token); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
private async Task ReconnectInternalAsync() | |||
private async Task ReconnectInternalAsync(CancellationToken cancelToken) | |||
{ | |||
try | |||
{ | |||
Random jitter = new Random(); | |||
int nextReconnectDelay = 1000; | |||
while (_isReconnecting) | |||
while (true) | |||
{ | |||
await Task.Delay(nextReconnectDelay, cancelToken).ConfigureAwait(false); | |||
nextReconnectDelay = nextReconnectDelay * 2 + jitter.Next(-250, 250); | |||
if (nextReconnectDelay > 60000) | |||
nextReconnectDelay = 60000; | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await Task.Delay(nextReconnectDelay).ConfigureAwait(false); | |||
nextReconnectDelay *= 2; | |||
if (nextReconnectDelay > 30000) | |||
nextReconnectDelay = 30000; | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
await ConnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
if (cancelToken.IsCancellationRequested) return; | |||
await ConnectInternalAsync(false, true).ConfigureAwait(false); | |||
_reconnectTask = null; | |||
return; | |||
} | |||
catch (Exception ex) | |||
{ | |||
await _rpcLogger.WarningAsync("Reconnect failed", ex).ConfigureAwait(false); | |||
} | |||
finally { _connectionLock.Release(); } | |||
} | |||
} | |||
finally | |||
catch (OperationCanceledException) | |||
{ | |||
await _connectionLock.WaitAsync().ConfigureAwait(false); | |||
try | |||
{ | |||
_isReconnecting = false; | |||
await _rpcLogger.DebugAsync("Reconnect cancelled").ConfigureAwait(false); | |||
_reconnectTask = null; | |||
} | |||
finally { _connectionLock.Release(); } | |||
@@ -283,7 +231,7 @@ namespace Discord | |||
public async Task<string> AuthorizeAsync(string[] scopes) | |||
{ | |||
await ConnectAsync().ConfigureAwait(false); | |||
await ConnectAsync(true).ConfigureAwait(false); | |||
var result = await ApiClient.SendAuthorizeAsync(scopes).ConfigureAwait(false); | |||
await DisconnectAsync().ConfigureAwait(false); | |||
return result.Code; | |||
@@ -314,7 +262,8 @@ namespace Discord | |||
//CancellationToken = cancelToken //TODO: Implement | |||
}; | |||
await ApiClient.SendAuthenticateAsync(options).ConfigureAwait(false); //Has bearer | |||
if (LoginState != LoginState.LoggedOut) | |||
await ApiClient.SendAuthenticateAsync(options).ConfigureAwait(false); //Has bearer | |||
var __ = _connectTask.TrySetResultAsync(true); //Signal the .Connect() call to complete | |||
await _rpcLogger.InfoAsync("Ready").ConfigureAwait(false); | |||
@@ -2,7 +2,7 @@ | |||
namespace Discord | |||
{ | |||
public class DiscordRpcConfig : DiscordConfig | |||
public class DiscordRpcConfig : DiscordRestConfig | |||
{ | |||
public const int RpcAPIVersion = 1; | |||
@@ -3,6 +3,7 @@ using Discord.Audio; | |||
using Discord.Extensions; | |||
using Discord.Logging; | |||
using Discord.Net.Converters; | |||
using Discord.Net.Queue; | |||
using Discord.Net.WebSockets; | |||
using Newtonsoft.Json; | |||
using Newtonsoft.Json.Linq; | |||
@@ -52,6 +53,7 @@ namespace Discord | |||
internal DataStore DataStore { get; private set; } | |||
internal WebSocketProvider WebSocketProvider { get; private set; } | |||
public new API.DiscordSocketApiClient ApiClient => base.ApiClient as API.DiscordSocketApiClient; | |||
internal SocketSelfUser CurrentUser => _currentUser as SocketSelfUser; | |||
internal IReadOnlyCollection<SocketGuild> Guilds => DataStore.Guilds; | |||
internal IReadOnlyCollection<VoiceRegion> VoiceRegions => _voiceRegions.ToReadOnlyCollection(); | |||
@@ -60,7 +62,7 @@ namespace Discord | |||
public DiscordSocketClient() : this(new DiscordSocketConfig()) { } | |||
/// <summary> Creates a new REST/WebSocket discord client. </summary> | |||
public DiscordSocketClient(DiscordSocketConfig config) | |||
: base(config) | |||
: base(config, CreateApiClient(config)) | |||
{ | |||
ShardId = config.ShardId; | |||
TotalShards = config.TotalShards; | |||
@@ -106,8 +108,10 @@ namespace Discord | |||
_voiceRegions = ImmutableDictionary.Create<string, VoiceRegion>(); | |||
_largeGuilds = new ConcurrentQueue<ulong>(); | |||
} | |||
private static API.DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) | |||
=> new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, requestQueue: new RequestQueue()); | |||
protected override async Task OnLoginAsync() | |||
protected override async Task OnLoginAsync(TokenType tokenType, string token) | |||
{ | |||
var voiceRegions = await ApiClient.GetVoiceRegionsAsync().ConfigureAwait(false); | |||
_voiceRegions = voiceRegions.Select(x => new VoiceRegion(x)).ToImmutableDictionary(x => x.Id); | |||
@@ -7,9 +7,9 @@ namespace Discord | |||
{ | |||
internal class SelfUser : User, ISelfUser | |||
{ | |||
private long _idleSince; | |||
private UserStatus _status; | |||
private Game _game; | |||
protected long _idleSince; | |||
protected UserStatus _status; | |||
protected Game _game; | |||
public string Email { get; private set; } | |||
public bool IsVerified { get; private set; } | |||
@@ -61,27 +61,7 @@ namespace Discord | |||
var model = await Discord.ApiClient.ModifySelfAsync(args).ConfigureAwait(false); | |||
Update(model, UpdateSource.Rest); | |||
} | |||
public async Task ModifyStatusAsync(Action<ModifyPresenceParams> func) | |||
{ | |||
if (func == null) throw new NullReferenceException(nameof(func)); | |||
var args = new ModifyPresenceParams(); | |||
func(args); | |||
var game = args._game.GetValueOrDefault(_game); | |||
var status = args._status.GetValueOrDefault(_status); | |||
long idleSince = _idleSince; | |||
if (status == UserStatus.Idle && _status != UserStatus.Idle) | |||
idleSince = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); | |||
var apiGame = game != null ? new API.Game { Name = game.Name, StreamType = game.StreamType, StreamUrl = game.StreamUrl } : null; | |||
await Discord.ApiClient.SendStatusUpdateAsync(status == UserStatus.Idle ? _idleSince : (long?)null, apiGame).ConfigureAwait(false); | |||
//Save values | |||
_idleSince = idleSince; | |||
_game = game; | |||
_status = status; | |||
} | |||
Task ISelfUser.ModifyStatusAsync(Action<ModifyPresenceParams> func) { throw new NotSupportedException(); } | |||
} | |||
} |
@@ -1,4 +1,6 @@ | |||
using System; | |||
using Discord.API.Rest; | |||
using System; | |||
using System.Threading.Tasks; | |||
using Model = Discord.API.User; | |||
namespace Discord | |||
@@ -15,6 +17,29 @@ namespace Discord | |||
{ | |||
} | |||
public async Task ModifyStatusAsync(Action<ModifyPresenceParams> func) | |||
{ | |||
if (func == null) throw new NullReferenceException(nameof(func)); | |||
var args = new ModifyPresenceParams(); | |||
func(args); | |||
var game = args._game.GetValueOrDefault(_game); | |||
var status = args._status.GetValueOrDefault(_status); | |||
long idleSince = _idleSince; | |||
if (status == UserStatus.Idle && _status != UserStatus.Idle) | |||
idleSince = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); | |||
var apiGame = game != null ? new API.Game { Name = game.Name, StreamType = game.StreamType, StreamUrl = game.StreamUrl } : null; | |||
await Discord.ApiClient.SendStatusUpdateAsync(status == UserStatus.Idle ? _idleSince : (long?)null, apiGame).ConfigureAwait(false); | |||
//Save values | |||
_idleSince = idleSince; | |||
_game = game; | |||
_status = status; | |||
} | |||
public SocketSelfUser Clone() => MemberwiseClone() as SocketSelfUser; | |||
ISocketUser ISocketUser.Clone() => Clone(); | |||
} | |||
@@ -13,7 +13,7 @@ namespace Discord | |||
{ | |||
ConnectionState ConnectionState { get; } | |||
DiscordApiClient ApiClient { get; } | |||
DiscordRestApiClient ApiClient { get; } | |||
ILogManager LogManager { get; } | |||
Task ConnectAsync(); | |||