The only account limit is for identify that is dealt in a different way (exclusive semaphore), so websocket queues can be shared with REST and don't need to be shared between clients anymore. Also added the ratelimit for presence updates.pull/1537/head
@@ -1,3 +1,5 @@ | |||
using System; | |||
namespace Discord.Rest | |||
{ | |||
/// <summary> | |||
@@ -9,13 +11,28 @@ namespace Discord.Rest | |||
/// Gets or sets the global limits for the gateway rate limiter. | |||
/// </summary> | |||
/// <remarks> | |||
/// This property includes all the other limits, like Identify. | |||
/// This property includes all the other limits, like Identify, | |||
/// and it is per websocket. | |||
/// </remarks> | |||
public GatewayLimit Global { get; set; } | |||
/// <summary> | |||
/// Gets or sets the limits of Identify requests. | |||
/// </summary> | |||
/// <remarks> | |||
/// This limit is included into <see cref="Global"/> but it is | |||
/// also per account. | |||
/// </remarks> | |||
public GatewayLimit Identify { get; set; } | |||
/// <summary> | |||
/// Gets or sets the limits of Presence Update requests. | |||
/// </summary> | |||
/// <remarks> | |||
/// Presence updates include activity (playing, watching, etc) | |||
/// and status (online, idle, etc) | |||
/// </remarks> | |||
public GatewayLimit PresenceUpdate { get; set; } | |||
public string IdentifySemaphoreName { get; set; } | |||
/// <summary> | |||
/// Initializes a new <see cref="GatewayLimits"/> with the default values. | |||
@@ -24,6 +41,8 @@ namespace Discord.Rest | |||
{ | |||
Global = new GatewayLimit(120, 60); | |||
Identify = new GatewayLimit(1, 5); | |||
PresenceUpdate = new GatewayLimit(5, 60); | |||
IdentifySemaphoreName = Guid.NewGuid().ToString(); | |||
} | |||
internal static GatewayLimits GetOrCreate(GatewayLimits limits) | |||
@@ -6,12 +6,14 @@ namespace Discord.Net.Queue | |||
public enum GatewayBucketType | |||
{ | |||
Unbucketed = 0, | |||
Identify = 1 | |||
Identify = 1, | |||
PresenceUpdate = 2, | |||
} | |||
internal struct GatewayBucket | |||
{ | |||
private static ImmutableDictionary<GatewayBucketType, GatewayBucket> DefsByType; | |||
private static ImmutableDictionary<string, GatewayBucket> DefsById; | |||
private static string IdentifySemaphoreName; | |||
static GatewayBucket() | |||
{ | |||
@@ -20,6 +22,7 @@ namespace Discord.Net.Queue | |||
public static GatewayBucket Get(GatewayBucketType type) => DefsByType[type]; | |||
public static GatewayBucket Get(string id) => DefsById[id]; | |||
public static string GetIdentifySemaphoreName() => IdentifySemaphoreName; | |||
public static void SetLimits(GatewayLimits limits) | |||
{ | |||
@@ -28,11 +31,14 @@ namespace Discord.Net.Queue | |||
Preconditions.GreaterThan(limits.Global.Seconds, 0, nameof(limits.Global.Seconds), "Global seconds must be greater than zero."); | |||
Preconditions.GreaterThan(limits.Identify.Count, 0, nameof(limits.Identify.Count), "Identify count must be greater than zero."); | |||
Preconditions.GreaterThan(limits.Identify.Seconds, 0, nameof(limits.Identify.Seconds), "Identify seconds must be greater than zero."); | |||
Preconditions.GreaterThan(limits.PresenceUpdate.Count, 0, nameof(limits.PresenceUpdate.Count), "PresenceUpdate count must be greater than zero."); | |||
Preconditions.GreaterThan(limits.PresenceUpdate.Seconds, 0, nameof(limits.PresenceUpdate.Seconds), "PresenceUpdate seconds must be greater than zero."); | |||
var buckets = new[] | |||
{ | |||
new GatewayBucket(GatewayBucketType.Unbucketed, "<gateway-unbucketed>", limits.Global.Count, limits.Global.Seconds), | |||
new GatewayBucket(GatewayBucketType.Identify, "<gateway-identify>", limits.Identify.Count, limits.Identify.Seconds) | |||
new GatewayBucket(GatewayBucketType.Identify, "<gateway-identify>", limits.Identify.Count, limits.Identify.Seconds), | |||
new GatewayBucket(GatewayBucketType.PresenceUpdate, "<gateway-presenceupdate>", limits.Identify.Count, limits.Identify.Seconds), | |||
}; | |||
var builder = ImmutableDictionary.CreateBuilder<GatewayBucketType, GatewayBucket>(); | |||
@@ -44,6 +50,8 @@ namespace Discord.Net.Queue | |||
foreach (var bucket in buckets) | |||
builder2.Add(bucket.Id, bucket); | |||
DefsById = builder2.ToImmutable(); | |||
IdentifySemaphoreName = limits.IdentifySemaphoreName; | |||
} | |||
public GatewayBucketType Type { get; } | |||
@@ -1,3 +1,4 @@ | |||
using Discord.Rest; | |||
using System; | |||
using System.Collections.Concurrent; | |||
#if DEBUG_LIMITS | |||
@@ -22,12 +23,15 @@ namespace Discord.Net.Queue | |||
private CancellationTokenSource _requestCancelTokenSource; | |||
private CancellationToken _requestCancelToken; //Parent token + Clear token | |||
private DateTimeOffset _waitUntil; | |||
private Semaphore _identifySemaphore; | |||
private Task _cleanupTask; | |||
public RequestQueue() | |||
{ | |||
_tokenLock = new SemaphoreSlim(1, 1); | |||
int semaphoreCount = GatewayBucket.Get(GatewayBucketType.Identify).WindowCount; | |||
_identifySemaphore = new Semaphore(semaphoreCount, semaphoreCount, GatewayBucket.GetIdentifySemaphoreName()); | |||
_clearToken = new CancellationTokenSource(); | |||
_cancelTokenSource = new CancellationTokenSource(); | |||
@@ -120,10 +124,22 @@ namespace Discord.Net.Queue | |||
} | |||
internal async Task EnterGlobalAsync(int id, WebSocketRequest request) | |||
{ | |||
//If this is a global request (unbucketed), it'll be dealt in EnterAsync | |||
var requestBucket = GatewayBucket.Get(request.Options.BucketId); | |||
if (requestBucket.Type == GatewayBucketType.Unbucketed) | |||
return; | |||
//Identify is per-account so we won't trigger global until we can actually go for it | |||
if (requestBucket.Type == GatewayBucketType.Identify) | |||
{ | |||
while (!_identifySemaphore.WaitOne(0)) //To not block the thread | |||
await Task.Delay(100, request.CancelToken); | |||
#if DEBUG_LIMITS | |||
Debug.WriteLine($"[{id}] Acquired identify ticket"); | |||
#endif | |||
} | |||
//It's not a global request, so need to remove one from global (per-session) | |||
var globalBucketType = GatewayBucket.Get(GatewayBucketType.Unbucketed); | |||
var options = RequestOptions.CreateOrClone(request.Options); | |||
options.BucketId = globalBucketType.Id; | |||
@@ -131,6 +147,13 @@ namespace Discord.Net.Queue | |||
var globalBucket = GetOrCreateBucket(globalBucketType.Id, globalRequest); | |||
await globalBucket.TriggerAsync(id, globalRequest); | |||
} | |||
internal void ReleaseIdentifySemaphore(int id) | |||
{ | |||
_identifySemaphore.Release(); | |||
#if DEBUG_LIMITS | |||
Debug.WriteLine($"[{id}] Released identify ticket"); | |||
#endif | |||
} | |||
private RequestBucket GetOrCreateBucket(string id, IRequest request) | |||
{ | |||
@@ -348,7 +348,7 @@ namespace Discord.Net.Queue | |||
#if DEBUG_LIMITS | |||
Debug.WriteLine($"[{id}] Reset in {(int)Math.Ceiling((resetTick - DateTimeOffset.UtcNow).Value.TotalMilliseconds)} ms"); | |||
#endif | |||
var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds)); | |||
var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds), request); | |||
} | |||
return; | |||
} | |||
@@ -372,12 +372,12 @@ namespace Discord.Net.Queue | |||
if (!hasQueuedReset) | |||
{ | |||
var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds)); | |||
var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds), request); | |||
} | |||
} | |||
} | |||
} | |||
private async Task QueueReset(int id, int millis) | |||
private async Task QueueReset(int id, int millis, IRequest request) | |||
{ | |||
while (true) | |||
{ | |||
@@ -391,6 +391,8 @@ namespace Discord.Net.Queue | |||
#if DEBUG_LIMITS | |||
Debug.WriteLine($"[{id}] * Reset *"); | |||
#endif | |||
if (request is WebSocketRequest webSocketRequest && webSocketRequest.Options.BucketId == GatewayBucket.Get(GatewayBucketType.Identify).Id) | |||
_queue.ReleaseIdentifySemaphore(id); | |||
_semaphore = WindowCount; | |||
_resetTick = null; | |||
return; | |||
@@ -80,7 +80,7 @@ namespace Discord.WebSocket | |||
internal BaseSocketClient(DiscordSocketConfig config, DiscordRestApiClient client) | |||
: base(config, client) => BaseConfig = config; | |||
private static DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) | |||
=> new DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue, | |||
=> new DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, | |||
rateLimitPrecision: config.RateLimitPrecision, | |||
useSystemClock: config.UseSystemClock); | |||
@@ -85,17 +85,9 @@ namespace Discord.WebSocket | |||
RegisterEvents(_shards[i], i == 0); | |||
} | |||
} | |||
ApiClient.WebSocketRequestQueue.RateLimitTriggered += async (id, info) => | |||
{ | |||
if (info == null) | |||
await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||
else | |||
await _restLogger.WarningAsync($"Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||
}; | |||
} | |||
private static API.DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) | |||
=> new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue, | |||
=> new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, | |||
rateLimitPrecision: config.RateLimitPrecision); | |||
internal override async Task OnLoginAsync(TokenType tokenType, string token) | |||
@@ -37,17 +37,7 @@ namespace Discord.API | |||
public ConnectionState ConnectionState { get; private set; } | |||
internal RequestQueue WebSocketRequestQueue { get; } | |||
public DiscordSocketApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, string userAgent, | |||
string url = null, RetryMode defaultRetryMode = RetryMode.AlwaysRetry, JsonSerializer serializer = null, | |||
RateLimitPrecision rateLimitPrecision = RateLimitPrecision.Second, | |||
bool useSystemClock = true) | |||
: this(restClientProvider, webSocketProvider, userAgent, null, url, defaultRetryMode, serializer, rateLimitPrecision, useSystemClock) | |||
{ | |||
} | |||
internal DiscordSocketApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, string userAgent, RequestQueue websocketRequestQueue, | |||
string url = null, RetryMode defaultRetryMode = RetryMode.AlwaysRetry, JsonSerializer serializer = null, | |||
RateLimitPrecision rateLimitPrecision = RateLimitPrecision.Second, | |||
bool useSystemClock = true) | |||
@@ -58,7 +48,6 @@ namespace Discord.API | |||
_isExplicitUrl = true; | |||
WebSocketClient = webSocketProvider(); | |||
//WebSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .NET Framework 4.6+) | |||
WebSocketRequestQueue = websocketRequestQueue ?? new RequestQueue(); | |||
WebSocketClient.BinaryMessage += async (data, index, count) => | |||
{ | |||
@@ -218,8 +207,9 @@ namespace Discord.API | |||
bytes = Encoding.UTF8.GetBytes(SerializeJson(payload)); | |||
options.IsGatewayBucket = true; | |||
options.BucketId = GatewayBucket.Get(opCode == GatewayOpCode.Identify ? GatewayBucketType.Identify : GatewayBucketType.Unbucketed).Id; | |||
await WebSocketRequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, bytes, true, options)).ConfigureAwait(false); | |||
if (string.IsNullOrEmpty(options.BucketId)) | |||
options.BucketId = GatewayBucket.Get(GatewayBucketType.Unbucketed).Id; | |||
await RequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, bytes, true, options)).ConfigureAwait(false); | |||
await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); | |||
} | |||
@@ -240,6 +230,7 @@ namespace Discord.API | |||
if (totalShards > 1) | |||
msg.ShardingParams = new int[] { shardID, totalShards }; | |||
options.BucketId = GatewayBucket.Get(GatewayBucketType.Identify).Id; | |||
await SendGatewayAsync(GatewayOpCode.Identify, msg, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendResumeAsync(string sessionId, int lastSeq, RequestOptions options = null) | |||
@@ -268,6 +259,7 @@ namespace Discord.API | |||
IsAFK = isAFK, | |||
Game = game | |||
}; | |||
options.BucketId = GatewayBucket.Get(GatewayBucketType.PresenceUpdate).Id; | |||
await SendGatewayAsync(GatewayOpCode.StatusUpdate, args, options: options).ConfigureAwait(false); | |||
} | |||
public async Task SendRequestMembersAsync(IEnumerable<ulong> guildIds, RequestOptions options = null) | |||
@@ -122,14 +122,6 @@ namespace Discord.WebSocket | |||
public DiscordSocketClient(DiscordSocketConfig config) : this(config, CreateApiClient(config), null, null) | |||
{ | |||
GatewayBucket.SetLimits(GatewayLimits.GetOrCreate(config.GatewayLimits)); | |||
ApiClient.WebSocketRequestQueue.RateLimitTriggered += async (id, info) => | |||
{ | |||
if (info == null) | |||
await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||
else | |||
await _restLogger.WarningAsync($"Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||
}; | |||
} | |||
internal DiscordSocketClient(DiscordSocketConfig config, SemaphoreSlim groupLock, DiscordSocketClient parentClient) : this(config, CreateApiClient(config), groupLock, parentClient) { } | |||
#pragma warning restore IDISP004 | |||
@@ -190,7 +182,7 @@ namespace Discord.WebSocket | |||
_largeGuilds = new ConcurrentQueue<ulong>(); | |||
} | |||
private static API.DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) | |||
=> new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue, config.GatewayHost, | |||
=> new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.GatewayHost, | |||
rateLimitPrecision: config.RateLimitPrecision); | |||
/// <inheritdoc /> | |||
internal override void Dispose(bool disposing) | |||
@@ -135,8 +135,6 @@ namespace Discord.WebSocket | |||
/// </remarks> | |||
public GatewayLimits GatewayLimits { get; set; } = new GatewayLimits(); | |||
internal RequestQueue WebsocketRequestQueue { get; } = new RequestQueue(); | |||
/// <summary> | |||
/// Initializes a default configuration. | |||
/// </summary> | |||