Browse Source

Revert "Allow LineReader to start with a first package"

This reverts commit a61b10bb67.
tags/3.4.2
noisyfox 8 years ago
parent
commit
6ddd3d486b
1 changed files with 41 additions and 89 deletions
  1. +41
    -89
      shadowsocks-csharp/Util/Sockets/LineReader.cs

+ 41
- 89
shadowsocks-csharp/Util/Sockets/LineReader.cs View File

@@ -18,14 +18,10 @@ namespace Shadowsocks.Util.Sockets


private readonly byte[] _lineBuffer; private readonly byte[] _lineBuffer;


private int _bufferDataIndex;
private int _bufferDataLength;
private int _bufferIndex;


public LineReader(WrappedSocket socket, byte[] firstPackge, int index, int length,
Func<string, object, bool> onLineRead, Action<Exception, object> onException,
Action<byte[], int, int, object> onFinish,
Encoding encoding, string delimiter, int maxLineBytes,
object state)
public LineReader(WrappedSocket socket, Func<string, object, bool> onLineRead, Action<Exception, object> onException,
Action<byte[], int, int, object> onFinish, Encoding encoding, string delimiter, int maxLineBytes, object state)
{ {
if (socket == null) if (socket == null)
{ {
@@ -44,19 +40,6 @@ namespace Shadowsocks.Util.Sockets
throw new ArgumentNullException(nameof(delimiter)); throw new ArgumentNullException(nameof(delimiter));
} }


if (maxLineBytes < length)
{
throw new ArgumentException("Line buffer length can't less than first package length!", nameof(maxLineBytes));
}

if (length > 0)
{
if (firstPackge == null)
{
throw new ArgumentNullException(nameof(firstPackge));
}
}

_socket = socket; _socket = socket;
_onLineRead = onLineRead; _onLineRead = onLineRead;
_onException = onException; _onException = onException;
@@ -83,95 +66,64 @@ namespace Shadowsocks.Util.Sockets


_lineBuffer = new byte[maxLineBytes]; _lineBuffer = new byte[maxLineBytes];


if (length > 0)
{
// process first package
Array.Copy(firstPackge, index, _lineBuffer, 0, length);
_bufferDataLength = length;

try
{
NewPackageRecv();
}
catch (Exception ex)
{
OnException(ex);
OnFinish();
}
}
else
{
// start reading
socket.BeginReceive(_lineBuffer, 0, maxLineBytes, 0, ReceiveCallback, 0);
}
}

public LineReader(WrappedSocket socket, Func<string, object, bool> onLineRead,
Action<Exception, object> onException,
Action<byte[], int, int, object> onFinish, Encoding encoding, string delimiter, int maxLineBytes,
object state)
: this(socket, null, 0, 0, onLineRead, onException, onFinish, encoding, delimiter, maxLineBytes, state)
{
// start reading
socket.BeginReceive(_lineBuffer, 0, maxLineBytes, 0, ReceiveCallback, 0);
} }


private void ReceiveCallback(IAsyncResult ar) private void ReceiveCallback(IAsyncResult ar)
{ {
int length = (int)ar.AsyncState;
try try
{ {
var bytesRead = _socket.EndReceive(ar); var bytesRead = _socket.EndReceive(ar);


if (bytesRead == 0) if (bytesRead == 0)
{ {
OnFinish();
OnFinish(length);
return; return;
} }


_bufferDataLength += bytesRead;
length += bytesRead;


NewPackageRecv();
}
catch (Exception ex)
{
OnException(ex);
OnFinish();
}
}

private void NewPackageRecv()
{
int i;
while ((i = IndexOf(_lineBuffer, _bufferDataIndex, _bufferDataLength, _delimiterBytes, _delimiterSearchOffsetTable,
_delimiterSearchCharTable)) != -1)
{
var decodeLen = i - _bufferDataIndex;
string line = _encoding.GetString(_lineBuffer, _bufferDataIndex, decodeLen);
int i;
while ((i = IndexOf(_lineBuffer, _bufferIndex, length, _delimiterBytes, _delimiterSearchOffsetTable,
_delimiterSearchCharTable)) != -1)
{
var decodeLen = i - _bufferIndex;
string line = _encoding.GetString(_lineBuffer, _bufferIndex, decodeLen);


_bufferDataIndex = i + _delimiterBytes.Length;
_bufferDataLength -= decodeLen;
_bufferDataLength -= _delimiterBytes.Length;
_bufferIndex = i + _delimiterBytes.Length;
length -= decodeLen;
length -= _delimiterBytes.Length;


var stop = _onLineRead(line, _state);
if (stop)
var stop = _onLineRead(line, _state);
if (stop)
{
OnFinish(length);
return;
}
}
if (length == _lineBuffer.Length)
{ {
OnFinish();
OnException(new IndexOutOfRangeException("LineBuffer full! Try increace maxLineBytes!"));
OnFinish(length);

return; return;
} }
}
if (_bufferDataLength == _lineBuffer.Length)
{
OnException(new IndexOutOfRangeException("LineBuffer full! Try increace maxLineBytes!"));
OnFinish();


return;
}
if (_bufferIndex > 0)
{
Buffer.BlockCopy(_lineBuffer, _bufferIndex, _lineBuffer, 0, length);
_bufferIndex = 0;
}


if (_bufferDataIndex > 0)
_socket.BeginReceive(_lineBuffer, length, _lineBuffer.Length - length, 0, ReceiveCallback, length);
}
catch (Exception ex)
{ {
Buffer.BlockCopy(_lineBuffer, _bufferDataIndex, _lineBuffer, 0, _bufferDataLength);
_bufferDataIndex = 0;
OnException(ex);
OnFinish(length);
} }

_socket.BeginReceive(_lineBuffer, _bufferDataLength, _lineBuffer.Length - _bufferDataLength, 0, ReceiveCallback, _bufferDataLength);
} }


private void OnException(Exception ex) private void OnException(Exception ex)
@@ -179,14 +131,14 @@ namespace Shadowsocks.Util.Sockets
_onException?.Invoke(ex, _state); _onException?.Invoke(ex, _state);
} }


private void OnFinish()
private void OnFinish(int length)
{ {
_onFinish?.Invoke(_lineBuffer, _bufferDataIndex, _bufferDataLength, _state);
_onFinish?.Invoke(_lineBuffer, _bufferIndex, length, _state);
} }


#region Boyer-Moore string search #region Boyer-Moore string search


private static int IndexOf(byte[] haystack, int index, int length, byte[] needle, int[] offsetTable, int[] charTable)
public static int IndexOf(byte[] haystack, int index, int length, byte[] needle, int[] offsetTable, int[] charTable)
{ {
var end = index + length; var end = index + length;
for (int i = needle.Length - 1 + index, j; i < end;) for (int i = needle.Length - 1 + index, j; i < end;)


Loading…
Cancel
Save