Browse Source

async await style udp operation

pull/2925/head
Student Main 4 years ago
parent
commit
bff6760a9b
No known key found for this signature in database GPG Key ID: AA78519C208C8742
2 changed files with 31 additions and 42 deletions
  1. +3
    -9
      shadowsocks-csharp/Controller/Service/UDPListener.cs
  2. +28
    -33
      shadowsocks-csharp/Controller/Service/UDPRelay.cs

+ 3
- 9
shadowsocks-csharp/Controller/Service/UDPListener.cs View File

@@ -13,20 +13,14 @@ namespace Shadowsocks.Controller
{
public interface IDatagramService
{
[Obsolete]
bool Handle(byte[] firstPacket, int length, Socket socket, object state);

public abstract bool Handle(CachedNetworkStream stream, object state);
public abstract Task< bool> Handle(Memory<byte> packet, Socket socket, EndPoint client);

void Stop();
}

public abstract class DatagramService : IDatagramService
{
[Obsolete]
public abstract bool Handle(byte[] firstPacket, int length, Socket socket, object state);

public abstract bool Handle(CachedNetworkStream stream, object state);
public abstract Task<bool> Handle(Memory<byte> packet, Socket socket, EndPoint client);

public virtual void Stop() { }
}
@@ -101,7 +95,7 @@ namespace Shadowsocks.Controller
var len = result.ReceivedBytes;
foreach (IDatagramService service in _services)
{
if (service.Handle(buffer, len, _udpSocket, result.RemoteEndPoint))
if (await service.Handle(new Memory<byte>(buffer)[..len], _udpSocket, result.RemoteEndPoint))
{
break;
}


+ 28
- 33
shadowsocks-csharp/Controller/Service/UDPRelay.cs View File

@@ -1,8 +1,10 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using NLog;
using Shadowsocks.Controller.Strategy;
@@ -26,28 +28,17 @@ namespace Shadowsocks.Controller
this._controller = controller;
}
// TODO: UDP is datagram protocol not stream protocol
public override bool Handle(CachedNetworkStream stream, object state)
{
byte[] fp = new byte[256];
int len = stream.ReadFirstBlock(fp);
return Handle(fp, len, stream.Socket, state);
}
[Obsolete]
public override bool Handle(byte[] firstPacket, int length, Socket socket, object state)
public override async Task<bool> Handle(Memory<byte> packet, Socket socket, EndPoint client)
{
if (socket.ProtocolType != ProtocolType.Udp)
{
return false;
}
if (length < 4)
if (packet.Length < 4)
{
return false;
}
// UDPListener.UDPState udpState = (UDPListener.UDPState)state;
IPEndPoint remoteEndPoint = (IPEndPoint)state;
// IPEndPoint remoteEndPoint = (IPEndPoint)socket.RemoteEndPoint;
IPEndPoint remoteEndPoint = (IPEndPoint)client;
UDPHandler handler = _cache.get(remoteEndPoint);
if (handler == null)
{
@@ -55,14 +46,14 @@ namespace Shadowsocks.Controller
handler.Receive();
_cache.add(remoteEndPoint, handler);
}
handler.Send(firstPacket, length);
await handler.SendAsync(packet);
return true;
}
public class UDPHandler
{
private static Logger logger = LogManager.GetCurrentClassLogger();
private static MemoryPool<byte> pool = MemoryPool<byte>.Shared;
private Socket _local;
private Socket _remote;
@@ -92,8 +83,7 @@ namespace Shadowsocks.Controller
_localEndPoint = localEndPoint;
// TODO async resolving
IPAddress ipAddress;
bool parsed = IPAddress.TryParse(server.server, out ipAddress);
bool parsed = IPAddress.TryParse(server.server, out IPAddress ipAddress);
if (!parsed)
{
IPHostEntry ipHostInfo = Dns.GetHostEntry(server.server);
@@ -104,15 +94,19 @@ namespace Shadowsocks.Controller
_remote.Bind(new IPEndPoint(ListenAddress, 0));
}
public void Send(byte[] data, int length)
public async Task SendAsync(ReadOnlyMemory<byte> data)
{
IEncryptor encryptor = EncryptorFactory.GetEncryptor(_server.method, _server.password);
var slicedData = data.AsSpan(0, length);
byte[] dataOut = new byte[slicedData.Length + 1000];
var dataToSend = slicedData[3..];
int outlen = encryptor.EncryptUDP(slicedData[3..], dataOut);
using IMemoryOwner<byte> mem = pool.Rent(data.Length + 1000);
// byte[] dataOut = new byte[slicedData.Length + 1000];
int outlen = encryptor.EncryptUDP(data.Span[3..], mem.Memory.Span);
logger.Debug(_localEndPoint, _remoteEndPoint, outlen, "UDP Relay up");
_remote?.SendTo(dataOut, outlen, SocketFlags.None, _remoteEndPoint);
if (!MemoryMarshal.TryGetArray(mem.Memory[..outlen], out ArraySegment<byte> outData))
{
throw new InvalidOperationException("Can't extract underly array segment");
};
await _remote?.SendToAsync(outData, SocketFlags.None, _remoteEndPoint);
}
public async Task ReceiveAsync()
@@ -121,22 +115,23 @@ namespace Shadowsocks.Controller
logger.Debug($"++++++Receive Server Port, size:" + _buffer.Length);
try
{
while (true)
{
var result = await _remote.ReceiveFromAsync(_buffer, SocketFlags.None, remoteEndPoint);
int bytesRead = result.ReceivedBytes;
byte[] dataOut = new byte[bytesRead];
int outlen;
IEncryptor encryptor = EncryptorFactory.GetEncryptor(_server.method, _server.password);
outlen = encryptor.DecryptUDP(dataOut, _buffer.AsSpan(0, bytesRead));
byte[] sendBuf = new byte[outlen + 3];
Array.Copy(dataOut, 0, sendBuf, 3, outlen);
using IMemoryOwner<byte> owner = pool.Rent(bytesRead + 3);
Memory<byte> o = owner.Memory;
IEncryptor encryptor = EncryptorFactory.GetEncryptor(_server.method, _server.password);
int outlen = encryptor.DecryptUDP(o.Span[3..], _buffer.AsSpan(0, bytesRead));
logger.Debug(_remoteEndPoint, _localEndPoint, outlen, "UDP Relay down");
await _local?.SendToAsync(sendBuf, SocketFlags.None, _localEndPoint);
if (!MemoryMarshal.TryGetArray(o[..(outlen + 3)], out ArraySegment<byte> data))
{
throw new InvalidOperationException("Can't extract underly array segment");
};
await _local?.SendToAsync(data, SocketFlags.None, _localEndPoint);
}
}
catch (Exception e)


Loading…
Cancel
Save