- BucketId is it's own class now - Add WebhookId as a major parameter - Add shared buckets using the hash and major parameterspull/1546/head
@@ -0,0 +1,66 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Collections.Immutable; | |||||
using System.Linq; | |||||
namespace Discord.Net | |||||
{ | |||||
public class BucketId : IEquatable<BucketId> | |||||
{ | |||||
public string HttpMethod { get; } | |||||
public string Endpoint { get; } | |||||
public IOrderedEnumerable<KeyValuePair<string, string>> MajorParams { get; } | |||||
public string BucketHash { get; } | |||||
public bool IsHashBucket { get => BucketHash != null; } | |||||
private BucketId(string httpMethod, string endpoint, IEnumerable<KeyValuePair<string, string>> majorParams, string bucketHash) | |||||
{ | |||||
HttpMethod = httpMethod; | |||||
Endpoint = endpoint; | |||||
MajorParams = majorParams.OrderBy(x => x.Key); | |||||
BucketHash = bucketHash; | |||||
} | |||||
public static BucketId Create(string httpMethod, string endpoint, Dictionary<string, string> majorParams) | |||||
{ | |||||
Preconditions.NotNullOrWhitespace(httpMethod, nameof(httpMethod)); | |||||
Preconditions.NotNullOrWhitespace(endpoint, nameof(endpoint)); | |||||
majorParams ??= new Dictionary<string, string>(); | |||||
return new BucketId(httpMethod, endpoint, majorParams, null); | |||||
} | |||||
public static BucketId Create(string hash, BucketId oldBucket) | |||||
{ | |||||
Preconditions.NotNullOrWhitespace(hash, nameof(hash)); | |||||
Preconditions.NotNull(oldBucket, nameof(oldBucket)); | |||||
return new BucketId(null, null, oldBucket.MajorParams, hash); | |||||
} | |||||
public string GetBucketHash() | |||||
=> IsHashBucket ? $"{BucketHash}:{string.Join("/", MajorParams.Select(x => x.Value))}" : null; | |||||
public string GetUniqueEndpoint() | |||||
=> HttpMethod != null ? $"{HttpMethod} {Endpoint}" : Endpoint; | |||||
public override bool Equals(object obj) | |||||
=> Equals(obj as BucketId); | |||||
public override int GetHashCode() | |||||
=> IsHashBucket ? (BucketHash, string.Join("/", MajorParams.Select(x => x.Value))).GetHashCode() : (HttpMethod, Endpoint).GetHashCode(); | |||||
public override string ToString() | |||||
=> GetBucketHash() ?? GetUniqueEndpoint(); | |||||
public bool Equals(BucketId other) | |||||
{ | |||||
if (other is null) | |||||
return false; | |||||
if (ReferenceEquals(this, other)) | |||||
return true; | |||||
if (GetType() != other.GetType()) | |||||
return false; | |||||
return ToString() == other.ToString(); | |||||
} | |||||
} | |||||
} |
@@ -1,3 +1,4 @@ | |||||
using Discord.Net; | |||||
using System.Threading; | using System.Threading; | ||||
namespace Discord | namespace Discord | ||||
@@ -57,7 +58,7 @@ namespace Discord | |||||
public bool? UseSystemClock { get; set; } | public bool? UseSystemClock { get; set; } | ||||
internal bool IgnoreState { get; set; } | internal bool IgnoreState { get; set; } | ||||
internal string BucketId { get; set; } | |||||
internal BucketId BucketId { get; set; } | |||||
internal bool IsClientBucket { get; set; } | internal bool IsClientBucket { get; set; } | ||||
internal bool IsReactionBucket { get; set; } | internal bool IsReactionBucket { get; set; } | ||||
@@ -49,9 +49,9 @@ namespace Discord.Rest | |||||
ApiClient.RequestQueue.RateLimitTriggered += async (id, info) => | ApiClient.RequestQueue.RateLimitTriggered += async (id, info) => | ||||
{ | { | ||||
if (info == null) | if (info == null) | ||||
await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||||
await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id?.ToString() ?? "null"}").ConfigureAwait(false); | |||||
else | else | ||||
await _restLogger.WarningAsync($"Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||||
await _restLogger.WarningAsync($"Rate limit triggered: {id?.ToString() ?? "null"}").ConfigureAwait(false); | |||||
}; | }; | ||||
ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); | ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); | ||||
} | } | ||||
@@ -24,7 +24,7 @@ namespace Discord.API | |||||
{ | { | ||||
internal class DiscordRestApiClient : IDisposable | internal class DiscordRestApiClient : IDisposable | ||||
{ | { | ||||
private static readonly ConcurrentDictionary<string, Func<BucketIds, string>> _bucketIdGenerators = new ConcurrentDictionary<string, Func<BucketIds, string>>(); | |||||
private static readonly ConcurrentDictionary<string, Func<BucketIds, BucketId>> _bucketIdGenerators = new ConcurrentDictionary<string, Func<BucketIds, BucketId>>(); | |||||
public event Func<string, string, double, Task> SentRequest { add { _sentRequestEvent.Add(value); } remove { _sentRequestEvent.Remove(value); } } | public event Func<string, string, double, Task> SentRequest { add { _sentRequestEvent.Add(value); } remove { _sentRequestEvent.Remove(value); } } | ||||
private readonly AsyncEvent<Func<string, string, double, Task>> _sentRequestEvent = new AsyncEvent<Func<string, string, double, Task>>(); | private readonly AsyncEvent<Func<string, string, double, Task>> _sentRequestEvent = new AsyncEvent<Func<string, string, double, Task>>(); | ||||
@@ -182,7 +182,7 @@ namespace Discord.API | |||||
ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ||||
=> SendAsync(method, GetEndpoint(endpointExpr), GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | => SendAsync(method, GetEndpoint(endpointExpr), GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | ||||
public async Task SendAsync(string method, string endpoint, | public async Task SendAsync(string method, string endpoint, | ||||
string bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
BucketId bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
{ | { | ||||
options = options ?? new RequestOptions(); | options = options ?? new RequestOptions(); | ||||
options.HeaderOnly = true; | options.HeaderOnly = true; | ||||
@@ -196,7 +196,7 @@ namespace Discord.API | |||||
ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ||||
=> SendJsonAsync(method, GetEndpoint(endpointExpr), payload, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | => SendJsonAsync(method, GetEndpoint(endpointExpr), payload, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | ||||
public async Task SendJsonAsync(string method, string endpoint, object payload, | public async Task SendJsonAsync(string method, string endpoint, object payload, | ||||
string bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
BucketId bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
{ | { | ||||
options = options ?? new RequestOptions(); | options = options ?? new RequestOptions(); | ||||
options.HeaderOnly = true; | options.HeaderOnly = true; | ||||
@@ -211,7 +211,7 @@ namespace Discord.API | |||||
ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ||||
=> SendMultipartAsync(method, GetEndpoint(endpointExpr), multipartArgs, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | => SendMultipartAsync(method, GetEndpoint(endpointExpr), multipartArgs, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | ||||
public async Task SendMultipartAsync(string method, string endpoint, IReadOnlyDictionary<string, object> multipartArgs, | public async Task SendMultipartAsync(string method, string endpoint, IReadOnlyDictionary<string, object> multipartArgs, | ||||
string bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
BucketId bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
{ | { | ||||
options = options ?? new RequestOptions(); | options = options ?? new RequestOptions(); | ||||
options.HeaderOnly = true; | options.HeaderOnly = true; | ||||
@@ -225,7 +225,7 @@ namespace Discord.API | |||||
ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) where TResponse : class | ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) where TResponse : class | ||||
=> SendAsync<TResponse>(method, GetEndpoint(endpointExpr), GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | => SendAsync<TResponse>(method, GetEndpoint(endpointExpr), GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | ||||
public async Task<TResponse> SendAsync<TResponse>(string method, string endpoint, | public async Task<TResponse> SendAsync<TResponse>(string method, string endpoint, | ||||
string bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) where TResponse : class | |||||
BucketId bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) where TResponse : class | |||||
{ | { | ||||
options = options ?? new RequestOptions(); | options = options ?? new RequestOptions(); | ||||
options.BucketId = bucketId; | options.BucketId = bucketId; | ||||
@@ -238,7 +238,7 @@ namespace Discord.API | |||||
ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) where TResponse : class | ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) where TResponse : class | ||||
=> SendJsonAsync<TResponse>(method, GetEndpoint(endpointExpr), payload, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | => SendJsonAsync<TResponse>(method, GetEndpoint(endpointExpr), payload, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | ||||
public async Task<TResponse> SendJsonAsync<TResponse>(string method, string endpoint, object payload, | public async Task<TResponse> SendJsonAsync<TResponse>(string method, string endpoint, object payload, | ||||
string bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) where TResponse : class | |||||
BucketId bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) where TResponse : class | |||||
{ | { | ||||
options = options ?? new RequestOptions(); | options = options ?? new RequestOptions(); | ||||
options.BucketId = bucketId; | options.BucketId = bucketId; | ||||
@@ -252,7 +252,7 @@ namespace Discord.API | |||||
ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null, [CallerMemberName] string funcName = null) | ||||
=> SendMultipartAsync<TResponse>(method, GetEndpoint(endpointExpr), multipartArgs, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | => SendMultipartAsync<TResponse>(method, GetEndpoint(endpointExpr), multipartArgs, GetBucketId(method, ids, endpointExpr, funcName), clientBucket, options); | ||||
public async Task<TResponse> SendMultipartAsync<TResponse>(string method, string endpoint, IReadOnlyDictionary<string, object> multipartArgs, | public async Task<TResponse> SendMultipartAsync<TResponse>(string method, string endpoint, IReadOnlyDictionary<string, object> multipartArgs, | ||||
string bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
BucketId bucketId = null, ClientBucketType clientBucket = ClientBucketType.Unbucketed, RequestOptions options = null) | |||||
{ | { | ||||
options = options ?? new RequestOptions(); | options = options ?? new RequestOptions(); | ||||
options.BucketId = bucketId; | options.BucketId = bucketId; | ||||
@@ -1442,15 +1442,30 @@ namespace Discord.API | |||||
{ | { | ||||
public ulong GuildId { get; internal set; } | public ulong GuildId { get; internal set; } | ||||
public ulong ChannelId { get; internal set; } | public ulong ChannelId { get; internal set; } | ||||
public ulong WebhookId { get; internal set; } | |||||
public string HttpMethod { get; internal set; } | public string HttpMethod { get; internal set; } | ||||
internal BucketIds(ulong guildId = 0, ulong channelId = 0) | |||||
internal BucketIds(ulong guildId = 0, ulong channelId = 0, ulong webhookId = 0) | |||||
{ | { | ||||
GuildId = guildId; | GuildId = guildId; | ||||
ChannelId = channelId; | ChannelId = channelId; | ||||
WebhookId = webhookId; | |||||
} | } | ||||
internal object[] ToArray() | internal object[] ToArray() | ||||
=> new object[] { HttpMethod, GuildId, ChannelId }; | |||||
=> new object[] { HttpMethod, GuildId, ChannelId, WebhookId }; | |||||
internal Dictionary<string, string> ToMajorParametersDictionary() | |||||
{ | |||||
var dict = new Dictionary<string, string>(); | |||||
if (GuildId != 0) | |||||
dict["GuildId"] = GuildId.ToString(); | |||||
if (ChannelId != 0) | |||||
dict["ChannelId"] = ChannelId.ToString(); | |||||
if (WebhookId != 0) | |||||
dict["WebhookId"] = WebhookId.ToString(); | |||||
return dict; | |||||
} | |||||
internal static int? GetIndex(string name) | internal static int? GetIndex(string name) | ||||
{ | { | ||||
@@ -1459,6 +1474,7 @@ namespace Discord.API | |||||
case "httpMethod": return 0; | case "httpMethod": return 0; | ||||
case "guildId": return 1; | case "guildId": return 1; | ||||
case "channelId": return 2; | case "channelId": return 2; | ||||
case "webhookId": return 3; | |||||
default: | default: | ||||
return null; | return null; | ||||
} | } | ||||
@@ -1469,19 +1485,20 @@ namespace Discord.API | |||||
{ | { | ||||
return endpointExpr.Compile()(); | return endpointExpr.Compile()(); | ||||
} | } | ||||
private static string GetBucketId(string httpMethod, BucketIds ids, Expression<Func<string>> endpointExpr, string callingMethod) | |||||
private static BucketId GetBucketId(string httpMethod, BucketIds ids, Expression<Func<string>> endpointExpr, string callingMethod) | |||||
{ | { | ||||
ids.HttpMethod ??= httpMethod; | ids.HttpMethod ??= httpMethod; | ||||
Debug.WriteLine("GetBucketId: " + CreateBucketId(endpointExpr)(ids)); | |||||
return _bucketIdGenerators.GetOrAdd(callingMethod, x => CreateBucketId(endpointExpr))(ids); | return _bucketIdGenerators.GetOrAdd(callingMethod, x => CreateBucketId(endpointExpr))(ids); | ||||
} | } | ||||
private static Func<BucketIds, string> CreateBucketId(Expression<Func<string>> endpoint) | |||||
private static Func<BucketIds, BucketId> CreateBucketId(Expression<Func<string>> endpoint) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
//Is this a constant string? | //Is this a constant string? | ||||
if (endpoint.Body.NodeType == ExpressionType.Constant) | if (endpoint.Body.NodeType == ExpressionType.Constant) | ||||
return x => string.Format($"{{0}} {(endpoint.Body as ConstantExpression).Value}", x.ToArray()); | |||||
return x => BucketId.Create(x.HttpMethod, (endpoint.Body as ConstantExpression).Value.ToString(), x.ToMajorParametersDictionary()); | |||||
var builder = new StringBuilder(); | var builder = new StringBuilder(); | ||||
var methodCall = endpoint.Body as MethodCallExpression; | var methodCall = endpoint.Body as MethodCallExpression; | ||||
@@ -1518,7 +1535,7 @@ namespace Discord.API | |||||
var mappedId = BucketIds.GetIndex(fieldName); | var mappedId = BucketIds.GetIndex(fieldName); | ||||
if(!mappedId.HasValue && rightIndex != endIndex && format.Length > rightIndex + 1 && format[rightIndex + 1] == '/') //Ignore the next slash | |||||
if (!mappedId.HasValue && rightIndex != endIndex && format.Length > rightIndex + 1 && format[rightIndex + 1] == '/') //Ignore the next slash | |||||
rightIndex++; | rightIndex++; | ||||
if (mappedId.HasValue) | if (mappedId.HasValue) | ||||
@@ -1531,7 +1548,7 @@ namespace Discord.API | |||||
format = builder.ToString(); | format = builder.ToString(); | ||||
return x => string.Format($"{{0}} {format}", x.ToArray()); | |||||
return x => BucketId.Create(x.HttpMethod, string.Format(format, x.ToArray()), x.ToMajorParametersDictionary()); | |||||
} | } | ||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
@@ -10,14 +10,14 @@ namespace Discord.Net.Queue | |||||
internal struct ClientBucket | internal struct ClientBucket | ||||
{ | { | ||||
private static readonly ImmutableDictionary<ClientBucketType, ClientBucket> DefsByType; | private static readonly ImmutableDictionary<ClientBucketType, ClientBucket> DefsByType; | ||||
private static readonly ImmutableDictionary<string, ClientBucket> DefsById; | |||||
private static readonly ImmutableDictionary<BucketId, ClientBucket> DefsById; | |||||
static ClientBucket() | static ClientBucket() | ||||
{ | { | ||||
var buckets = new[] | var buckets = new[] | ||||
{ | { | ||||
new ClientBucket(ClientBucketType.Unbucketed, "<unbucketed>", 10, 10), | |||||
new ClientBucket(ClientBucketType.SendEdit, "<send_edit>", 10, 10) | |||||
new ClientBucket(ClientBucketType.Unbucketed, BucketId.Create(null, "<unbucketed>", null), 10, 10), | |||||
new ClientBucket(ClientBucketType.SendEdit, BucketId.Create(null, "<send_edit>", null), 10, 10) | |||||
}; | }; | ||||
var builder = ImmutableDictionary.CreateBuilder<ClientBucketType, ClientBucket>(); | var builder = ImmutableDictionary.CreateBuilder<ClientBucketType, ClientBucket>(); | ||||
@@ -25,21 +25,21 @@ namespace Discord.Net.Queue | |||||
builder.Add(bucket.Type, bucket); | builder.Add(bucket.Type, bucket); | ||||
DefsByType = builder.ToImmutable(); | DefsByType = builder.ToImmutable(); | ||||
var builder2 = ImmutableDictionary.CreateBuilder<string, ClientBucket>(); | |||||
var builder2 = ImmutableDictionary.CreateBuilder<BucketId, ClientBucket>(); | |||||
foreach (var bucket in buckets) | foreach (var bucket in buckets) | ||||
builder2.Add(bucket.Id, bucket); | builder2.Add(bucket.Id, bucket); | ||||
DefsById = builder2.ToImmutable(); | DefsById = builder2.ToImmutable(); | ||||
} | } | ||||
public static ClientBucket Get(ClientBucketType type) => DefsByType[type]; | public static ClientBucket Get(ClientBucketType type) => DefsByType[type]; | ||||
public static ClientBucket Get(string id) => DefsById[id]; | |||||
public static ClientBucket Get(BucketId id) => DefsById[id]; | |||||
public ClientBucketType Type { get; } | public ClientBucketType Type { get; } | ||||
public string Id { get; } | |||||
public BucketId Id { get; } | |||||
public int WindowCount { get; } | public int WindowCount { get; } | ||||
public int WindowSeconds { get; } | public int WindowSeconds { get; } | ||||
public ClientBucket(ClientBucketType type, string id, int count, int seconds) | |||||
public ClientBucket(ClientBucketType type, BucketId id, int count, int seconds) | |||||
{ | { | ||||
Type = type; | Type = type; | ||||
Id = id; | Id = id; | ||||
@@ -12,10 +12,9 @@ namespace Discord.Net.Queue | |||||
{ | { | ||||
internal class RequestQueue : IDisposable | internal class RequestQueue : IDisposable | ||||
{ | { | ||||
public event Func<string, RateLimitInfo?, Task> RateLimitTriggered; | |||||
public event Func<BucketId, RateLimitInfo?, Task> RateLimitTriggered; | |||||
private readonly ConcurrentDictionary<string, RequestBucket> _bucketsByHash; | |||||
private readonly ConcurrentDictionary<string, object> _bucketsById; | |||||
private readonly ConcurrentDictionary<BucketId, object> _buckets; | |||||
private readonly SemaphoreSlim _tokenLock; | private readonly SemaphoreSlim _tokenLock; | ||||
private readonly CancellationTokenSource _cancelTokenSource; //Dispose token | private readonly CancellationTokenSource _cancelTokenSource; //Dispose token | ||||
private CancellationTokenSource _clearToken; | private CancellationTokenSource _clearToken; | ||||
@@ -35,8 +34,7 @@ namespace Discord.Net.Queue | |||||
_requestCancelToken = CancellationToken.None; | _requestCancelToken = CancellationToken.None; | ||||
_parentToken = CancellationToken.None; | _parentToken = CancellationToken.None; | ||||
_bucketsByHash = new ConcurrentDictionary<string, RequestBucket>(); | |||||
_bucketsById = new ConcurrentDictionary<string, object>(); | |||||
_buckets = new ConcurrentDictionary<BucketId, object>(); | |||||
_cleanupTask = RunCleanup(); | _cleanupTask = RunCleanup(); | ||||
} | } | ||||
@@ -84,7 +82,7 @@ namespace Discord.Net.Queue | |||||
else | else | ||||
request.Options.CancelToken = _requestCancelToken; | request.Options.CancelToken = _requestCancelToken; | ||||
var bucket = GetOrCreateBucket(request.Options.BucketId, request); | |||||
var bucket = GetOrCreateBucket(request.Options, request); | |||||
var result = await bucket.SendAsync(request).ConfigureAwait(false); | var result = await bucket.SendAsync(request).ConfigureAwait(false); | ||||
createdTokenSource?.Dispose(); | createdTokenSource?.Dispose(); | ||||
return result; | return result; | ||||
@@ -112,25 +110,31 @@ namespace Discord.Net.Queue | |||||
_waitUntil = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value + (info.Lag?.TotalMilliseconds ?? 0.0)); | _waitUntil = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value + (info.Lag?.TotalMilliseconds ?? 0.0)); | ||||
} | } | ||||
private RequestBucket GetOrCreateBucket(string id, RestRequest request) | |||||
private RequestBucket GetOrCreateBucket(RequestOptions options, RestRequest request) | |||||
{ | { | ||||
object obj = _bucketsById.GetOrAdd(id, x => new RequestBucket(this, request, x)); | |||||
if (obj is string hash) | |||||
return _bucketsByHash.GetOrAdd(hash, x => new RequestBucket(this, request, x)); | |||||
var bucketId = options.BucketId; | |||||
object obj = _buckets.GetOrAdd(bucketId, x => new RequestBucket(this, request, x)); | |||||
if (obj is BucketId hashBucket) | |||||
{ | |||||
options.BucketId = hashBucket; | |||||
return (RequestBucket)_buckets.GetOrAdd(hashBucket, x => new RequestBucket(this, request, x)); | |||||
} | |||||
return (RequestBucket)obj; | return (RequestBucket)obj; | ||||
} | } | ||||
internal async Task RaiseRateLimitTriggered(string bucketId, RateLimitInfo? info) | |||||
internal async Task RaiseRateLimitTriggered(BucketId bucketId, RateLimitInfo? info) | |||||
{ | { | ||||
await RateLimitTriggered(bucketId, info).ConfigureAwait(false); | await RateLimitTriggered(bucketId, info).ConfigureAwait(false); | ||||
} | } | ||||
internal void UpdateBucketHash(string id, string discordHash) | |||||
internal BucketId UpdateBucketHash(BucketId id, string discordHash) | |||||
{ | { | ||||
if (_bucketsById.TryGetValue(id, out object obj) && obj is RequestBucket bucket) | |||||
if (!id.IsHashBucket) | |||||
{ | { | ||||
string hash = discordHash + id.Split(new char[] { ' ' }, 2)[1]; //remove http method, using hash now | |||||
_bucketsByHash.GetOrAdd(hash, bucket); | |||||
_bucketsById.TryUpdate(id, hash, bucket); | |||||
var bucket = BucketId.Create(discordHash, id); | |||||
_buckets.GetOrAdd(bucket, _buckets[id]); | |||||
_buckets.AddOrUpdate(id, bucket, (oldBucket, oldObj) => bucket); | |||||
return bucket; | |||||
} | } | ||||
return null; | |||||
} | } | ||||
private async Task RunCleanup() | private async Task RunCleanup() | ||||
@@ -140,20 +144,14 @@ namespace Discord.Net.Queue | |||||
while (!_cancelTokenSource.IsCancellationRequested) | while (!_cancelTokenSource.IsCancellationRequested) | ||||
{ | { | ||||
var now = DateTimeOffset.UtcNow; | var now = DateTimeOffset.UtcNow; | ||||
foreach (var bucket in _bucketsById.Where(x => x.Value is RequestBucket).Select(x => (RequestBucket)x.Value)) | |||||
foreach (var bucket in _buckets.Where(x => x.Value is RequestBucket).Select(x => (RequestBucket)x.Value)) | |||||
{ | { | ||||
if ((now - bucket.LastAttemptAt).TotalMinutes > 1.0) | if ((now - bucket.LastAttemptAt).TotalMinutes > 1.0) | ||||
_bucketsById.TryRemove(bucket.Id, out _); | |||||
} | |||||
foreach (var kvp in _bucketsByHash) | |||||
{ | |||||
var kvpHash = kvp.Key; | |||||
var kvpBucket = kvp.Value; | |||||
if ((now - kvpBucket.LastAttemptAt).TotalMinutes > 1.0) | |||||
{ | { | ||||
_bucketsByHash.TryRemove(kvpHash, out _); | |||||
foreach (var key in _bucketsById.Where(x => x.Value is string hash && hash == kvpHash).Select(x => x.Key)) | |||||
_bucketsById.TryRemove(key, out _); | |||||
if (bucket.Id.IsHashBucket) | |||||
foreach (var redirectBucket in _buckets.Where(x => x.Value == bucket.Id).Select(x => (BucketId)x.Value)) | |||||
_buckets.TryRemove(redirectBucket, out _); //remove redirections if hash bucket | |||||
_buckets.TryRemove(bucket.Id, out _); | |||||
} | } | ||||
} | } | ||||
await Task.Delay(60000, _cancelTokenSource.Token).ConfigureAwait(false); //Runs each minute | await Task.Delay(60000, _cancelTokenSource.Token).ConfigureAwait(false); //Runs each minute | ||||
@@ -20,11 +20,11 @@ namespace Discord.Net.Queue | |||||
private int _semaphore; | private int _semaphore; | ||||
private DateTimeOffset? _resetTick; | private DateTimeOffset? _resetTick; | ||||
public string Id { get; private set; } | |||||
public BucketId Id { get; private set; } | |||||
public int WindowCount { get; private set; } | public int WindowCount { get; private set; } | ||||
public DateTimeOffset LastAttemptAt { get; private set; } | public DateTimeOffset LastAttemptAt { get; private set; } | ||||
public RequestBucket(RequestQueue queue, RestRequest request, string id) | |||||
public RequestBucket(RequestQueue queue, RestRequest request, BucketId id) | |||||
{ | { | ||||
_queue = queue; | _queue = queue; | ||||
Id = id; | Id = id; | ||||
@@ -234,7 +234,7 @@ namespace Discord.Net.Queue | |||||
} | } | ||||
if (info.Bucket != null) | if (info.Bucket != null) | ||||
_queue.UpdateBucketHash(request.Options.BucketId, info.Bucket); | |||||
Id = _queue.UpdateBucketHash(request.Options.BucketId, info.Bucket) ?? Id; | |||||
var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); | var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); | ||||
DateTimeOffset? resetTick = null; | DateTimeOffset? resetTick = null; | ||||
@@ -77,9 +77,9 @@ namespace Discord.Webhook | |||||
ApiClient.RequestQueue.RateLimitTriggered += async (id, info) => | ApiClient.RequestQueue.RateLimitTriggered += async (id, info) => | ||||
{ | { | ||||
if (info == null) | if (info == null) | ||||
await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||||
await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id?.ToString() ?? "null"}").ConfigureAwait(false); | |||||
else | else | ||||
await _restLogger.WarningAsync($"Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); | |||||
await _restLogger.WarningAsync($"Rate limit triggered: {id?.ToString() ?? "null"}").ConfigureAwait(false); | |||||
}; | }; | ||||
ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); | ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); | ||||
} | } | ||||