Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Mplex protocol; #36

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Lantern.Beacon.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ public static async Task Main()
MaxParallelDials = 10,
EnableDiscovery = true,
GossipSubEnabled = true,
TargetPeerCount = 3,
Bootnodes = [
//"/ip4/162.19.222.38/tcp/15401/p2p/16Uiu2HAmLA7eWnZUnjFQNR7sa8uZumNGA5hPvW6wiWoW1cT2Xkgg"
//"/ip4/116.202.215.20/tcp/9000/p2p/16Uiu2HAmB8gmsy3QGaLcL8gQHF5TUAn6fhQNzNT522xArY2tMhKr"
//"/ip4/88.99.208.221/tcp/9105/p2p/16Uiu2HAkvSit4sbSkr6AdiEzUcHFv7KQrssV1y4QVsDt4EVpVTYU"
"/ip4/15.235.118.102/tcp/9000/p2p/16Uiu2HAmP1BbR9tgtLVbpjy7k46uEar2wResbuWaA3ibJNrzYmkv"
],
SyncProtocolOptions = new SyncProtocolOptions
{
Expand Down
87 changes: 37 additions & 50 deletions src/Lantern.Beacon/BeaconClientManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ private async Task DialPeer(Multiaddress peer, CancellationToken token = default
return;
}

var supportsLightClientProtocols =
LightClientProtocols.All.All(protocol => peerProtocols!.Contains(protocol));
var supportsLightClientProtocols = LightClientProtocols.All.All(protocol => peerProtocols!.Contains(protocol));

if (supportsLightClientProtocols)
{
Expand Down Expand Up @@ -384,8 +383,7 @@ private async Task MonitorSyncStatus(CancellationToken token)
{
if (!peerState.BootstrapPeers.IsEmpty)
{
var peer = peerState.BootstrapPeers.Values.ElementAt(
new Random().Next(peerState.BootstrapPeers.Count));
var peer = peerState.BootstrapPeers.Values.ElementAt(new Random().Next(peerState.BootstrapPeers.Count));

if (!syncProtocol.IsInitialised)
{
Expand All @@ -395,26 +393,19 @@ private async Task MonitorSyncStatus(CancellationToken token)
peer.Address.Get<TCP>().Value.ToString(),
peer.Address.Get<P2P>().Value.ToString());

var dialTask = peer.DialAsync<LightClientBootstrapProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(dialTask, timeoutTask);
await DialPeerWithProtocol<LightClientBootstrapProtocol>(peer, token);
}

if (!syncProtocol.IsInitialised)
{
_logger.LogWarning(
"Failed to initialize light client from peer: /ip4/{Ip4}/tcp/{TcpPort}/p2p/{PeerId} Disconnecting...",
"Failed to initialize light client from peer: /ip4/{Ip4}/tcp/{TcpPort}/p2p/{PeerId}. Disconnecting...",
peer.Address.Get<IP4>().Value.ToString(),
peer.Address.Get<TCP>().Value.ToString(),
peer.Address.Get<P2P>().Value.ToString());

peerState.BootstrapPeers.TryRemove(peer.Address.GetPeerId()!, out _);

var goodbyeTask = peer.DialAsync<GoodbyeProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(goodbyeTask, timeoutTask);
peerState.BootstrapPeers.TryRemove(peer.Address.GetPeerId()!, out _);
await DialPeerWithProtocol<GoodbyeProtocol>(peer, token);
}
else
{
Expand Down Expand Up @@ -482,10 +473,7 @@ private async Task SyncDenebForkAsync(IRemotePeer peer, CancellationToken token
denebOptimisticPeriod
);

var dialTask = peer.DialAsync<LightClientUpdatesByRangeProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(dialTask, timeoutTask);
await DialPeerWithProtocol<LightClientUpdatesByRangeProtocol>(peer, token);
}

if (denebFinalizedPeriod + 1 < denebCurrentPeriod)
Expand All @@ -501,10 +489,7 @@ private async Task SyncDenebForkAsync(IRemotePeer peer, CancellationToken token
count
);

var dialTask = peer.DialAsync<LightClientUpdatesByRangeProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(dialTask, timeoutTask);
await DialPeerWithProtocol<LightClientUpdatesByRangeProtocol>(peer, token);
}
}

Expand Down Expand Up @@ -542,10 +527,7 @@ private async Task SyncCapellaForkAsync(IRemotePeer peer, CancellationToken toke
capellaOptimisticPeriod
);

var dialTask = peer.DialAsync<LightClientUpdatesByRangeProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(dialTask, timeoutTask);
await DialPeerWithProtocol<LightClientUpdatesByRangeProtocol>(peer, token);
}

if (capellaFinalizedPeriod + 1 < capellaCurrentPeriod)
Expand All @@ -562,10 +544,7 @@ private async Task SyncCapellaForkAsync(IRemotePeer peer, CancellationToken toke
count
);

var dialTask = peer.DialAsync<LightClientUpdatesByRangeProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(dialTask, timeoutTask);
await DialPeerWithProtocol<LightClientUpdatesByRangeProtocol>(peer, token);
}
}

Expand Down Expand Up @@ -603,10 +582,7 @@ private async Task SyncAltairForkAsync(IRemotePeer peer, CancellationToken token
altairOptimisticPeriod
);

var dialTask = peer.DialAsync<LightClientUpdatesByRangeProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(dialTask, timeoutTask);
await DialPeerWithProtocol<LightClientUpdatesByRangeProtocol>(peer, token);
}

if (altairFinalizedPeriod + 1 < altairCurrentPeriod)
Expand All @@ -622,11 +598,26 @@ private async Task SyncAltairForkAsync(IRemotePeer peer, CancellationToken token
startPeriod,
count
);

var dialTask = peer.DialAsync<LightClientUpdatesByRangeProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);

await Task.WhenAny(dialTask, timeoutTask);
await DialPeerWithProtocol<LightClientUpdatesByRangeProtocol>(peer, token);
}
}

private async Task<bool> DialPeerWithProtocol<T>(IRemotePeer peer, CancellationToken token = default) where T : IProtocol
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds));

var dialTask = peer.DialAsync<T>(timeoutCts.Token);

try
{
await dialTask;
return true;
}
catch (OperationCanceledException) when (!token.IsCancellationRequested)
{
return false;
}
}

Expand Down Expand Up @@ -668,11 +659,9 @@ private async Task RunOptimisticUpdateLoopAsync(IRemotePeer peer, CancellationTo

_logger.LogInformation("Requesting optimistic update...");

var dialTask = peer.DialAsync<LightClientOptimisticUpdateProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);
var completedTask = await Task.WhenAny(dialTask, timeoutTask);

if (completedTask == timeoutTask)
var result = await DialPeerWithProtocol<LightClientOptimisticUpdateProtocol>(peer, token);

if (!result)
{
break;
}
Expand Down Expand Up @@ -727,12 +716,10 @@ private async Task RunFinalityUpdateLoopAsync(IRemotePeer peer, CancellationToke
continue;

_logger.LogInformation("Requesting finality update...");

var dialTask = peer.DialAsync<LightClientFinalityUpdateProtocol>(token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(clientOptions.DialTimeoutSeconds), token);
var completedTask = await Task.WhenAny(dialTask, timeoutTask);

if (completedTask == timeoutTask)

var result = await DialPeerWithProtocol<LightClientFinalityUpdateProtocol>(peer, token);

if (!result)
{
break;
}
Expand Down
4 changes: 0 additions & 4 deletions src/Lantern.Beacon/Networking/IPeerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,4 @@ public interface IPeerState
MetaData MetaData { get; }

void Init(IEnumerable<IProtocol> appLayerProtocols);

int ConnectedPeersCount { get; set; }

int DisconnectedPeersCount { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System.Buffers;
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Multiformats.Address.Protocols;
using Nethermind.Libp2p.Core;
using Nethermind.Libp2p.Core.Exceptions;

Expand Down Expand Up @@ -45,53 +43,10 @@ protected override async Task ConnectAsync(IChannel downChannel, IChannelFactory
context.Connected(context.RemotePeer);

_ = Task.Run(() => HandleSubDialRequests(context, channelFactory, isListener, downChannel, peerState));

var isChannelClosed = false;

_ = Task.Run(async () =>
{
while (!isChannelClosed && !downChannelAwaiter.IsCompleted)
{
await Task.Delay(TimeSpan.FromSeconds(10));
var foundLivePeer = false;

foreach (var gossipPeer in _peerState.GossipPeers.Keys)
{
_logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", gossipPeer,
context.RemotePeer.Address.GetPeerId());

if (!gossipPeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes))
continue;

_logger?.LogDebug("Peer {PeerId} was found in 'GossipPeers'", gossipPeer);
foundLivePeer = true;
break;
}

foreach (var livePeer in _peerState.BootstrapPeers.Keys)
{
_logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", livePeer,
context.RemotePeer.Address.GetPeerId());

if (!livePeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes))
continue;

_logger?.LogDebug("Peer {PeerId} is still in 'LivePeers'", livePeer);
foundLivePeer = true;
break;
}

if (foundLivePeer)
continue;

_logger?.LogDebug("Removing peer {PeerId} from 'GossipPeers'", context.RemotePeer.Address.GetPeerId());
isChannelClosed = true;
}
});

try
{
while (!isChannelClosed && !downChannelAwaiter.IsCompleted)
while (!downChannelAwaiter.IsCompleted)
{
var message = await ReadMessageAsync(downChannel);
await HandleMessageAsync(message, downChannel, channelFactory, context, peerState);
Expand All @@ -100,46 +55,10 @@ protected override async Task ConnectAsync(IChannel downChannel, IChannelFactory
catch (ChannelClosedException ex)
{
_logger?.LogDebug("Closed due to transport disconnection: {Exception}", ex.Message);
isChannelClosed = true;

var foundPeer = false;
foreach (var gossipPeer in _peerState.GossipPeers.Keys)
{
_logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", gossipPeer,
context.RemotePeer.Address.GetPeerId());

if (!gossipPeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes))
continue;

_logger?.LogDebug("Peer {PeerId} was found in 'GossipPeers'", gossipPeer);
foundPeer = true;
break;
}

if(foundPeer)
_peerState.GossipPeers.TryRemove(context.RemotePeer.Address.GetPeerId()!, out _);
}
catch (Exception ex)
{
_logger?.LogDebug("Closed with exception {Exception}", ex.Message);
isChannelClosed = true;

var foundPeer = false;
foreach (var gossipPeer in _peerState.GossipPeers.Keys)
{
_logger?.LogDebug("Checking if peer {PeerId} matches ID {Id}", gossipPeer,
context.RemotePeer.Address.GetPeerId());

if (!gossipPeer.Bytes.SequenceEqual(context.RemotePeer.Address.GetPeerId()!.Bytes))
continue;

_logger?.LogDebug("Peer {PeerId} was found in 'GossipPeers'", gossipPeer);
foundPeer = true;
break;
}

if(foundPeer)
_peerState.GossipPeers.TryRemove(context.RemotePeer.Address.GetPeerId()!, out _);
}

_logger?.LogDebug("Closing all channels");
Expand Down Expand Up @@ -207,8 +126,7 @@ private ChannelState CreateUpChannel(long streamId, IChannelRequest? request, IC
{
tcs?.SetResult();
});

// Initiate background processing of the channel

_ = Task.Run(() => ProcessChannelAsync(downChannel, streamId, upChannel, isListener, channelState));

return channelState;
Expand Down Expand Up @@ -499,7 +417,6 @@ private async Task WriteMessageAsync(IChannel channel, MplexMessage message)
{
try
{
// Calculate sizes first to avoid multiple allocations
var header = (ulong)(message.StreamId << 3) | (ulong)message.Flag;
var headerSize = VarInt.GetSizeInBytes(header);
var lengthSize = VarInt.GetSizeInBytes((ulong)message.Data.Length);
Expand Down
4 changes: 0 additions & 4 deletions src/Lantern.Beacon/Networking/PeerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ public class PeerState : IPeerState

public MetaData MetaData { get; private set; } = MetaData.CreateDefault();

public int ConnectedPeersCount { get; set; }

public int DisconnectedPeersCount { get; set; }

public void Init(IEnumerable<IProtocol> appLayerProtocols)
{
AppLayerProtocols = appLayerProtocols;
Expand Down
3 changes: 2 additions & 1 deletion test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public async Task DialPeer_ShouldReturnIfPeerAddressIsNull()
[Test]
public async Task DialPeer_ShouldSaveValidPeerAsLivePeer()
{
var clientOptions = new BeaconClientOptions { EnableDiscovery = false, TargetPeerCount = 1, DialTimeoutSeconds = 1};
var clientOptions = new BeaconClientOptions { EnableDiscovery = false, TargetPeerCount = 1, DialTimeoutSeconds = 10};
var multiAddress = new Multiaddress().Add<IP4>("0.0.0.0").Add<TCP>(0);
var mockRemotePeer = new Mock<IRemotePeer>();
var syncOptions = new SyncProtocolOptions() { GenesisValidatorsRoot = new byte[32], GenesisTime = 1606824023, Preset = SizePreset.MainnetPreset };
Expand All @@ -480,6 +480,7 @@ public async Task DialPeer_ShouldSaveValidPeerAsLivePeer()
_mockPeerState.Setup(x => x.BootstrapPeers).Returns(new ConcurrentDictionary<PeerId, IRemotePeer>());
_mockPeerState.Setup(x => x.PeerProtocols).Returns(peerProtocols);
_mockSyncProtocol.Setup(x => x.DenebLightClientStore).Returns(denebLightClientStore);
_mockSyncProtocol.Setup(x => x.IsInitialised).Returns(true);
_mockSyncProtocol.Setup(x => x.Options).Returns(syncOptions);
_beaconClientManager = new BeaconClientManager(clientOptions, _mockManualDiscoveryProtocol.Object, _mockLiteDbService.Object, _mockCustomDiscoveryProtocol.Object, _mockPeerState.Object, _mockSyncProtocol.Object, _mockPeerFactory.Object, _mockIdentityManager.Object, _mockLoggerFactory.Object);

Expand Down
Loading