@@ -18,7 +18,7 @@ namespace Discord | |||
public event Func<LogMessage, Task> Log; | |||
public event Func<Task> LoggedIn, LoggedOut; | |||
internal readonly Logger _discordLogger, _restLogger; | |||
internal readonly Logger _discordLogger, _restLogger, _queueLogger; | |||
internal readonly SemaphoreSlim _connectionLock; | |||
internal readonly LogManager _log; | |||
internal readonly RequestQueue _requestQueue; | |||
@@ -38,12 +38,20 @@ namespace Discord | |||
_log.Message += async msg => await Log.RaiseAsync(msg).ConfigureAwait(false); | |||
_discordLogger = _log.CreateLogger("Discord"); | |||
_restLogger = _log.CreateLogger("Rest"); | |||
_queueLogger = _log.CreateLogger("Queue"); | |||
_connectionLock = new SemaphoreSlim(1, 1); | |||
_requestQueue = new RequestQueue(); | |||
_requestQueue.RateLimitTriggered += async (id, bucket, millis) => | |||
{ | |||
await _queueLogger.WarningAsync($"Rate limit triggered (id = \"{id ?? "null"}\")").ConfigureAwait(false); | |||
if (bucket == null && id != null) | |||
await _queueLogger.WarningAsync($"Unknown rate limit bucket \"{id ?? "null"}\"").ConfigureAwait(false); | |||
}; | |||
ApiClient = new API.DiscordApiClient(config.RestClientProvider, (config as DiscordSocketConfig)?.WebSocketProvider, requestQueue: _requestQueue); | |||
ApiClient.SentRequest += async (method, endpoint, millis) => await _log.VerboseAsync("Rest", $"{method} {endpoint}: {millis} ms").ConfigureAwait(false); | |||
ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); | |||
} | |||
/// <inheritdoc /> | |||
@@ -21,8 +21,7 @@ namespace Discord | |||
//TODO: Add resume logic | |||
public class DiscordSocketClient : DiscordClient, IDiscordClient | |||
{ | |||
public event Func<Task> Connected, Disconnected; | |||
public event Func<Task> Ready; | |||
public event Func<Task> Connected, Disconnected, Ready; | |||
//public event Func<Channel> VoiceConnected, VoiceDisconnected; | |||
public event Func<IChannel, Task> ChannelCreated, ChannelDestroyed; | |||
public event Func<IChannel, IChannel, Task> ChannelUpdated; | |||
@@ -174,6 +173,7 @@ namespace Discord | |||
_connectTask = new TaskCompletionSource<bool>(); | |||
_cancelToken = new CancellationTokenSource(); | |||
await ApiClient.ConnectAsync().ConfigureAwait(false); | |||
await Connected.RaiseAsync().ConfigureAwait(false); | |||
await _connectTask.Task.ConfigureAwait(false); | |||
@@ -185,8 +185,6 @@ namespace Discord | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
throw; | |||
} | |||
await Connected.RaiseAsync().ConfigureAwait(false); | |||
} | |||
/// <inheritdoc /> | |||
public async Task DisconnectAsync() | |||
@@ -1139,6 +1137,7 @@ namespace Discord | |||
private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken) | |||
{ | |||
//Clean this up when Discord's session patch is live | |||
try | |||
{ | |||
while (!cancelToken.IsCancellationRequested) | |||
@@ -1161,7 +1160,6 @@ namespace Discord | |||
} | |||
catch (OperationCanceledException) { } | |||
} | |||
private async Task WaitForGuildsAsync(CancellationToken cancelToken) | |||
{ | |||
while ((_unavailableGuilds != 0) && (Environment.TickCount - _lastGuildAvailableTime < 2000)) | |||
@@ -1,16 +0,0 @@ | |||
namespace Discord.Net.Queue | |||
{ | |||
internal struct BucketDefinition | |||
{ | |||
public int WindowCount { get; } | |||
public int WindowSeconds { get; } | |||
public GlobalBucket? Parent { get; } | |||
public BucketDefinition(int windowCount, int windowSeconds, GlobalBucket? parent = null) | |||
{ | |||
WindowCount = windowCount; | |||
WindowSeconds = windowSeconds; | |||
Parent = parent; | |||
} | |||
} | |||
} |
@@ -0,0 +1,30 @@ | |||
namespace Discord.Net.Queue | |||
{ | |||
public sealed class Bucket | |||
{ | |||
/// <summary> Gets the unique identifier for this bucket. </summary> | |||
public string Id { get; } | |||
/// <summary> Gets the name of this bucket. </summary> | |||
public string Name { get; } | |||
/// <summary> Gets the amount of requests that may be sent per window. </summary> | |||
public int WindowCount { get; } | |||
/// <summary> Gets the length of this bucket's window, in seconds. </summary> | |||
public int WindowSeconds { get; } | |||
/// <summary> Gets the type of account this bucket affects. </summary> | |||
public BucketTarget Target { get; } | |||
/// <summary> Gets this bucket's parent. </summary> | |||
public GlobalBucket? Parent { get; } | |||
internal Bucket(string id, int windowCount, int windowSeconds, BucketTarget target, GlobalBucket? parent = null) | |||
: this(id, id, windowCount, windowSeconds, target, parent) { } | |||
internal Bucket(string id, string name, int windowCount, int windowSeconds, BucketTarget target, GlobalBucket? parent = null) | |||
{ | |||
Id = id; | |||
Name = name; | |||
WindowCount = windowCount; | |||
WindowSeconds = windowSeconds; | |||
Target = target; | |||
Parent = parent; | |||
} | |||
} | |||
} |
@@ -3,6 +3,7 @@ | |||
public enum BucketGroup | |||
{ | |||
Global, | |||
Guild | |||
Guild, | |||
Channel | |||
} | |||
} |
@@ -0,0 +1,9 @@ | |||
namespace Discord.Net.Queue | |||
{ | |||
public enum BucketTarget | |||
{ | |||
Client, | |||
Bot, | |||
Both | |||
} | |||
} |
@@ -0,0 +1,7 @@ | |||
namespace Discord.Net.Queue | |||
{ | |||
public enum ChannelBucket | |||
{ | |||
SendEditMessage, | |||
} | |||
} |
@@ -10,52 +10,83 @@ namespace Discord.Net.Queue | |||
{ | |||
public class RequestQueue | |||
{ | |||
private readonly static ImmutableDictionary<GlobalBucket, BucketDefinition> _globalLimits; | |||
private readonly static ImmutableDictionary<GuildBucket, BucketDefinition> _guildLimits; | |||
public event Func<string, Bucket, int, Task> RateLimitTriggered; | |||
private readonly static ImmutableDictionary<GlobalBucket, Bucket> _globalLimits; | |||
private readonly static ImmutableDictionary<GuildBucket, Bucket> _guildLimits; | |||
private readonly static ImmutableDictionary<ChannelBucket, Bucket> _channelLimits; | |||
private readonly SemaphoreSlim _lock; | |||
private readonly RequestQueueBucket[] _globalBuckets; | |||
private readonly ConcurrentDictionary<ulong, RequestQueueBucket>[] _guildBuckets; | |||
private readonly ConcurrentDictionary<ulong, RequestQueueBucket>[] _channelBuckets; | |||
private CancellationTokenSource _clearToken; | |||
private CancellationToken _parentToken; | |||
private CancellationToken _cancelToken; | |||
static RequestQueue() | |||
{ | |||
_globalLimits = new Dictionary<GlobalBucket, BucketDefinition> | |||
_globalLimits = new Dictionary<GlobalBucket, Bucket> | |||
{ | |||
//REST | |||
[GlobalBucket.GeneralRest] = new BucketDefinition(0, 0), //No Limit | |||
[GlobalBucket.GeneralRest] = new Bucket(null, "rest", 0, 0, BucketTarget.Both), //No Limit | |||
//[GlobalBucket.Login] = new BucketDefinition(1, 1), | |||
[GlobalBucket.DirectMessage] = new BucketDefinition(5, 5), | |||
[GlobalBucket.SendEditMessage] = new BucketDefinition(50, 10), | |||
[GlobalBucket.DirectMessage] = new Bucket("bot:msg:dm", 5, 5, BucketTarget.Bot), | |||
[GlobalBucket.SendEditMessage] = new Bucket("bot:msg:global", 50, 10, BucketTarget.Bot), | |||
//Gateway | |||
[GlobalBucket.GeneralGateway] = new BucketDefinition(120, 60), | |||
[GlobalBucket.UpdateStatus] = new BucketDefinition(5, 1, GlobalBucket.GeneralGateway) | |||
[GlobalBucket.GeneralGateway] = new Bucket(null, "gateway", 120, 60, BucketTarget.Both), | |||
[GlobalBucket.UpdateStatus] = new Bucket(null, "status", 5, 1, BucketTarget.Both, GlobalBucket.GeneralGateway) | |||
}.ToImmutableDictionary(); | |||
_guildLimits = new Dictionary<GuildBucket, Bucket> | |||
{ | |||
//REST | |||
[GuildBucket.SendEditMessage] = new Bucket("bot:msg:server", 5, 5, BucketTarget.Bot, GlobalBucket.SendEditMessage), | |||
[GuildBucket.DeleteMessage] = new Bucket("dmsg", 5, 1, BucketTarget.Bot), | |||
[GuildBucket.DeleteMessages] = new Bucket("bdmsg", 1, 1, BucketTarget.Bot), | |||
[GuildBucket.ModifyMember] = new Bucket("guild_member", 10, 10, BucketTarget.Bot), | |||
[GuildBucket.Nickname] = new Bucket("guild_member_nick", 1, 1, BucketTarget.Bot) | |||
}.ToImmutableDictionary(); | |||
_guildLimits = new Dictionary<GuildBucket, BucketDefinition> | |||
//Client-Only | |||
_channelLimits = new Dictionary<ChannelBucket, Bucket> | |||
{ | |||
//REST | |||
[GuildBucket.SendEditMessage] = new BucketDefinition(5, 5, GlobalBucket.SendEditMessage), | |||
[GuildBucket.DeleteMessage] = new BucketDefinition(5, 1), | |||
[GuildBucket.DeleteMessages] = new BucketDefinition(1, 1), | |||
[GuildBucket.ModifyMember] = new BucketDefinition(10, 10), | |||
[GuildBucket.Nickname] = new BucketDefinition(1, 1) | |||
[ChannelBucket.SendEditMessage] = new Bucket("msg", 10, 10, BucketTarget.Client, GlobalBucket.SendEditMessage), | |||
}.ToImmutableDictionary(); | |||
} | |||
public static Bucket GetBucketInfo(GlobalBucket bucket) => _globalLimits[bucket]; | |||
public static Bucket GetBucketInfo(GuildBucket bucket) => _guildLimits[bucket]; | |||
public static Bucket GetBucketInfo(ChannelBucket bucket) => _channelLimits[bucket]; | |||
public RequestQueue() | |||
{ | |||
_lock = new SemaphoreSlim(1, 1); | |||
_globalBuckets = new RequestQueueBucket[_globalLimits.Count]; | |||
foreach (var pair in _globalLimits) | |||
_globalBuckets[(int)pair.Key] = CreateBucket(pair.Value); | |||
{ | |||
//var target = _globalLimits[pair.Key].Target; | |||
//if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot)) | |||
_globalBuckets[(int)pair.Key] = CreateBucket(pair.Value); | |||
} | |||
_guildBuckets = new ConcurrentDictionary<ulong, RequestQueueBucket>[_guildLimits.Count]; | |||
for (int i = 0; i < _guildLimits.Count; i++) | |||
_guildBuckets[i] = new ConcurrentDictionary<ulong, RequestQueueBucket>(); | |||
{ | |||
//var target = _guildLimits[(GuildBucket)i].Target; | |||
//if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot)) | |||
_guildBuckets[i] = new ConcurrentDictionary<ulong, RequestQueueBucket>(); | |||
} | |||
_channelBuckets = new ConcurrentDictionary<ulong, RequestQueueBucket>[_channelLimits.Count]; | |||
for (int i = 0; i < _channelLimits.Count; i++) | |||
{ | |||
//var target = _channelLimits[(GuildBucket)i].Target; | |||
//if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot)) | |||
_channelBuckets[i] = new ConcurrentDictionary<ulong, RequestQueueBucket>(); | |||
} | |||
_clearToken = new CancellationTokenSource(); | |||
_cancelToken = CancellationToken.None; | |||
@@ -72,23 +103,23 @@ namespace Discord.Net.Queue | |||
finally { _lock.Release(); } | |||
} | |||
internal async Task<Stream> SendAsync(RestRequest request, BucketGroup group, int bucketId, ulong guildId) | |||
internal async Task<Stream> SendAsync(RestRequest request, BucketGroup group, int bucketId, ulong objId) | |||
{ | |||
request.CancelToken = _cancelToken; | |||
var bucket = GetBucket(group, bucketId, guildId); | |||
var bucket = GetBucket(group, bucketId, objId); | |||
return await bucket.SendAsync(request).ConfigureAwait(false); | |||
} | |||
internal async Task<Stream> SendAsync(WebSocketRequest request, BucketGroup group, int bucketId, ulong guildId) | |||
internal async Task<Stream> SendAsync(WebSocketRequest request, BucketGroup group, int bucketId, ulong objId) | |||
{ | |||
request.CancelToken = _cancelToken; | |||
var bucket = GetBucket(group, bucketId, guildId); | |||
var bucket = GetBucket(group, bucketId, objId); | |||
return await bucket.SendAsync(request).ConfigureAwait(false); | |||
} | |||
private RequestQueueBucket CreateBucket(BucketDefinition def) | |||
private RequestQueueBucket CreateBucket(Bucket def) | |||
{ | |||
var parent = def.Parent != null ? GetGlobalBucket(def.Parent.Value) : null; | |||
return new RequestQueueBucket(def.WindowCount, def.WindowSeconds * 1000, parent); | |||
return new RequestQueueBucket(this, def, parent); | |||
} | |||
public void DestroyGuildBucket(GuildBucket type, ulong guildId) | |||
@@ -97,15 +128,23 @@ namespace Discord.Net.Queue | |||
RequestQueueBucket bucket; | |||
_guildBuckets[(int)type].TryRemove(guildId, out bucket); | |||
} | |||
public void DestroyChannelBucket(ChannelBucket type, ulong channelId) | |||
{ | |||
//Assume this object is locked | |||
RequestQueueBucket bucket; | |||
_channelBuckets[(int)type].TryRemove(channelId, out bucket); | |||
} | |||
private RequestQueueBucket GetBucket(BucketGroup group, int bucketId, ulong guildId) | |||
private RequestQueueBucket GetBucket(BucketGroup group, int bucketId, ulong objId) | |||
{ | |||
switch (group) | |||
{ | |||
case BucketGroup.Global: | |||
return GetGlobalBucket((GlobalBucket)bucketId); | |||
case BucketGroup.Guild: | |||
return GetGuildBucket((GuildBucket)bucketId, guildId); | |||
return GetGuildBucket((GuildBucket)bucketId, objId); | |||
case BucketGroup.Channel: | |||
return GetChannelBucket((ChannelBucket)bucketId, objId); | |||
default: | |||
throw new ArgumentException($"Unknown bucket group: {group}", nameof(group)); | |||
} | |||
@@ -118,6 +157,10 @@ namespace Discord.Net.Queue | |||
{ | |||
return _guildBuckets[(int)type].GetOrAdd(guildId, _ => CreateBucket(_guildLimits[type])); | |||
} | |||
private RequestQueueBucket GetChannelBucket(ChannelBucket type, ulong channelId) | |||
{ | |||
return _channelBuckets[(int)type].GetOrAdd(channelId, _ => CreateBucket(_channelLimits[type])); | |||
} | |||
public async Task ClearAsync() | |||
{ | |||
@@ -133,5 +176,10 @@ namespace Discord.Net.Queue | |||
} | |||
finally { _lock.Release(); } | |||
} | |||
internal async Task RaiseRateLimitTriggered(string id, Bucket bucket, int millis) | |||
{ | |||
await RateLimitTriggered.Invoke(id, bucket, millis).ConfigureAwait(false); | |||
} | |||
} | |||
} |
@@ -9,27 +9,51 @@ namespace Discord.Net.Queue | |||
{ | |||
internal class RequestQueueBucket | |||
{ | |||
private readonly int _windowMilliseconds; | |||
private readonly RequestQueue _queue; | |||
private readonly SemaphoreSlim _semaphore; | |||
private readonly object _pauseLock; | |||
private int _pauseEndTick; | |||
private TaskCompletionSource<byte> _resumeNotifier; | |||
public Bucket Definition { get; } | |||
public RequestQueueBucket Parent { get; } | |||
public Task _resetTask { get; } | |||
public RequestQueueBucket(int windowCount, int windowMilliseconds, RequestQueueBucket parent = null) | |||
public RequestQueueBucket(RequestQueue queue, Bucket definition, RequestQueueBucket parent = null) | |||
{ | |||
if (windowCount != 0) | |||
_semaphore = new SemaphoreSlim(windowCount, windowCount); | |||
_queue = queue; | |||
Definition = definition; | |||
if (definition.WindowCount != 0) | |||
_semaphore = new SemaphoreSlim(definition.WindowCount, definition.WindowCount); | |||
Parent = parent; | |||
_pauseLock = new object(); | |||
_resumeNotifier = new TaskCompletionSource<byte>(); | |||
_resumeNotifier.SetResult(0); | |||
_windowMilliseconds = windowMilliseconds; | |||
Parent = parent; | |||
} | |||
public async Task<Stream> SendAsync(IQueuedRequest request) | |||
{ | |||
while (true) | |||
{ | |||
try | |||
{ | |||
return await SendAsyncInternal(request).ConfigureAwait(false); | |||
} | |||
catch (HttpRateLimitException ex) | |||
{ | |||
//When a 429 occurs, we drop all our locks, including the ones we wanted. | |||
//This is generally safe though since 429s actually occuring should be very rare. | |||
RequestQueueBucket bucket; | |||
bool success = FindBucket(ex.BucketId, out bucket); | |||
await _queue.RaiseRateLimitTriggered(ex.BucketId, success ? bucket.Definition : (Bucket)null, ex.RetryAfterMilliseconds).ConfigureAwait(false); | |||
bucket.Pause(ex.RetryAfterMilliseconds); | |||
} | |||
} | |||
} | |||
private async Task<Stream> SendAsyncInternal(IQueuedRequest request) | |||
{ | |||
var endTick = request.TimeoutTick; | |||
@@ -64,7 +88,7 @@ namespace Discord.Net.Queue | |||
{ | |||
//If there's a parent bucket, pass this request to them | |||
if (Parent != null) | |||
return await Parent.SendAsync(request).ConfigureAwait(false); | |||
return await Parent.SendAsyncInternal(request).ConfigureAwait(false); | |||
//We have all our semaphores, send the request | |||
return await request.SendAsync().ConfigureAwait(false); | |||
@@ -73,11 +97,6 @@ namespace Discord.Net.Queue | |||
{ | |||
continue; | |||
} | |||
catch (HttpRateLimitException ex) | |||
{ | |||
Pause(ex.RetryAfterMilliseconds); | |||
continue; | |||
} | |||
} | |||
} | |||
finally | |||
@@ -88,6 +107,23 @@ namespace Discord.Net.Queue | |||
} | |||
} | |||
private bool FindBucket(string id, out RequestQueueBucket bucket) | |||
{ | |||
//Keep going up until we find a bucket with matching id or we're at the topmost bucket | |||
if (Definition.Id == id) | |||
{ | |||
bucket = this; | |||
return true; | |||
} | |||
else if (Parent == null) | |||
{ | |||
bucket = this; | |||
return false; | |||
} | |||
else | |||
return Parent.FindBucket(id, out bucket); | |||
} | |||
private void Pause(int milliseconds) | |||
{ | |||
lock (_pauseLock) | |||
@@ -120,7 +156,7 @@ namespace Discord.Net.Queue | |||
} | |||
private async Task QueueExitAsync() | |||
{ | |||
await Task.Delay(_windowMilliseconds).ConfigureAwait(false); | |||
await Task.Delay(Definition.WindowSeconds * 1000).ConfigureAwait(false); | |||
_semaphore.Release(); | |||
} | |||
} | |||
@@ -4,6 +4,7 @@ namespace Discord.Net | |||
{ | |||
public class HttpRateLimitException : HttpException | |||
{ | |||
public string BucketId { get; } | |||
public int RetryAfterMilliseconds { get; } | |||
public HttpRateLimitException(int retryAfterMilliseconds) | |||