From 18ca3162440b936b16809a46e31a59a3eca336dc Mon Sep 17 00:00:00 2001 From: uink45 <79078981+uink45@users.noreply.github.com> Date: Thu, 17 Oct 2024 13:30:17 +1000 Subject: [PATCH 1/2] Update Mplex protocol; --- src/Lantern.Beacon.Console/Program.cs | 2 + src/Lantern.Beacon/BeaconClientManager.cs | 85 ++++++++---------- src/Lantern.Beacon/Networking/IPeerState.cs | 4 - .../Libp2pProtocols/Mplex/MplexProtocol.cs | 87 +------------------ src/Lantern.Beacon/Networking/PeerState.cs | 4 - 5 files changed, 40 insertions(+), 142 deletions(-) diff --git a/src/Lantern.Beacon.Console/Program.cs b/src/Lantern.Beacon.Console/Program.cs index b8fdbeb..73bda85 100644 --- a/src/Lantern.Beacon.Console/Program.cs +++ b/src/Lantern.Beacon.Console/Program.cs @@ -48,10 +48,12 @@ public static async Task Main() MaxParallelDials = 10, EnableDiscovery = true, GossipSubEnabled = true, + TargetPeerCount = 3, Bootnodes = [ //"/ip4/162.19.222.38/tcp/15401/p2p/16Uiu2HAmLA7eWnZUnjFQNR7sa8uZumNGA5hPvW6wiWoW1cT2Xkgg" //"/ip4/116.202.215.20/tcp/9000/p2p/16Uiu2HAmB8gmsy3QGaLcL8gQHF5TUAn6fhQNzNT522xArY2tMhKr" //"/ip4/88.99.208.221/tcp/9105/p2p/16Uiu2HAkvSit4sbSkr6AdiEzUcHFv7KQrssV1y4QVsDt4EVpVTYU" + "/ip4/15.235.118.102/tcp/9000/p2p/16Uiu2HAmP1BbR9tgtLVbpjy7k46uEar2wResbuWaA3ibJNrzYmkv" ], SyncProtocolOptions = new SyncProtocolOptions { diff --git a/src/Lantern.Beacon/BeaconClientManager.cs b/src/Lantern.Beacon/BeaconClientManager.cs index 3ceb410..5b534dc 100644 --- a/src/Lantern.Beacon/BeaconClientManager.cs +++ b/src/Lantern.Beacon/BeaconClientManager.cs @@ -324,8 +324,7 @@ private async Task DialPeer(Multiaddress peer, CancellationToken token = default return; } - var supportsLightClientProtocols = - LightClientProtocols.All.All(protocol => peerProtocols!.Contains(protocol)); + var supportsLightClientProtocols = LightClientProtocols.All.All(protocol => peerProtocols!.Contains(protocol)); if (supportsLightClientProtocols) { @@ -384,8 +383,7 @@ private async Task MonitorSyncStatus(CancellationToken token) { if (!peerState.BootstrapPeers.IsEmpty) { - var peer = peerState.BootstrapPeers.Values.ElementAt( - new Random().Next(peerState.BootstrapPeers.Count)); + var peer = peerState.BootstrapPeers.Values.ElementAt(new Random().Next(peerState.BootstrapPeers.Count)); if (!syncProtocol.IsInitialised) { @@ -395,26 +393,19 @@ private async Task MonitorSyncStatus(CancellationToken token) peer.Address.Get().Value.ToString(), peer.Address.Get().Value.ToString()); - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - - await Task.WhenAny(dialTask, timeoutTask); + await DialPeerWithProtocol(peer, token); } if (!syncProtocol.IsInitialised) { _logger.LogWarning( - "Failed to initialize light client from peer: /ip4/{Ip4}/tcp/{TcpPort}/p2p/{PeerId} Disconnecting...", + "Failed to initialize light client from peer: /ip4/{Ip4}/tcp/{TcpPort}/p2p/{PeerId}. Disconnecting...", peer.Address.Get().Value.ToString(), peer.Address.Get().Value.ToString(), peer.Address.Get().Value.ToString()); peerState.BootstrapPeers.TryRemove(peer.Address.GetPeerId()!, out _); - - var goodbyeTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - - await Task.WhenAny(goodbyeTask, timeoutTask); + await DialPeerWithProtocol(peer, token); } else { @@ -482,10 +473,7 @@ private async Task SyncDenebForkAsync(IRemotePeer peer, CancellationToken token denebOptimisticPeriod ); - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - - await Task.WhenAny(dialTask, timeoutTask); + await DialPeerWithProtocol(peer, token); } if (denebFinalizedPeriod + 1 < denebCurrentPeriod) @@ -501,10 +489,7 @@ private async Task SyncDenebForkAsync(IRemotePeer peer, CancellationToken token count ); - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - - await Task.WhenAny(dialTask, timeoutTask); + await DialPeerWithProtocol(peer, token); } } @@ -542,10 +527,7 @@ private async Task SyncCapellaForkAsync(IRemotePeer peer, CancellationToken toke capellaOptimisticPeriod ); - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - - await Task.WhenAny(dialTask, timeoutTask); + await DialPeerWithProtocol(peer, token); } if (capellaFinalizedPeriod + 1 < capellaCurrentPeriod) @@ -562,10 +544,7 @@ private async Task SyncCapellaForkAsync(IRemotePeer peer, CancellationToken toke count ); - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - - await Task.WhenAny(dialTask, timeoutTask); + await DialPeerWithProtocol(peer, token); } } @@ -603,10 +582,7 @@ private async Task SyncAltairForkAsync(IRemotePeer peer, CancellationToken token altairOptimisticPeriod ); - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - - await Task.WhenAny(dialTask, timeoutTask); + await DialPeerWithProtocol(peer, token); } if (altairFinalizedPeriod + 1 < altairCurrentPeriod) @@ -622,11 +598,26 @@ private async Task SyncAltairForkAsync(IRemotePeer peer, CancellationToken token startPeriod, count ); - - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - await Task.WhenAny(dialTask, timeoutTask); + await DialPeerWithProtocol(peer, token); + } + } + + private async Task DialPeerWithProtocol(IRemotePeer peer, CancellationToken token = default) where T : IProtocol + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token); + timeoutCts.CancelAfter(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds)); + + var dialTask = peer.DialAsync(timeoutCts.Token); + + try + { + await dialTask; + return true; + } + catch (OperationCanceledException) when (!token.IsCancellationRequested) + { + return false; } } @@ -668,11 +659,9 @@ private async Task RunOptimisticUpdateLoopAsync(IRemotePeer peer, CancellationTo _logger.LogInformation("Requesting optimistic update..."); - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - var completedTask = await Task.WhenAny(dialTask, timeoutTask); - - if (completedTask == timeoutTask) + var result = await DialPeerWithProtocol(peer, token); + + if (!result) { break; } @@ -727,12 +716,10 @@ private async Task RunFinalityUpdateLoopAsync(IRemotePeer peer, CancellationToke continue; _logger.LogInformation("Requesting finality update..."); - - var dialTask = peer.DialAsync(token); - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token); - var completedTask = await Task.WhenAny(dialTask, timeoutTask); - - if (completedTask == timeoutTask) + + var result = await DialPeerWithProtocol(peer, token); + + if (!result) { break; } diff --git a/src/Lantern.Beacon/Networking/IPeerState.cs b/src/Lantern.Beacon/Networking/IPeerState.cs index 4fe3c95..27942c3 100644 --- a/src/Lantern.Beacon/Networking/IPeerState.cs +++ b/src/Lantern.Beacon/Networking/IPeerState.cs @@ -18,8 +18,4 @@ public interface IPeerState MetaData MetaData { get; } void Init(IEnumerable appLayerProtocols); - - int ConnectedPeersCount { get; set; } - - int DisconnectedPeersCount { get; set; } } \ No newline at end of file diff --git a/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs b/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs index 19ebe33..87e5a1f 100644 --- a/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs +++ b/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs @@ -1,7 +1,5 @@ using System.Buffers; -using System.Collections.Concurrent; using Microsoft.Extensions.Logging; -using Multiformats.Address.Protocols; using Nethermind.Libp2p.Core; using Nethermind.Libp2p.Core.Exceptions; @@ -45,53 +43,10 @@ protected override async Task ConnectAsync(IChannel downChannel, IChannelFactory context.Connected(context.RemotePeer); _ = Task.Run(() => HandleSubDialRequests(context, channelFactory, isListener, downChannel, peerState)); - - var isChannelClosed = false; - - _ = Task.Run(async () => - { - while (!isChannelClosed && !downChannelAwaiter.IsCompleted) - { - await Task.Delay(TimeSpan.FromSeconds(10)); - var foundLivePeer = false; - - foreach (var gossipPeer in _peerState.GossipPeers.Keys) - { - _logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", gossipPeer, - context.RemotePeer.Address.GetPeerId()); - - if (!gossipPeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes)) - continue; - - _logger?.LogDebug("Peer {PeerId} was found in 'GossipPeers'", gossipPeer); - foundLivePeer = true; - break; - } - - foreach (var livePeer in _peerState.BootstrapPeers.Keys) - { - _logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", livePeer, - context.RemotePeer.Address.GetPeerId()); - - if (!livePeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes)) - continue; - - _logger?.LogDebug("Peer {PeerId} is still in 'LivePeers'", livePeer); - foundLivePeer = true; - break; - } - - if (foundLivePeer) - continue; - - _logger?.LogDebug("Removing peer {PeerId} from 'GossipPeers'", context.RemotePeer.Address.GetPeerId()); - isChannelClosed = true; - } - }); try { - while (!isChannelClosed && !downChannelAwaiter.IsCompleted) + while (!downChannelAwaiter.IsCompleted) { var message = await ReadMessageAsync(downChannel); await HandleMessageAsync(message, downChannel, channelFactory, context, peerState); @@ -100,46 +55,10 @@ protected override async Task ConnectAsync(IChannel downChannel, IChannelFactory catch (ChannelClosedException ex) { _logger?.LogDebug("Closed due to transport disconnection: {Exception}", ex.Message); - isChannelClosed = true; - - var foundPeer = false; - foreach (var gossipPeer in _peerState.GossipPeers.Keys) - { - _logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", gossipPeer, - context.RemotePeer.Address.GetPeerId()); - - if (!gossipPeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes)) - continue; - - _logger?.LogDebug("Peer {PeerId} was found in 'GossipPeers'", gossipPeer); - foundPeer = true; - break; - } - - if(foundPeer) - _peerState.GossipPeers.TryRemove(context.RemotePeer.Address.GetPeerId()!, out _); } catch (Exception ex) { _logger?.LogDebug("Closed with exception {Exception}", ex.Message); - isChannelClosed = true; - - var foundPeer = false; - foreach (var gossipPeer in _peerState.GossipPeers.Keys) - { - _logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", gossipPeer, - context.RemotePeer.Address.GetPeerId()); - - if (!gossipPeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes)) - continue; - - _logger?.LogDebug("Peer {PeerId} was found in 'GossipPeers'", gossipPeer); - foundPeer = true; - break; - } - - if(foundPeer) - _peerState.GossipPeers.TryRemove(context.RemotePeer.Address.GetPeerId()!, out _); } _logger?.LogDebug("Closing all channels"); @@ -207,8 +126,7 @@ private ChannelState CreateUpChannel(long streamId, IChannelRequest? request, IC { tcs?.SetResult(); }); - - // Initiate background processing of the channel + _ = Task.Run(() => ProcessChannelAsync(downChannel, streamId, upChannel, isListener, channelState)); return channelState; @@ -499,7 +417,6 @@ private async Task WriteMessageAsync(IChannel channel, MplexMessage message) { try { - // Calculate sizes first to avoid multiple allocations var header = (ulong)(message.StreamId << 3) | (ulong)message.Flag; var headerSize = VarInt.GetSizeInBytes(header); var lengthSize = VarInt.GetSizeInBytes((ulong)message.Data.Length); diff --git a/src/Lantern.Beacon/Networking/PeerState.cs b/src/Lantern.Beacon/Networking/PeerState.cs index 25ba70b..5f0a878 100644 --- a/src/Lantern.Beacon/Networking/PeerState.cs +++ b/src/Lantern.Beacon/Networking/PeerState.cs @@ -17,10 +17,6 @@ public class PeerState : IPeerState public MetaData MetaData { get; private set; } = MetaData.CreateDefault(); - public int ConnectedPeersCount { get; set; } - - public int DisconnectedPeersCount { get; set; } - public void Init(IEnumerable appLayerProtocols) { AppLayerProtocols = appLayerProtocols; From 0555e66c1bfadca4e34e270af4415ca05c226f25 Mon Sep 17 00:00:00 2001 From: uink45 <79078981+uink45@users.noreply.github.com> Date: Thu, 17 Oct 2024 14:00:01 +1000 Subject: [PATCH 2/2] Fix failing test. --- src/Lantern.Beacon/BeaconClientManager.cs | 2 +- test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Lantern.Beacon/BeaconClientManager.cs b/src/Lantern.Beacon/BeaconClientManager.cs index 5b534dc..af063b4 100644 --- a/src/Lantern.Beacon/BeaconClientManager.cs +++ b/src/Lantern.Beacon/BeaconClientManager.cs @@ -403,7 +403,7 @@ private async Task MonitorSyncStatus(CancellationToken token) peer.Address.Get().Value.ToString(), peer.Address.Get().Value.ToString(), peer.Address.Get().Value.ToString()); - + peerState.BootstrapPeers.TryRemove(peer.Address.GetPeerId()!, out _); await DialPeerWithProtocol(peer, token); } diff --git a/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs b/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs index a43ce6d..e471693 100644 --- a/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs +++ b/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs @@ -455,7 +455,7 @@ public async Task DialPeer_ShouldReturnIfPeerAddressIsNull() [Test] public async Task DialPeer_ShouldSaveValidPeerAsLivePeer() { - var clientOptions = new BeaconClientOptions { EnableDiscovery = false, TargetPeerCount = 1, DialTimeoutSeconds = 1}; + var clientOptions = new BeaconClientOptions { EnableDiscovery = false, TargetPeerCount = 1, DialTimeoutSeconds = 10}; var multiAddress = new Multiaddress().Add("0.0.0.0").Add(0); var mockRemotePeer = new Mock(); var syncOptions = new SyncProtocolOptions() { GenesisValidatorsRoot = new byte[32], GenesisTime = 1606824023, Preset = SizePreset.MainnetPreset }; @@ -480,6 +480,7 @@ public async Task DialPeer_ShouldSaveValidPeerAsLivePeer() _mockPeerState.Setup(x => x.BootstrapPeers).Returns(new ConcurrentDictionary()); _mockPeerState.Setup(x => x.PeerProtocols).Returns(peerProtocols); _mockSyncProtocol.Setup(x => x.DenebLightClientStore).Returns(denebLightClientStore); + _mockSyncProtocol.Setup(x => x.IsInitialised).Returns(true); _mockSyncProtocol.Setup(x => x.Options).Returns(syncOptions); _beaconClientManager = new BeaconClientManager(clientOptions, _mockManualDiscoveryProtocol.Object, _mockLiteDbService.Object, _mockCustomDiscoveryProtocol.Object, _mockPeerState.Object, _mockSyncProtocol.Object, _mockPeerFactory.Object, _mockIdentityManager.Object, _mockLoggerFactory.Object);