Browse Source

Allow LineReader to start with a first package

tags/3.4.0
noisyfox 7 years ago
parent
commit
a61b10bb67
1 changed files with 89 additions and 41 deletions
  1. +89
    -41
      shadowsocks-csharp/Util/Sockets/LineReader.cs

+ 89
- 41
shadowsocks-csharp/Util/Sockets/LineReader.cs View File

@@ -18,10 +18,14 @@ namespace Shadowsocks.Util.Sockets

private readonly byte[] _lineBuffer;

private int _bufferIndex;
private int _bufferDataIndex;
private int _bufferDataLength;

public LineReader(WrappedSocket socket, Func<string, object, bool> onLineRead, Action<Exception, object> onException,
Action<byte[], int, int, object> onFinish, Encoding encoding, string delimiter, int maxLineBytes, object state)
public LineReader(WrappedSocket socket, byte[] firstPackge, int index, int length,
Func<string, object, bool> onLineRead, Action<Exception, object> onException,
Action<byte[], int, int, object> onFinish,
Encoding encoding, string delimiter, int maxLineBytes,
object state)
{
if (socket == null)
{
@@ -40,6 +44,19 @@ namespace Shadowsocks.Util.Sockets
throw new ArgumentNullException(nameof(delimiter));
}

if (maxLineBytes < length)
{
throw new ArgumentException("Line buffer length can't less than first package length!", nameof(maxLineBytes));
}

if (length > 0)
{
if (firstPackge == null)
{
throw new ArgumentNullException(nameof(firstPackge));
}
}

_socket = socket;
_onLineRead = onLineRead;
_onException = onException;
@@ -66,64 +83,95 @@ namespace Shadowsocks.Util.Sockets

_lineBuffer = new byte[maxLineBytes];

// start reading
socket.BeginReceive(_lineBuffer, 0, maxLineBytes, 0, ReceiveCallback, 0);
if (length > 0)
{
// process first package
Array.Copy(firstPackge, index, _lineBuffer, 0, length);
_bufferDataLength = length;

try
{
NewPackageRecv();
}
catch (Exception ex)
{
OnException(ex);
OnFinish();
}
}
else
{
// start reading
socket.BeginReceive(_lineBuffer, 0, maxLineBytes, 0, ReceiveCallback, 0);
}
}

public LineReader(WrappedSocket socket, Func<string, object, bool> onLineRead,
Action<Exception, object> onException,
Action<byte[], int, int, object> onFinish, Encoding encoding, string delimiter, int maxLineBytes,
object state)
: this(socket, null, 0, 0, onLineRead, onException, onFinish, encoding, delimiter, maxLineBytes, state)
{
}

private void ReceiveCallback(IAsyncResult ar)
{
int length = (int)ar.AsyncState;
try
{
var bytesRead = _socket.EndReceive(ar);

if (bytesRead == 0)
{
OnFinish(length);
OnFinish();
return;
}

length += bytesRead;

int i;
while ((i = IndexOf(_lineBuffer, _bufferIndex, length, _delimiterBytes, _delimiterSearchOffsetTable,
_delimiterSearchCharTable)) != -1)
{
var decodeLen = i - _bufferIndex;
string line = _encoding.GetString(_lineBuffer, _bufferIndex, decodeLen);
_bufferDataLength += bytesRead;

_bufferIndex = i + _delimiterBytes.Length;
length -= decodeLen;
length -= _delimiterBytes.Length;
NewPackageRecv();
}
catch (Exception ex)
{
OnException(ex);
OnFinish();
}
}

var stop = _onLineRead(line, _state);
if (stop)
{
OnFinish(length);
return;
}
}
if (length == _lineBuffer.Length)
{
OnException(new IndexOutOfRangeException("LineBuffer full! Try increace maxLineBytes!"));
OnFinish(length);
private void NewPackageRecv()
{
int i;
while ((i = IndexOf(_lineBuffer, _bufferDataIndex, _bufferDataLength, _delimiterBytes, _delimiterSearchOffsetTable,
_delimiterSearchCharTable)) != -1)
{
var decodeLen = i - _bufferDataIndex;
string line = _encoding.GetString(_lineBuffer, _bufferDataIndex, decodeLen);

return;
}
_bufferDataIndex = i + _delimiterBytes.Length;
_bufferDataLength -= decodeLen;
_bufferDataLength -= _delimiterBytes.Length;

if (_bufferIndex > 0)
var stop = _onLineRead(line, _state);
if (stop)
{
Buffer.BlockCopy(_lineBuffer, _bufferIndex, _lineBuffer, 0, length);
_bufferIndex = 0;
OnFinish();
return;
}
}
if (_bufferDataLength == _lineBuffer.Length)
{
OnException(new IndexOutOfRangeException("LineBuffer full! Try increace maxLineBytes!"));
OnFinish();

_socket.BeginReceive(_lineBuffer, length, _lineBuffer.Length - length, 0, ReceiveCallback, length);
return;
}
catch (Exception ex)

if (_bufferDataIndex > 0)
{
OnException(ex);
OnFinish(length);
Buffer.BlockCopy(_lineBuffer, _bufferDataIndex, _lineBuffer, 0, _bufferDataLength);
_bufferDataIndex = 0;
}

_socket.BeginReceive(_lineBuffer, _bufferDataLength, _lineBuffer.Length - _bufferDataLength, 0, ReceiveCallback, _bufferDataLength);
}

private void OnException(Exception ex)
@@ -131,14 +179,14 @@ namespace Shadowsocks.Util.Sockets
_onException?.Invoke(ex, _state);
}

private void OnFinish(int length)
private void OnFinish()
{
_onFinish?.Invoke(_lineBuffer, _bufferIndex, length, _state);
_onFinish?.Invoke(_lineBuffer, _bufferDataIndex, _bufferDataLength, _state);
}

#region Boyer-Moore string search

public static int IndexOf(byte[] haystack, int index, int length, byte[] needle, int[] offsetTable, int[] charTable)
private static int IndexOf(byte[] haystack, int index, int length, byte[] needle, int[] offsetTable, int[] charTable)
{
var end = index + length;
for (int i = needle.Length - 1 + index, j; i < end;)


Loading…
Cancel
Save