diff --git a/Ruffles/Configuration/SocketConfig.cs b/Ruffles/Configuration/SocketConfig.cs index 61a2003..10fe4bd 100644 --- a/Ruffles/Configuration/SocketConfig.cs +++ b/Ruffles/Configuration/SocketConfig.cs @@ -21,6 +21,7 @@ public class SocketConfig /// If this gets full no more events can be processed and the application will freeze until it is polled. /// public ushort EventQueueSize = 1024 * 8; + public int ProcessingQueueSize = 1024 * 8; /// /// The pool size of the HeapPointers pool. /// @@ -86,6 +87,10 @@ public class SocketConfig /// The amount of socket threads to start. /// public int SocketThreads = 1; + /// + /// The amount of packet processing threads to start. + /// + public int ProcessingThreads = 1; // Bandwidth /// diff --git a/Ruffles/Core/RuffleSocket.cs b/Ruffles/Core/RuffleSocket.cs index 3ac5a37..dd687a4 100644 --- a/Ruffles/Core/RuffleSocket.cs +++ b/Ruffles/Core/RuffleSocket.cs @@ -69,7 +69,10 @@ public sealed class RuffleSocket private readonly ReaderWriterLockSlim _syncronizedCallbacksLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); private readonly List> _syncronizedCallbacks = new List>(); // Event queue - private ConcurrentCircularQueue _userEventQueue; + private ConcurrentCircularQueue _userEventQueue; + + // Processing queue + private ConcurrentCircularQueue> _processingQueue; /// /// Gets a syncronization event that is set when a event is received. @@ -255,6 +258,16 @@ private void Initialize() if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Allocating " + Config.EventQueueSize + " event slots"); _userEventQueue = new ConcurrentCircularQueue(Config.EventQueueSize); + if (Config.ProcessingThreads > 0) + { + if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Allocating " + Config.ProcessingQueueSize + " processing slots"); + _processingQueue = new ConcurrentCircularQueue>(Config.ProcessingQueueSize); + } + else + { + if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Not allocating processingQueue beucase ProcessingThreads is set to 0"); + } + if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Allocating " + Config.ConnectionChallengeHistory + " challenge IV slots"); _challengeInitializationVectors = new SlidingSet((int)Config.ConnectionChallengeHistory, true); @@ -324,6 +337,17 @@ public bool Start() }); } + for (int i = 0; i < Config.ProcessingThreads; i++) + { + if (Logging.CurrentLogLevel <= LogLevel.Debug) Logging.LogInfo("Creating ProcessingThread #" + i); + + _threads.Add(new Thread(StartPacketProcessing) + { + Name = "ProcessingThread #" + i, + IsBackground = true + }); + } + // Set running state to true IsRunning = true; @@ -772,7 +796,8 @@ private void StartNetworkLogic() private void StartSocketLogic() { - byte[] _incomingBuffer = new byte[Config.MaxBufferSize]; + // Only alloc buffer if we dont have any processing threads. + byte[] _incomingBuffer = Config.ProcessingThreads > 0 ? null : new byte[Config.MaxBufferSize]; List _selectSockets = new List(); while (IsRunning) @@ -796,11 +821,41 @@ private void StartSocketLogic() try { // Get a endpoint reference - EndPoint _endpoint = _selectSockets[i].AddressFamily == AddressFamily.InterNetwork ? _fromIPv4Endpoint : _selectSockets[i].AddressFamily == AddressFamily.InterNetworkV6 ? _fromIPv6Endpoint : null; + EndPoint _endpoint = _selectSockets[i].AddressFamily == AddressFamily.InterNetwork ? _fromIPv4Endpoint : _selectSockets[i].AddressFamily == AddressFamily.InterNetworkV6 ? _fromIPv6Endpoint : null; + + byte[] receiveBuffer; + int receiveSize; + HeapMemory memory = null; + + if (Config.ProcessingThreads > 0) + { + // Alloc memory for the packet. Alloc max MTU + memory = MemoryManager.AllocHeapMemory((uint)Config.MaximumMTU); + receiveSize = (int)memory.VirtualCount; + receiveBuffer = memory.Buffer; + } + else + { + receiveBuffer = _incomingBuffer; + receiveSize = _incomingBuffer.Length; + } - int size = _selectSockets[i].ReceiveFrom(_incomingBuffer, 0, _incomingBuffer.Length, SocketFlags.None, ref _endpoint); + // Receive from socket + int size = _selectSockets[i].ReceiveFrom(receiveBuffer, 0, receiveSize, SocketFlags.None, ref _endpoint); - HandlePacket(new ArraySegment(_incomingBuffer, 0, size), _endpoint, true); + if (Config.ProcessingThreads > 0) + { + // Set the size to prevent reading to end + memory.VirtualCount = (uint)size; + + // Process off thread + _processingQueue.Enqueue(new NetTuple(memory, _endpoint)); + } + else + { + // Process on thread + HandlePacket(new ArraySegment(receiveBuffer, 0, receiveSize), _endpoint, true); + } } catch (SocketException e) { @@ -820,6 +875,28 @@ private void StartSocketLogic() } } + private void StartPacketProcessing() + { + while (IsRunning) + { + try + { + while (_processingQueue.TryDequeue(out NetTuple packet)) + { + // Process packet + HandlePacket(new ArraySegment(packet.Item1.Buffer, (int)packet.Item1.VirtualOffset, (int)packet.Item1.VirtualCount), packet.Item2, true); + + // Dealloc the memory + MemoryManager.DeAlloc(packet.Item1); + } + } + catch (Exception e) + { + if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Error when processing packet: " + e); + } + } + } + /// /// Polls the RuffleSocket for incoming events about connections. /// @@ -877,7 +954,7 @@ internal bool SendRaw(EndPoint endpoint, ArraySegment payload) } private readonly List> _mergeSegmentResults = new List>(); - internal void HandlePacket(ArraySegment payload, EndPoint endpoint, bool allowMergeUnpack, bool wirePacket = true) + internal void HandlePacket(ArraySegment payload, EndPoint endpoint, bool allowMergeUnpack) { if (payload.Count < 1) { @@ -920,7 +997,7 @@ internal void HandlePacket(ArraySegment payload, EndPoint endpoint, bool a for (int i = 0; i < _mergeSegmentResults.Count; i++) { // Handle the segment - HandlePacket(_mergeSegmentResults[i], endpoint, false, false); + HandlePacket(_mergeSegmentResults[i], endpoint, false); } } }