From 37d894594e835d1afd6ed44b332f03f99dffc091 Mon Sep 17 00:00:00 2001 From: noisyfox Date: Mon, 22 Aug 2016 11:49:53 +1000 Subject: [PATCH] Use immutable remote reference during async process. To prevent "The IAsyncResult object was not returned from the corresponding asynchronous method on this class." exception. Signed-off-by: noisyfox --- .../Controller/Service/TCPRelay.cs | 99 ++++++++++++++----- 1 file changed, 73 insertions(+), 26 deletions(-) diff --git a/shadowsocks-csharp/Controller/Service/TCPRelay.cs b/shadowsocks-csharp/Controller/Service/TCPRelay.cs index da69f028..2d86de25 100644 --- a/shadowsocks-csharp/Controller/Service/TCPRelay.cs +++ b/shadowsocks-csharp/Controller/Service/TCPRelay.cs @@ -80,6 +80,33 @@ namespace Shadowsocks.Controller class TCPHandler { + + class AsyncSession + { + public IProxy Remote { get; } + + public AsyncSession(IProxy remote) + { + Remote = remote; + } + } + + class AsyncSession : AsyncSession + { + public T State { get; set; } + + public AsyncSession(IProxy remote, T state) : base(remote) + { + State = state; + } + + public AsyncSession(AsyncSession session, T state): base(session.Remote) + { + State = state; + } + } + + // Size of receive buffer. public static readonly int RecvSize = 8192; public static readonly int RecvReserveSize = IVEncryptor.ONETIMEAUTH_BYTES + IVEncryptor.AUTH_BYTES; // reserve for one-time auth @@ -89,7 +116,8 @@ namespace Shadowsocks.Controller public IEncryptor encryptor; public Server server; // Client socket. - public IProxy remote; + private AsyncSession _currentRemoteSession; + public Socket connection; public ShadowsocksController controller; public TCPRelay tcprelay; @@ -177,6 +205,7 @@ namespace Shadowsocks.Controller } try { + var remote = _currentRemoteSession?.Remote; remote?.Shutdown(SocketShutdown.Both); remote?.Close(); } @@ -340,7 +369,8 @@ namespace Shadowsocks.Controller // inner class private class ProxyTimer : Timer { - public IProxy Proxy; + public AsyncSession Session; + public EndPoint DestEndPoint; public Server Server; @@ -351,6 +381,8 @@ namespace Shadowsocks.Controller private class ServerTimer : Timer { + public AsyncSession Session; + public Server Server; public ServerTimer(int p) : base(p) { } } @@ -362,6 +394,7 @@ namespace Shadowsocks.Controller CreateRemote(); // Setting up proxy + IProxy remote; EndPoint proxyEP; if (_config.useProxy) { @@ -374,20 +407,22 @@ namespace Shadowsocks.Controller proxyEP = null; } + var session = new AsyncSession(remote); + _currentRemoteSession = session; ProxyTimer proxyTimer = new ProxyTimer(3000); proxyTimer.AutoReset = false; proxyTimer.Elapsed += proxyConnectTimer_Elapsed; proxyTimer.Enabled = true; - proxyTimer.Proxy = remote; + proxyTimer.Session = session; proxyTimer.DestEndPoint = SocketUtil.GetEndPoint(server.server, server.server_port); proxyTimer.Server = server; _proxyConnected = false; // Connect to the proxy server. - remote.BeginConnectProxy(proxyEP, new AsyncCallback(ProxyConnectCallback), proxyTimer); + remote.BeginConnectProxy(proxyEP, new AsyncCallback(ProxyConnectCallback), new AsyncSession(remote, proxyTimer)); } catch (Exception e) { @@ -402,10 +437,10 @@ namespace Shadowsocks.Controller { return; } - var proxy = ((ProxyTimer)sender).Proxy; + var proxy = ((ProxyTimer)sender).Session.Remote; Logging.Info($"Proxy {proxy.ProxyEndPoint} timed out"); - remote?.Close(); + proxy.Close(); RetryConnect(); } @@ -418,13 +453,16 @@ namespace Shadowsocks.Controller } try { - ProxyTimer timer = (ProxyTimer)ar.AsyncState; + var session = (AsyncSession) ar.AsyncState; + ProxyTimer timer = session.State; var destEndPoint = timer.DestEndPoint; server = timer.Server; timer.Elapsed -= proxyConnectTimer_Elapsed; timer.Enabled = false; timer.Dispose(); + var remote = session.Remote; + // Complete the connection. remote.EndConnectProxy(ar); @@ -443,11 +481,12 @@ namespace Shadowsocks.Controller connectTimer.AutoReset = false; connectTimer.Elapsed += destConnectTimer_Elapsed; connectTimer.Enabled = true; + connectTimer.Session = session; connectTimer.Server = server; _destConnected = false; // Connect to the remote endpoint. - remote.BeginConnectDest(destEndPoint, new AsyncCallback(ConnectCallback), connectTimer); + remote.BeginConnectDest(destEndPoint, new AsyncCallback(ConnectCallback), new AsyncSession(session, connectTimer)); } catch (ArgumentException) { @@ -466,11 +505,12 @@ namespace Shadowsocks.Controller return; } + var session = ((ServerTimer) sender).Session; Server server = ((ServerTimer)sender).Server; IStrategy strategy = controller.GetCurrentStrategy(); strategy?.SetFailure(server); Logging.Info($"{server.FriendlyName()} timed out"); - remote?.Close(); + session.Remote.Close(); RetryConnect(); } @@ -491,12 +531,14 @@ namespace Shadowsocks.Controller if (_closed) return; try { - ServerTimer timer = (ServerTimer)ar.AsyncState; + var session = (AsyncSession) ar.AsyncState; + ServerTimer timer = session.State; server = timer.Server; timer.Elapsed -= destConnectTimer_Elapsed; timer.Enabled = false; timer.Dispose(); + var remote = session.Remote; // Complete the connection. remote?.EndConnectDest(ar); @@ -512,7 +554,7 @@ namespace Shadowsocks.Controller strategy?.UpdateLatency(server, latency); _tcprelay.UpdateLatency(server, latency); - StartPipe(); + StartPipe(session); } catch (ArgumentException) { @@ -529,15 +571,15 @@ namespace Shadowsocks.Controller } } - private void StartPipe() + private void StartPipe(AsyncSession session) { if (_closed) return; try { _startReceivingTime = DateTime.Now; - remote?.BeginReceive(_remoteRecvBuffer, 0, RecvSize, SocketFlags.None, new AsyncCallback(PipeRemoteReceiveCallback), null); + session.Remote.BeginReceive(_remoteRecvBuffer, 0, RecvSize, SocketFlags.None, new AsyncCallback(PipeRemoteReceiveCallback), session); connection?.BeginReceive(_connetionRecvBuffer, 0, RecvSize, SocketFlags.None, new AsyncCallback(PipeConnectionReceiveCallback), - true /* to tell the callback this is the first time reading packet, and we haven't found the header yet. */); + new AsyncSession(session, true) /* to tell the callback this is the first time reading packet, and we haven't found the header yet. */); } catch (Exception e) { @@ -551,8 +593,8 @@ namespace Shadowsocks.Controller if (_closed) return; try { - if ( remote == null ) return; - int bytesRead = remote.EndReceive(ar); + var session = (AsyncSession) ar.AsyncState; + int bytesRead = session.Remote.EndReceive(ar); _totalRead += bytesRead; _tcprelay.UpdateInboundCounter(server, bytesRead); if (bytesRead > 0) @@ -564,7 +606,7 @@ namespace Shadowsocks.Controller if (_closed) return; encryptor.Decrypt(_remoteRecvBuffer, bytesRead, _remoteSendBuffer, out bytesToSend); } - connection.BeginSend(_remoteSendBuffer, 0, bytesToSend, SocketFlags.None, new AsyncCallback(PipeConnectionSendCallback), null); + connection.BeginSend(_remoteSendBuffer, 0, bytesToSend, SocketFlags.None, new AsyncCallback(PipeConnectionSendCallback), session); IStrategy strategy = controller.GetCurrentStrategy(); strategy?.UpdateLastRead(server); } @@ -590,14 +632,17 @@ namespace Shadowsocks.Controller if(connection == null) return; int bytesRead = connection.EndReceive(ar); _totalWrite += bytesRead; + + var session = (AsyncSession) ar.AsyncState; + var remote = session.Remote; + if (bytesRead > 0) { /* * Only the first packet contains the socks5 header, it doesn't make sense to parse every packets. * Also it's unnecessary to parse these data if we turn off the VerboseLogging. */ - object needToFindHeader = null; - if (ar.AsyncState != null && _config.isVerboseLogging) + if (session.State && _config.isVerboseLogging) { int atyp = _connetionRecvBuffer[0]; string dst_addr; @@ -609,6 +654,7 @@ namespace Shadowsocks.Controller dst_port = (_connetionRecvBuffer[5] << 8) + _connetionRecvBuffer[6]; Logging.Info($"connect to {dst_addr}:{dst_port}"); + session.State = false; break; case 3: // domain name, length + str int len = _connetionRecvBuffer[1]; @@ -616,15 +662,14 @@ namespace Shadowsocks.Controller dst_port = (_connetionRecvBuffer[len + 2] << 8) + _connetionRecvBuffer[len + 3]; Logging.Info($"connect to {dst_addr}:{dst_port}"); + session.State = false; break; case 4: // IPv6 address, 16 bytes dst_addr = new IPAddress(_connetionRecvBuffer.Skip(1).Take(16).ToArray()).ToString(); dst_port = (_connetionRecvBuffer[17] << 8) + _connetionRecvBuffer[18]; Logging.Info($"connect to [{dst_addr}]:{dst_port}"); - break; - default: - needToFindHeader = true; // Still not found, try next packet. + session.State = false; break; } } @@ -638,7 +683,7 @@ namespace Shadowsocks.Controller _tcprelay.UpdateOutboundCounter(server, bytesToSend); _startSendingTime = DateTime.Now; _bytesToSend = bytesToSend; - remote.BeginSend(_connetionSendBuffer, 0, bytesToSend, SocketFlags.None, new AsyncCallback(PipeRemoteSendCallback), needToFindHeader); + remote.BeginSend(_connetionSendBuffer, 0, bytesToSend, SocketFlags.None, new AsyncCallback(PipeRemoteSendCallback), session); IStrategy strategy = controller.GetCurrentStrategy(); strategy?.UpdateLastWrite(server); } @@ -661,8 +706,9 @@ namespace Shadowsocks.Controller if (_closed) return; try { - remote?.EndSend(ar); - connection?.BeginReceive(_connetionRecvBuffer, 0, RecvSize, SocketFlags.None, new AsyncCallback(PipeConnectionReceiveCallback), ar.AsyncState); + var session = (AsyncSession)ar.AsyncState; + session.Remote.EndSend(ar); + connection?.BeginReceive(_connetionRecvBuffer, 0, RecvSize, SocketFlags.None, new AsyncCallback(PipeConnectionReceiveCallback), session); } catch (Exception e) { @@ -676,8 +722,9 @@ namespace Shadowsocks.Controller if (_closed) return; try { + var session = (AsyncSession)ar.AsyncState; connection?.EndSend(ar); - remote?.BeginReceive(_remoteRecvBuffer, 0, RecvSize, SocketFlags.None, new AsyncCallback(PipeRemoteReceiveCallback), null); + session.Remote.BeginReceive(_remoteRecvBuffer, 0, RecvSize, SocketFlags.None, new AsyncCallback(PipeRemoteReceiveCallback), session); } catch (Exception e) {