Browse Source

Added queued message queues and fixed SendMessage's null response

tags/docs-0.9
RogueException 9 years ago
parent
commit
0e68d190b4
4 changed files with 146 additions and 72 deletions
  1. +0
    -2
      src/Discord.Net/DiscordClient.cs
  2. +118
    -54
      src/Discord.Net/MessageQueue.cs
  3. +4
    -4
      src/Discord.Net/Models/Channel.cs
  4. +24
    -12
      src/Discord.Net/Models/Message.cs

+ 0
- 2
src/Discord.Net/DiscordClient.cs View File

@@ -847,7 +847,6 @@ namespace Discord
msg.Update(data); msg.Update(data);
user.UpdateActivity(); user.UpdateActivity();


msg.State = MessageState.Normal;
if (Config.LogEvents) if (Config.LogEvents)
Logger.Verbose($"Message Received: {channel.Server?.Name ?? "[Private]"}/{channel.Name}"); Logger.Verbose($"Message Received: {channel.Server?.Name ?? "[Private]"}/{channel.Name}");
OnMessageReceived(msg); OnMessageReceived(msg);
@@ -869,7 +868,6 @@ namespace Discord
if (msg != null) if (msg != null)
{ {
msg.Update(data); msg.Update(data);
msg.State = MessageState.Normal;
if (Config.LogEvents) if (Config.LogEvents)
Logger.Info($"Message Update: {channel.Server?.Name ?? "[Private]"}/{channel.Name}"); Logger.Info($"Message Update: {channel.Server?.Name ?? "[Private]"}/{channel.Name}");
OnMessageUpdated(msg); OnMessageUpdated(msg);


+ 118
- 54
src/Discord.Net/MessageQueue.cs View File

@@ -11,19 +11,42 @@ namespace Discord.Net
/// <summary> Manages an outgoing message queue for DiscordClient. </summary> /// <summary> Manages an outgoing message queue for DiscordClient. </summary>
public sealed class MessageQueue public sealed class MessageQueue
{ {
private struct MessageQueueItem
private interface IQueuedAction
{ {
public readonly ulong Id, ChannelId;
public readonly string Text;
public readonly bool IsTTS;
Task Do(MessageQueue queue);
}

private struct SendAction : IQueuedAction
{
private readonly Message _msg;

public SendAction(Message msg)
{
_msg = msg;
}
Task IQueuedAction.Do(MessageQueue queue) => queue.Send(_msg);
}
private struct EditAction : IQueuedAction
{
private readonly Message _msg;
private readonly string _text;


public MessageQueueItem(ulong id, ulong channelId, string text, bool isTTS)
public EditAction(Message msg, string text)
{ {
Id = id;
ChannelId = channelId;
Text = text;
IsTTS = isTTS;
_msg = msg;
_text = text;
} }
Task IQueuedAction.Do(MessageQueue queue) => queue.Edit(_msg, _text);
}
private struct DeleteAction : IQueuedAction
{
private readonly Message _msg;

public DeleteAction(Message msg)
{
_msg = msg;
}
Task IQueuedAction.Do(MessageQueue queue) => queue.Delete(_msg);
} }


private const int WarningStart = 30; private const int WarningStart = 30;
@@ -31,10 +54,11 @@ namespace Discord.Net
private readonly Random _nonceRand; private readonly Random _nonceRand;
private readonly DiscordClient _client; private readonly DiscordClient _client;
private readonly Logger _logger; private readonly Logger _logger;
private readonly ConcurrentQueue<MessageQueueItem> _pending;
private readonly ConcurrentQueue<IQueuedAction> _pendingActions;
private readonly ConcurrentDictionary<int, Message> _pendingSends;
private int _nextWarning; private int _nextWarning;


/// <summary> Gets the current number of queued message sends/edits. </summary>
/// <summary> Gets the current number of queued actions. </summary>
public int Count { get; private set; } public int Count { get; private set; }


internal MessageQueue(DiscordClient client, Logger logger) internal MessageQueue(DiscordClient client, Logger logger)
@@ -43,16 +67,35 @@ namespace Discord.Net
_logger = logger; _logger = logger;


_nonceRand = new Random(); _nonceRand = new Random();
_pending = new ConcurrentQueue<MessageQueueItem>();
_pendingActions = new ConcurrentQueue<IQueuedAction>();
_pendingSends = new ConcurrentDictionary<int, Message>();
} }


internal void QueueSend(ulong channelId, string text, bool isTTS)
internal Message QueueSend(Channel channel, string text, bool isTTS)
{
Message msg = new Message(0, channel, channel.IsPrivate ? _client.PrivateUser : channel.Server.CurrentUser);
msg.RawText = text;
msg.Text = msg.Resolve(text);
msg.Nonce = GenerateNonce();
msg.State = MessageState.Queued;
_pendingSends.TryAdd(msg.Nonce, msg);
_pendingActions.Enqueue(new SendAction(msg));
return msg;
}
internal void QueueEdit(Message msg, string text)
{ {
_pending.Enqueue(new MessageQueueItem(0, channelId, text, isTTS));
_pendingActions.Enqueue(new EditAction(msg, text));
} }
internal void QueueEdit(ulong channelId, ulong messageId, string text)
internal void QueueDelete(Message msg)
{ {
_pending.Enqueue(new MessageQueueItem(channelId, messageId, text, false));
Message ignored;
if (msg.State == MessageState.Queued && _pendingSends.TryRemove(msg.Nonce, out ignored))
{
msg.State = MessageState.Aborted;
return;
}

_pendingActions.Enqueue(new DeleteAction(msg));
} }


internal Task Run(CancellationToken cancelToken, int interval) internal Task Run(CancellationToken cancelToken, int interval)
@@ -60,63 +103,84 @@ namespace Discord.Net
_nextWarning = WarningStart; _nextWarning = WarningStart;
return Task.Run(async () => return Task.Run(async () =>
{ {
MessageQueueItem queuedMessage;

while (!cancelToken.IsCancellationRequested) while (!cancelToken.IsCancellationRequested)
{ {
await Task.Delay(interval).ConfigureAwait(false);

Count = _pending.Count;
Count = _pendingActions.Count;
if (Count >= _nextWarning) if (Count >= _nextWarning)
{ {
_nextWarning *= 2; _nextWarning *= 2;
_logger.Warning($"Queue is backed up, currently at {Count} messages.");
_logger.Warning($"Queue is backed up, currently at {Count} actions.");
} }
else if (Count < WarningStart) //Reset once the problem is solved else if (Count < WarningStart) //Reset once the problem is solved
_nextWarning = WarningStart; _nextWarning = WarningStart;


while (_pending.TryDequeue(out queuedMessage))
{
try
{
if (queuedMessage.Id == 0)
{
var request = new SendMessageRequest(queuedMessage.ChannelId)
{
Content = queuedMessage.Text,
Nonce = GenerateNonce().ToIdString(),
IsTTS = queuedMessage.IsTTS
};
await _client.ClientAPI.Send(request).ConfigureAwait(false);
}
else
{
var request = new UpdateMessageRequest(queuedMessage.ChannelId, queuedMessage.Id)
{
Content = queuedMessage.Text
};
await _client.ClientAPI.Send(request).ConfigureAwait(false);
}
}
catch (WebException) { break; }
catch (HttpException) { /*msg.State = MessageState.Failed;*/ }
catch (Exception ex) { _logger.Error(ex); }
}
IQueuedAction queuedAction;
while (_pendingActions.TryDequeue(out queuedAction))
await queuedAction.Do(this);

await Task.Delay(interval).ConfigureAwait(false);
} }
}); });
} }


/// <summary> Clears all queued message sends/edits </summary>
internal async Task Send(Message msg)
{
if (_pendingSends.TryRemove(msg.Nonce, out msg)) //Remove it from pending
{
try
{
var request = new SendMessageRequest(msg.Channel.Id)
{
Content = msg.Text,
Nonce = msg.Nonce.ToString(),
IsTTS = msg.IsTTS
};
var response = await _client.ClientAPI.Send(request).ConfigureAwait(false);
msg.Update(response);
msg.State = MessageState.Normal;
}
catch (Exception ex) { msg.State = MessageState.Failed; _logger.Error("Failed to send message", ex); }
}
}
internal async Task Edit(Message msg, string text)
{
if (msg.State == MessageState.Normal)
{
try
{
var request = new UpdateMessageRequest(msg.Channel.Id, msg.Id)
{
Content = text
};
await _client.ClientAPI.Send(request).ConfigureAwait(false);
}
catch (Exception ex) { msg.State = MessageState.Failed; _logger.Error("Failed to edit message", ex); }
}
}
internal async Task Delete(Message msg)
{
if (msg.State == MessageState.Normal)
{
try
{
var request = new DeleteMessageRequest(msg.Channel.Id, msg.Id);
await _client.ClientAPI.Send(request).ConfigureAwait(false);
}
catch (Exception ex) { msg.State = MessageState.Failed; _logger.Error("Failed to delete message", ex); }
}
}

/// <summary> Clears all queued message sends/edits/deletes </summary>
public void Clear() public void Clear()
{ {
MessageQueueItem ignored;
while (_pending.TryDequeue(out ignored)) { }
IQueuedAction ignored;
while (_pendingActions.TryDequeue(out ignored)) { }
} }


private ulong GenerateNonce()
private int GenerateNonce()
{ {
lock (_nonceRand) lock (_nonceRand)
return (ulong)_nonceRand.Next(1, int.MaxValue);
return _nonceRand.Next(1, int.MaxValue);
} }
} }
} }

+ 4
- 4
src/Discord.Net/Models/Channel.cs View File

@@ -243,6 +243,7 @@ namespace Discord
internal Message AddMessage(ulong id, User user, DateTime timestamp) internal Message AddMessage(ulong id, User user, DateTime timestamp)
{ {
Message message = new Message(id, this, user); Message message = new Message(id, this, user);
message.State = MessageState.Normal;
var cacheLength = Client.Config.MessageCacheSize; var cacheLength = Client.Config.MessageCacheSize;
if (cacheLength > 0) if (cacheLength > 0)
{ {
@@ -336,12 +337,11 @@ namespace Discord
} }
private async Task<Message> SendMessageInternal(string text, bool isTTS) private async Task<Message> SendMessageInternal(string text, bool isTTS)
{ {
Message msg = null;
if (text.Length > DiscordConfig.MaxMessageSize) if (text.Length > DiscordConfig.MaxMessageSize)
throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {DiscordConfig.MaxMessageSize} characters or less."); throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {DiscordConfig.MaxMessageSize} characters or less.");


if (Client.Config.UseMessageQueue) if (Client.Config.UseMessageQueue)
Client.MessageQueue.QueueSend(Id, text, isTTS);
return Client.MessageQueue.QueueSend(this, text, isTTS);
else else
{ {
var request = new SendMessageRequest(Id) var request = new SendMessageRequest(Id)
@@ -351,10 +351,10 @@ namespace Discord
IsTTS = isTTS IsTTS = isTTS
}; };
var model = await Client.ClientAPI.Send(request).ConfigureAwait(false); var model = await Client.ClientAPI.Send(request).ConfigureAwait(false);
msg = AddMessage(model.Id, IsPrivate ? Client.PrivateUser : Server.CurrentUser, model.Timestamp.Value);
var msg = AddMessage(model.Id, IsPrivate ? Client.PrivateUser : Server.CurrentUser, model.Timestamp.Value);
msg.Update(model); msg.Update(model);
return msg;
} }
return msg;
} }


public async Task<Message> SendFile(string filePath) public async Task<Message> SendFile(string filePath)


+ 24
- 12
src/Discord.Net/Models/Message.cs View File

@@ -14,11 +14,16 @@ using APIMessage = Discord.API.Client.Message;
namespace Discord namespace Discord
{ {
public enum MessageState : byte public enum MessageState : byte
{
{
/// <summary> Message did not originate from this session, or was successfully sent. </summary>
Normal = 0, Normal = 0,
/// <summary> Message is current queued. </summary>
Queued, Queued,
/// <summary> Message was deleted before it was sent. </summary>
Aborted,
/// <summary> Message failed to be sent. </summary>
Failed Failed
}
}


public sealed class Message public sealed class Message
{ {
@@ -177,7 +182,7 @@ namespace Discord
/// <summary> Returns the state of this message. Only useful if UseMessageQueue is true. </summary> /// <summary> Returns the state of this message. Only useful if UseMessageQueue is true. </summary>
public MessageState State { get; internal set; } public MessageState State { get; internal set; }
/// <summary> Returns the raw content of this message as it was received from the server. </summary> /// <summary> Returns the raw content of this message as it was received from the server. </summary>
public string RawText { get; private set; }
public string RawText { get; internal set; }
/// <summary> Returns the content of this message with any special references such as mentions converted. </summary> /// <summary> Returns the content of this message with any special references such as mentions converted. </summary>
public string Text { get; internal set; } public string Text { get; internal set; }
/// <summary> Returns the timestamp for when this message was sent. </summary> /// <summary> Returns the timestamp for when this message was sent. </summary>
@@ -189,15 +194,17 @@ namespace Discord
/// <summary> Returns a collection of all embeded content in this message. </summary> /// <summary> Returns a collection of all embeded content in this message. </summary>
public Embed[] Embeds { get; private set; } public Embed[] Embeds { get; private set; }


/// <summary> Returns a collection of all users mentioned in this message. </summary>
public IEnumerable<User> MentionedUsers { get; internal set; }
/// <summary> Returns a collection of all users mentioned in this message. </summary>
public IEnumerable<User> MentionedUsers { get; internal set; }
/// <summary> Returns a collection of all channels mentioned in this message. </summary> /// <summary> Returns a collection of all channels mentioned in this message. </summary>
public IEnumerable<Channel> MentionedChannels { get; internal set; } public IEnumerable<Channel> MentionedChannels { get; internal set; }
/// <summary> Returns a collection of all roles mentioned in this message. </summary> /// <summary> Returns a collection of all roles mentioned in this message. </summary>
public IEnumerable<Role> MentionedRoles { get; internal set; } public IEnumerable<Role> MentionedRoles { get; internal set; }
/// <summary> Returns the server containing the channel this message was sent to. </summary>
public Server Server => Channel.Server;

internal int Nonce { get; set; }

/// <summary> Returns the server containing the channel this message was sent to. </summary>
public Server Server => Channel.Server;
/// <summary> Returns if this message was sent from the logged-in accounts. </summary> /// <summary> Returns if this message was sent from the logged-in accounts. </summary>
public bool IsAuthor => User.Id == Client.CurrentUser?.Id; public bool IsAuthor => User.Id == Client.CurrentUser?.Id;


@@ -309,7 +316,7 @@ namespace Discord
throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {DiscordConfig.MaxMessageSize} characters or less."); throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {DiscordConfig.MaxMessageSize} characters or less.");


if (Client.Config.UseMessageQueue) if (Client.Config.UseMessageQueue)
Client.MessageQueue.QueueEdit(channel.Id, Id, text);
Client.MessageQueue.QueueEdit(this, text);
else else
{ {
var request = new UpdateMessageRequest(Channel.Id, Id) var request = new UpdateMessageRequest(Channel.Id, Id)
@@ -321,9 +328,14 @@ namespace Discord
} }
public async Task Delete() public async Task Delete()
{ {
var request = new DeleteMessageRequest(Channel.Id, Id);
try { await Client.ClientAPI.Send(request).ConfigureAwait(false); }
catch (HttpException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { }
if (Client.Config.UseMessageQueue)
Client.MessageQueue.QueueDelete(this);
else
{
var request = new DeleteMessageRequest(Channel.Id, Id);
try { await Client.ClientAPI.Send(request).ConfigureAwait(false); }
catch (HttpException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { }
}
} }


/// <summary> Returns true if the logged-in user was mentioned. </summary> /// <summary> Returns true if the logged-in user was mentioned. </summary>


Loading…
Cancel
Save