Skip to content

Commit

Permalink
feat: Added processing threads
Browse files Browse the repository at this point in the history
  • Loading branch information
TwoTenPvP committed Jan 14, 2020
1 parent 0ed1dad commit bda863e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 7 deletions.
5 changes: 5 additions & 0 deletions Ruffles/Configuration/SocketConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class SocketConfig
/// If this gets full no more events can be processed and the application will freeze until it is polled.
/// </summary>
public ushort EventQueueSize = 1024 * 8;
public int ProcessingQueueSize = 1024 * 8;
/// <summary>
/// The pool size of the HeapPointers pool.
/// </summary>
Expand Down Expand Up @@ -86,6 +87,10 @@ public class SocketConfig
/// The amount of socket threads to start.
/// </summary>
public int SocketThreads = 1;
/// <summary>
/// The amount of packet processing threads to start.
/// </summary>
public int ProcessingThreads = 1;

// Bandwidth
/// <summary>
Expand Down
91 changes: 84 additions & 7 deletions Ruffles/Core/RuffleSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public sealed class RuffleSocket
private readonly ReaderWriterLockSlim _syncronizedCallbacksLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
private readonly List<NetTuple<SynchronizationContext, SendOrPostCallback>> _syncronizedCallbacks = new List<NetTuple<SynchronizationContext, SendOrPostCallback>>();
// Event queue
private ConcurrentCircularQueue<NetworkEvent> _userEventQueue;
private ConcurrentCircularQueue<NetworkEvent> _userEventQueue;

// Processing queue
private ConcurrentCircularQueue<NetTuple<HeapMemory, EndPoint>> _processingQueue;

/// <summary>
/// Gets a syncronization event that is set when a event is received.
Expand Down Expand Up @@ -255,6 +258,16 @@ private void Initialize()
if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Allocating " + Config.EventQueueSize + " event slots");
_userEventQueue = new ConcurrentCircularQueue<NetworkEvent>(Config.EventQueueSize);

if (Config.ProcessingThreads > 0)
{
if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Allocating " + Config.ProcessingQueueSize + " processing slots");
_processingQueue = new ConcurrentCircularQueue<NetTuple<HeapMemory, EndPoint>>(Config.ProcessingQueueSize);
}
else
{
if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Not allocating processingQueue beucase ProcessingThreads is set to 0");
}

if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Allocating " + Config.ConnectionChallengeHistory + " challenge IV slots");
_challengeInitializationVectors = new SlidingSet<ulong>((int)Config.ConnectionChallengeHistory, true);

Expand Down Expand Up @@ -324,6 +337,17 @@ public bool Start()
});
}

for (int i = 0; i < Config.ProcessingThreads; i++)
{
if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Creating ProcessingThread #" + i);

_threads.Add(new Thread(StartPacketProcessing)
{
Name = "ProcessingThread #" + i,
IsBackground = true
});
}

// Set running state to true
IsRunning = true;

Expand Down Expand Up @@ -772,7 +796,8 @@ private void StartNetworkLogic()

private void StartSocketLogic()
{
byte[] _incomingBuffer = new byte[Config.MaxBufferSize];
// Only alloc buffer if we dont have any processing threads.
byte[] _incomingBuffer = Config.ProcessingThreads > 0 ? null : new byte[Config.MaxBufferSize];
List<Socket> _selectSockets = new List<Socket>();

while (IsRunning)
Expand All @@ -796,11 +821,41 @@ private void StartSocketLogic()
try
{
// Get a endpoint reference
EndPoint _endpoint = _selectSockets[i].AddressFamily == AddressFamily.InterNetwork ? _fromIPv4Endpoint : _selectSockets[i].AddressFamily == AddressFamily.InterNetworkV6 ? _fromIPv6Endpoint : null;
EndPoint _endpoint = _selectSockets[i].AddressFamily == AddressFamily.InterNetwork ? _fromIPv4Endpoint : _selectSockets[i].AddressFamily == AddressFamily.InterNetworkV6 ? _fromIPv6Endpoint : null;

byte[] receiveBuffer;
int receiveSize;
HeapMemory memory = null;

if (Config.ProcessingThreads > 0)
{
// Alloc memory for the packet. Alloc max MTU
memory = MemoryManager.AllocHeapMemory((uint)Config.MaximumMTU);
receiveSize = (int)memory.VirtualCount;
receiveBuffer = memory.Buffer;
}
else
{
receiveBuffer = _incomingBuffer;
receiveSize = _incomingBuffer.Length;
}

int size = _selectSockets[i].ReceiveFrom(_incomingBuffer, 0, _incomingBuffer.Length, SocketFlags.None, ref _endpoint);
// Receive from socket
int size = _selectSockets[i].ReceiveFrom(receiveBuffer, 0, receiveSize, SocketFlags.None, ref _endpoint);

HandlePacket(new ArraySegment<byte>(_incomingBuffer, 0, size), _endpoint, true);
if (Config.ProcessingThreads > 0)
{
// Set the size to prevent reading to end
memory.VirtualCount = (uint)size;

// Process off thread
_processingQueue.Enqueue(new NetTuple<HeapMemory, EndPoint>(memory, _endpoint));
}
else
{
// Process on thread
HandlePacket(new ArraySegment<byte>(receiveBuffer, 0, receiveSize), _endpoint, true);
}
}
catch (SocketException e)
{
Expand All @@ -820,6 +875,28 @@ private void StartSocketLogic()
}
}

private void StartPacketProcessing()
{
while (IsRunning)
{
try
{
while (_processingQueue.TryDequeue(out NetTuple<HeapMemory, EndPoint> packet))
{
// Process packet
HandlePacket(new ArraySegment<byte>(packet.Item1.Buffer, (int)packet.Item1.VirtualOffset, (int)packet.Item1.VirtualCount), packet.Item2, true);

// Dealloc the memory
MemoryManager.DeAlloc(packet.Item1);
}
}
catch (Exception e)
{
if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Error when processing packet: " + e);
}
}
}

/// <summary>
/// Polls the RuffleSocket for incoming events about connections.
/// </summary>
Expand Down Expand Up @@ -877,7 +954,7 @@ internal bool SendRaw(EndPoint endpoint, ArraySegment<byte> payload)
}

private readonly List<ArraySegment<byte>> _mergeSegmentResults = new List<ArraySegment<byte>>();
internal void HandlePacket(ArraySegment<byte> payload, EndPoint endpoint, bool allowMergeUnpack, bool wirePacket = true)
internal void HandlePacket(ArraySegment<byte> payload, EndPoint endpoint, bool allowMergeUnpack)
{
if (payload.Count < 1)
{
Expand Down Expand Up @@ -920,7 +997,7 @@ internal void HandlePacket(ArraySegment<byte> payload, EndPoint endpoint, bool a
for (int i = 0; i < _mergeSegmentResults.Count; i++)
{
// Handle the segment
HandlePacket(_mergeSegmentResults[i], endpoint, false, false);
HandlePacket(_mergeSegmentResults[i], endpoint, false);
}
}
}
Expand Down

0 comments on commit bda863e

Please sign in to comment.