Browse Source

New approach to record and calculate inbound/outbound speed.

Use lock to sync between threads.This should fix issue #679
and other same issue.

Fix a bug in UpdateSpeed() that only save the first xxxSpeedRecord.
tags/3.3
noisyfox 8 years ago
parent
commit
65d72fdfed
1 changed files with 103 additions and 36 deletions
  1. +103
    -36
      shadowsocks-csharp/Controller/Service/AvailabilityStatistics.cs

+ 103
- 36
shadowsocks-csharp/Controller/Service/AvailabilityStatistics.cs View File

@@ -34,12 +34,77 @@ namespace Shadowsocks.Controller
//records cache for current server in {_monitorInterval} minutes
private readonly ConcurrentDictionary<string, List<int>> _latencyRecords = new ConcurrentDictionary<string, List<int>>();
//speed in KiB/s
private readonly ConcurrentDictionary<string, long> _inboundCounter = new ConcurrentDictionary<string, long>();
private readonly ConcurrentDictionary<string, long> _lastInboundCounter = new ConcurrentDictionary<string, long>();
private readonly ConcurrentDictionary<string, List<int>> _inboundSpeedRecords = new ConcurrentDictionary<string, List<int>>();
private readonly ConcurrentDictionary<string, long> _outboundCounter = new ConcurrentDictionary<string, long>();
private readonly ConcurrentDictionary<string, long> _lastOutboundCounter = new ConcurrentDictionary<string, long>();
private readonly ConcurrentDictionary<string, List<int>> _outboundSpeedRecords = new ConcurrentDictionary<string, List<int>>();
private readonly ConcurrentDictionary<string, InOutBoundRecord> _inOutBoundRecords = new ConcurrentDictionary<string, InOutBoundRecord>();
private class InOutBoundRecord
{
private long _inbound;
private long _lastInbound;
private long _outbound;
private long _lastOutbound;

private SpinLock _lock = new SpinLock();

public void UpdateInbound(long delta)
{
bool lockTaken = false;
try
{
_lock.Enter(ref lockTaken);
Interlocked.Add(ref _inbound, delta);
}
finally
{
if (lockTaken)
{
_lock.Exit(false);
}
}
}

public void UpdateOutbound(long delta)
{
bool lockTaken = false;
try
{
_lock.Enter(ref lockTaken);
Interlocked.Add(ref _outbound, delta);
}
finally
{
if (lockTaken)
{
_lock.Exit(false);
}
}
}

public void GetDelta(out long inboundDelta, out long outboundDelta)
{
bool lockTaken = false;
try
{
_lock.Enter(ref lockTaken);

var i = Interlocked.Read(ref _inbound);
var il = Interlocked.Exchange(ref _lastInbound, i);
inboundDelta = i - il;


var o = Interlocked.Read(ref _outbound);
var ol = Interlocked.Exchange(ref _lastOutbound, o);
outboundDelta = o - ol;
}
finally
{
if (lockTaken)
{
_lock.Exit(false);
}
}
}
}

//tasks
private readonly TimeSpan _delayBeforeStart = TimeSpan.FromSeconds(1);
@@ -98,36 +163,26 @@ namespace Shadowsocks.Controller

private void UpdateSpeed(object _)
{
foreach (var kv in _lastInboundCounter)
foreach (var kv in _inOutBoundRecords)
{
var id = kv.Key;
var record = kv.Value;

var lastInbound = kv.Value;
var inbound = _inboundCounter[id];
var bytes = inbound - lastInbound;
_lastInboundCounter[id] = inbound;
var inboundSpeed = GetSpeedInKiBPerSecond(bytes, _monitorInterval.TotalSeconds);
_inboundSpeedRecords.GetOrAdd(id, (k) =>
{
List<int> records = new List<int>();
records.Add(inboundSpeed);
return records;
});
long inboundDelta, outboundDelta;

var lastOutbound = _lastOutboundCounter[id];
var outbound = _outboundCounter[id];
bytes = outbound - lastOutbound;
_lastOutboundCounter[id] = outbound;
var outboundSpeed = GetSpeedInKiBPerSecond(bytes, _monitorInterval.TotalSeconds);
_outboundSpeedRecords.GetOrAdd(id, (k) =>
{
List<int> records = new List<int>();
records.Add(outboundSpeed);
return records;
});
record.GetDelta(out inboundDelta, out outboundDelta);

var inboundSpeed = GetSpeedInKiBPerSecond(inboundDelta, _monitorInterval.TotalSeconds);
var outboundSpeed = GetSpeedInKiBPerSecond(outboundDelta, _monitorInterval.TotalSeconds);

var inR = _inboundSpeedRecords.GetOrAdd(id, (k) => new List<int>());
var outR = _outboundSpeedRecords.GetOrAdd(id, (k) => new List<int>());

inR.Add(inboundSpeed);
outR.Add(outboundSpeed);

Logging.Debug(
$"{id}: current/max inbound {inboundSpeed}/{_inboundSpeedRecords[id].Max()} KiB/s, current/max outbound {outboundSpeed}/{_outboundSpeedRecords[id].Max()} KiB/s");
$"{id}: current/max inbound {inboundSpeed}/{inR.Max()} KiB/s, current/max outbound {outboundSpeed}/{outR.Max()} KiB/s");
}
}

@@ -327,20 +382,32 @@ namespace Shadowsocks.Controller

public void UpdateInboundCounter(Server server, long n)
{
_inboundCounter.AddOrUpdate(server.Identifier(), (k) =>
_inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) =>
{
_lastInboundCounter.GetOrAdd(server.Identifier(), 0);
return n;
}, (k, v) => (v + n));
var r = new InOutBoundRecord();
r.UpdateInbound(n);

return r;
}, (k, v) =>
{
v.UpdateInbound(n);
return v;
});
}

public void UpdateOutboundCounter(Server server, long n)
{
_outboundCounter.AddOrUpdate(server.Identifier(), (k) =>
_inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) =>
{
_lastOutboundCounter.GetOrAdd(server.Identifier(), 0);
return n;
}, (k, v) => (v + n));
var r = new InOutBoundRecord();
r.UpdateOutbound(n);

return r;
}, (k, v) =>
{
v.UpdateOutbound(n);
return v;
});
}

class UpdateRecordsState


Loading…
Cancel
Save