From c465de202c917159c2ccfb3c0bf62d0f21f2b688 Mon Sep 17 00:00:00 2001 From: uink45 <79078981+uink45@users.noreply.github.com> Date: Thu, 25 Jul 2024 15:57:58 +1000 Subject: [PATCH] Refactor code; --- .../CustomConsoleLogger.cs | 67 ++++++++++ .../CustomConsoleLoggerProvider.cs | 24 ++++ src/Lantern.Beacon.Console/Program.cs | 21 +++- src/Lantern.Beacon/BeaconClient.cs | 93 ++------------ src/Lantern.Beacon/BeaconClientManager.cs | 4 +- .../BeaconClientPeerFactoryBuilder.cs | 2 +- .../Networking/Gossip/GossipSubManager.cs | 81 +++++++++++- .../Libp2pProtocols/Mplex/MplexProtocol.cs | 11 +- .../Networking/LightClientProtocols.cs | 8 +- .../ReqRespProtocols/GoodbyeProtocol.cs | 75 ++++++----- .../LightClientBootstrapProtocol.cs | 68 +++++++--- .../LightClientFinalityUpdateProtocol.cs | 60 ++++++--- .../LightClientOptimisticUpdateProtocol.cs | 61 ++++++--- .../LightClientUpdatesByRangeProtocol.cs | 69 +++++++--- .../ReqRespProtocols/MetaDataProtocol.cs | 82 ++++++------ .../ReqRespProtocols/PingProtocol.cs | 70 ++++++----- .../ReqRespProtocols/StatusProtocol.cs | 106 ++++++++-------- src/Lantern.Beacon/Storage/LiteDbService.cs | 118 ++++++++++-------- .../BeaconClientManagerTests.cs | 1 - test/Lantern.Beacon.Tests/CoreTests.cs | 6 +- 20 files changed, 649 insertions(+), 378 deletions(-) create mode 100644 src/Lantern.Beacon.Console/CustomConsoleLogger.cs create mode 100644 src/Lantern.Beacon.Console/CustomConsoleLoggerProvider.cs diff --git a/src/Lantern.Beacon.Console/CustomConsoleLogger.cs b/src/Lantern.Beacon.Console/CustomConsoleLogger.cs new file mode 100644 index 0000000..f50a7f6 --- /dev/null +++ b/src/Lantern.Beacon.Console/CustomConsoleLogger.cs @@ -0,0 +1,67 @@ +using Microsoft.Extensions.Logging; + +namespace Lantern.Beacon.Console; + +public class CustomConsoleLogger( + string name, + Func filter, + CustomConsoleLogger.CustomConsoleLoggerConfiguration config) + : ILogger +{ + private readonly string _name = name ?? throw new ArgumentNullException(nameof(name)); + private readonly Func _filter = filter ?? throw new ArgumentNullException(nameof(filter)); + private readonly CustomConsoleLoggerConfiguration _config = config ?? throw new ArgumentNullException(nameof(config)); + + public IDisposable BeginScope(TState state) => null; + + public bool IsEnabled(LogLevel logLevel) => _filter(_config); + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + if (!IsEnabled(logLevel)) return; + + if (_config.EventId == 0 || _config.EventId == eventId.Id) + { + var timestamp = DateTime.UtcNow.ToString(_config.TimestampPrefix); + var logLevelString = GetLogLevelString(logLevel); + var logMessage = RemoveCategoryAndId(formatter(state, exception)); + System.Console.WriteLine($"{timestamp} [{logLevelString}] {logMessage}"); + } + } + + private static string RemoveCategoryAndId(string message) + { + // Logic to remove the category and eventId from the message + // Assuming the category will be of the pattern 'Category[EventId]: message' + var index = message.IndexOf(']'); + + if (index != -1 && message.Length > index + 1) + { + message = message[(index + 2)..]; + } + + return message; + } + + private string GetLogLevelString(LogLevel logLevel) + { + // Convert log level to custom string representation + return logLevel switch + { + LogLevel.Trace => "Trace", + LogLevel.Debug => "Debug", + LogLevel.Information => "Info", + LogLevel.Warning => "Warning", + LogLevel.Error => "Error", + LogLevel.Critical => "Critical", + LogLevel.None => "None", + _ => "Unknown" + }; + } + + public class CustomConsoleLoggerConfiguration + { + public int EventId { get; set; } + public string TimestampPrefix { get; set; } + } +} \ No newline at end of file diff --git a/src/Lantern.Beacon.Console/CustomConsoleLoggerProvider.cs b/src/Lantern.Beacon.Console/CustomConsoleLoggerProvider.cs new file mode 100644 index 0000000..4f89ca0 --- /dev/null +++ b/src/Lantern.Beacon.Console/CustomConsoleLoggerProvider.cs @@ -0,0 +1,24 @@ +using Microsoft.Extensions.Logging; + +namespace Lantern.Beacon.Console; + +public class CustomConsoleLoggerProvider : ILoggerProvider +{ + private readonly CustomConsoleLogger.CustomConsoleLoggerConfiguration _config; + private readonly Func _filter; + + public CustomConsoleLoggerProvider(Func filter, CustomConsoleLogger.CustomConsoleLoggerConfiguration config) + { + _filter = filter; + _config = config; + } + + public ILogger CreateLogger(string categoryName) + { + return new CustomConsoleLogger(categoryName, _filter, _config); + } + + public void Dispose() + { + } +} \ No newline at end of file diff --git a/src/Lantern.Beacon.Console/Program.cs b/src/Lantern.Beacon.Console/Program.cs index 39a4bc6..302c837 100644 --- a/src/Lantern.Beacon.Console/Program.cs +++ b/src/Lantern.Beacon.Console/Program.cs @@ -113,7 +113,7 @@ public static async Task Main() var libp2p2LoggerFactory = LoggerFactory.Create(builder => { builder - .SetMinimumLevel(LogLevel.Debug) + .SetMinimumLevel(LogLevel.Information) // .AddFilter((category, level) => // { // if (category.StartsWith("Nethermind.Libp2p.Protocols.Pubsub.GossipsubProtocolV11") || category.StartsWith("Nethermind.Libp2p.Protocols.Pubsub.GossipsubProtocol")) @@ -131,6 +131,18 @@ public static async Task Main() l.UseUtcTimestamp = true; }); }); + // var libp2p2LoggerFactory = LoggerFactory.Create(builder => + // { + // builder + // .SetMinimumLevel(LogLevel.Information) + // .AddProvider(new CustomConsoleLoggerProvider( + // config => config.EventId == 0, + // new CustomConsoleLogger.CustomConsoleLoggerConfiguration + // { + // EventId = 0, + // TimestampPrefix = "[HH:mm:ss]" + // })); + // }); services.AddBeaconClient(beaconClientBuilder => { beaconClientBuilder.AddDiscoveryProtocol(discv5Builder => @@ -144,7 +156,10 @@ public static async Task Main() beaconClientBuilder.WithBeaconClientOptions(options => { options.TcpPort = 9005; - options.EnableDiscovery = false; + options.EnableDiscovery = true; + //options.Bootnodes = ["/ip4/50.195.130.74/tcp/9000/p2p/16Uiu2HAkvSLFzPogiUZn1wFEskrUoJt9DGot3PbfeSE5zHqS32FM"]; + //options.Bootnodes = ["/ip4/73.186.232.187/tcp/9105/p2p/16Uiu2HAm37UA7fk8r2AnYtGLbddwkS2WEeSPTsjNDGh3gDW7VUBQ"]; // Teku + //options.Bootnodes = ["/ip4/69.175.102.62/tcp/31018/p2p/16Uiu2HAm2FWXMoKEsshxjXNsXmFwxPAm4eaWmcffFTGgNs3gi4Ww"]; // Erigon //options.Bootnodes = ["/ip4/0.0.0.0/tcp/9000/p2p/16Uiu2HAm6R996q426GYUyExKSYdKxhbD5iYedbuqQovVPTJFVHPv"]; //options.Bootnodes = ["/ip4/135.148.103.80/tcp/9000/p2p/16Uiu2HAkwvVXtZj6u3R2F7hEXpnbDUom3rDepABdDCSzyzAM2k69"]; //options.Bootnodes = ["/ip4/54.38.80.34/tcp/9000/p2p/16Uiu2HAm8t1aQArVwrJ9fwHRGXL2sXumPGTvmsne14piPaFJ5FYi"]; // Lighthouse @@ -158,7 +173,7 @@ public static async Task Main() syncProtocol.Preset = SizePreset.MainnetPreset; syncProtocol.GenesisValidatorsRoot = Convert.FromHexString("4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"); syncProtocol.GenesisTime = 1606824023; - syncProtocol.TrustedBlockRoot = Convert.FromHexString("0185a0716c173da858a048bfe9581c9329cd267fd086eb1f314166e503ec4a90"); + syncProtocol.TrustedBlockRoot = Convert.FromHexString("d3045825aa880bee480fb638f164e49bd887a599b3b89f37ea6bfc4c4f7aadd3"); }); beaconClientBuilder.AddLibp2pProtocol(libp2PBuilder => libp2PBuilder); beaconClientBuilder.WithLoggerFactory(libp2p2LoggerFactory); diff --git a/src/Lantern.Beacon/BeaconClient.cs b/src/Lantern.Beacon/BeaconClient.cs index 7db148e..69513a4 100644 --- a/src/Lantern.Beacon/BeaconClient.cs +++ b/src/Lantern.Beacon/BeaconClient.cs @@ -24,7 +24,14 @@ public async Task InitAsync(CancellationToken token = default) try { liteDbService.Init(); - InitialiseSyncProtocol(); + + var altairStore = liteDbService.Fetch(nameof(AltairLightClientStore)); + var capellaStore = liteDbService.Fetch(nameof(CapellaLightClientStore)); + var denebStore = liteDbService.Fetch(nameof(DenebLightClientStore)); + var finalityUpdate = liteDbService.Fetch(nameof(DenebLightClientFinalityUpdate)); + var optimisticUpdate = liteDbService.Fetch(nameof(DenebLightClientOptimisticUpdate)); + + syncProtocol.Init(altairStore, capellaStore, denebStore, finalityUpdate, optimisticUpdate); peerState.Init(peerFactoryBuilder.AppLayerProtocols); gossipSubManager.Init(); @@ -33,9 +40,6 @@ public async Task InitAsync(CancellationToken token = default) return; } - gossipSubManager.LightClientFinalityUpdate.OnMessage += HandleLightClientFinalityUpdate; - gossipSubManager.LightClientOptimisticUpdate.OnMessage += HandleLightClientOptimisticUpdate; - await beaconClientManager.InitAsync(token); } catch (Exception e) @@ -65,85 +69,4 @@ public async Task StopAsync() await beaconClientManager.StopAsync(); liteDbService.Dispose(); } - - private void HandleLightClientFinalityUpdate(byte[] update) - { - var denebFinalizedPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(syncProtocol.DenebLightClientStore.FinalizedHeader.Beacon.Slot)); - var denebCurrentPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime))); - var decompressedData = Snappy.Decode(update); - var currentSlot = Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime); - var lightClientFinalityUpdate = DenebLightClientFinalityUpdate.Deserialize(decompressedData, syncProtocol.Options.Preset); - - _logger.LogInformation("Received light client finality update from gossip"); - - if (denebFinalizedPeriod + 1 < denebCurrentPeriod) - return; - - var oldFinalizedHeader = syncProtocol.DenebLightClientStore.FinalizedHeader; - var result = DenebProcessors.ProcessLightClientFinalityUpdate(syncProtocol.DenebLightClientStore, lightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - _logger.LogInformation("Processed light client finality update from gossip"); - - if (result) - { - if (!DenebHelpers.ShouldForwardFinalizedLightClientUpdate(lightClientFinalityUpdate, oldFinalizedHeader, syncProtocol)) - return; - - gossipSubManager.LightClientFinalityUpdate!.Publish(update); - _logger.LogInformation("Forwarded light client finality update to gossip"); - - syncProtocol.CurrentLightClientFinalityUpdate = lightClientFinalityUpdate; - liteDbService.Store(nameof(DenebLightClientFinalityUpdate), lightClientFinalityUpdate); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - } - else - { - _logger.LogWarning("Failed to process light client finality update from gossip. Ignoring..."); - } - } - - private void HandleLightClientOptimisticUpdate(byte[] update) - { - var denebFinalizedPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(syncProtocol.DenebLightClientStore.FinalizedHeader.Beacon.Slot)); - var denebCurrentPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime))); - var decompressedData = Snappy.Decode(update); - var currentSlot = Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime); - var lightClientOptimisticUpdate = DenebLightClientOptimisticUpdate.Deserialize(decompressedData, syncProtocol.Options.Preset); - - _logger.LogInformation("Received light client optimistic update from gossip"); - - if (denebFinalizedPeriod + 1 < denebCurrentPeriod) - return; - - var oldOptimisticHeader = syncProtocol.DenebLightClientStore.OptimisticHeader; - var result = DenebProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.DenebLightClientStore, lightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - - if (result) - { - _logger.LogInformation("Processed light client optimistic update from gossip"); - if (!DenebHelpers.ShouldForwardLightClientOptimisticUpdate(lightClientOptimisticUpdate, oldOptimisticHeader, syncProtocol)) - return; - - gossipSubManager.LightClientFinalityUpdate!.Publish(update); - _logger.LogInformation("Forwarded light client optimistic update to gossip"); - - syncProtocol.CurrentLightClientOptimisticUpdate = lightClientOptimisticUpdate; - liteDbService.Store(nameof(DenebLightClientOptimisticUpdate), lightClientOptimisticUpdate); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - } - else - { - _logger.LogWarning("Failed to process light client optimistic update from gossip. Ignoring..."); - } - } - - private void InitialiseSyncProtocol() - { - var altairStore = liteDbService.Fetch(nameof(AltairLightClientStore)); - var capellaStore = liteDbService.Fetch(nameof(CapellaLightClientStore)); - var denebStore = liteDbService.Fetch(nameof(DenebLightClientStore)); - var finalityUpdate = liteDbService.Fetch(nameof(DenebLightClientFinalityUpdate)); - var optimisticUpdate = liteDbService.Fetch(nameof(DenebLightClientOptimisticUpdate)); - - syncProtocol.Init(altairStore, capellaStore, denebStore, finalityUpdate, optimisticUpdate); - } } \ No newline at end of file diff --git a/src/Lantern.Beacon/BeaconClientManager.cs b/src/Lantern.Beacon/BeaconClientManager.cs index c745cf5..4e1cd64 100644 --- a/src/Lantern.Beacon/BeaconClientManager.cs +++ b/src/Lantern.Beacon/BeaconClientManager.cs @@ -46,7 +46,6 @@ public async Task InitAsync(CancellationToken token = default) var identity = new Identity(); LocalPeer = peerFactory.Create(identity); - LocalPeer.Address.ReplaceOrAdd(identityManager.Record.GetEntry(EnrEntryKey.Tcp).Value); LocalPeer.Address.ReplaceOrAdd(identityManager.Record.ToPeerId()); @@ -271,7 +270,6 @@ private async Task DialPeer(Multiaddress? peer, CancellationToken token = defaul return; } - // Make it so that if not the light client is not initialised then if the peer supports bootstrap protocol at least then still connect to it var supportsLightClientProtocols = LightClientProtocols.All.All(protocol => peerProtocols!.Contains(protocol)); if (supportsLightClientProtocols) @@ -321,7 +319,7 @@ private async Task RunSyncProtocolAsync(IRemotePeer peer, CancellationToken toke } else { - _logger.LogInformation("Successfully initialised light client. Starting sync"); + _logger.LogInformation("Successfully initialised light client. Started syncing"); var activeFork = syncProtocol.ActiveFork; switch (activeFork) diff --git a/src/Lantern.Beacon/BeaconClientPeerFactoryBuilder.cs b/src/Lantern.Beacon/BeaconClientPeerFactoryBuilder.cs index fef75c4..50703dc 100644 --- a/src/Lantern.Beacon/BeaconClientPeerFactoryBuilder.cs +++ b/src/Lantern.Beacon/BeaconClientPeerFactoryBuilder.cs @@ -28,7 +28,7 @@ protected override ProtocolStack BuildStack() .Over() .Over() .Over() - .Over();//.Or(); + .Over(); return Over() diff --git a/src/Lantern.Beacon/Networking/Gossip/GossipSubManager.cs b/src/Lantern.Beacon/Networking/Gossip/GossipSubManager.cs index 82f4ce8..937cd0e 100644 --- a/src/Lantern.Beacon/Networking/Gossip/GossipSubManager.cs +++ b/src/Lantern.Beacon/Networking/Gossip/GossipSubManager.cs @@ -1,12 +1,17 @@ +using IronSnappy; using Lantern.Beacon.Networking.Discovery; using Lantern.Beacon.Networking.Gossip.Topics; +using Lantern.Beacon.Storage; using Lantern.Beacon.Sync; +using Lantern.Beacon.Sync.Helpers; +using Lantern.Beacon.Sync.Processors; +using Lantern.Beacon.Sync.Types.Ssz.Deneb; using Microsoft.Extensions.Logging; using Nethermind.Libp2p.Protocols.Pubsub; namespace Lantern.Beacon.Networking.Gossip; -public class GossipSubManager(ManualDiscoveryProtocol discoveryProtocol, SyncProtocolOptions syncProtocolOptions, PubsubRouter router, IBeaconClientManager beaconClientManager, ILoggerFactory loggerFactory) : IGossipSubManager +public class GossipSubManager(ManualDiscoveryProtocol discoveryProtocol, SyncProtocolOptions syncProtocolOptions, PubsubRouter router, IBeaconClientManager beaconClientManager, ISyncProtocol syncProtocol, ILiteDbService liteDbService, ILoggerFactory loggerFactory) : IGossipSubManager { private readonly ILogger _logger = loggerFactory.CreateLogger(); private CancellationTokenSource? _cancellationTokenSource; @@ -18,7 +23,9 @@ public void Init() { LightClientFinalityUpdate = router.Subscribe(LightClientFinalityUpdateTopic.GetTopicString(syncProtocolOptions)); LightClientOptimisticUpdate = router.Subscribe(LightClientOptimisticUpdateTopic.GetTopicString(syncProtocolOptions)); - + LightClientFinalityUpdate.OnMessage += HandleLightClientFinalityUpdate; + LightClientOptimisticUpdate.OnMessage += HandleLightClientOptimisticUpdate; + _logger.LogDebug("Subscribed to topic: {LightClientFinalityUpdate}", LightClientFinalityUpdateTopic.GetTopicString(syncProtocolOptions)); _logger.LogDebug("Subscribed to topic: {LightClientOptimisticUpdate}", LightClientOptimisticUpdateTopic.GetTopicString(syncProtocolOptions)); } @@ -63,4 +70,74 @@ public async Task StopAsync() _cancellationTokenSource.Dispose(); _cancellationTokenSource = null; } + + private void HandleLightClientFinalityUpdate(byte[] update) + { + var denebFinalizedPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(syncProtocol.DenebLightClientStore.FinalizedHeader.Beacon.Slot)); + var denebCurrentPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime))); + var decompressedData = Snappy.Decode(update); + var currentSlot = Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime); + var lightClientFinalityUpdate = DenebLightClientFinalityUpdate.Deserialize(decompressedData, syncProtocol.Options.Preset); + + _logger.LogInformation("Received light client finality update from gossip for slot {Slot}", lightClientFinalityUpdate.SignatureSlot); + + if (denebFinalizedPeriod + 1 < denebCurrentPeriod) + return; + + var oldFinalizedHeader = syncProtocol.DenebLightClientStore.FinalizedHeader; + var result = DenebProcessors.ProcessLightClientFinalityUpdate(syncProtocol.DenebLightClientStore, lightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + _logger.LogInformation("Processed light client finality update from gossip"); + + if (result) + { + if (!DenebHelpers.ShouldForwardFinalizedLightClientUpdate(lightClientFinalityUpdate, oldFinalizedHeader, syncProtocol)) + return; + + LightClientFinalityUpdate!.Publish(update); + _logger.LogInformation("Forwarded light client finality update to gossip"); + + syncProtocol.CurrentLightClientFinalityUpdate = lightClientFinalityUpdate; + liteDbService.Store(nameof(DenebLightClientFinalityUpdate), lightClientFinalityUpdate); + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + } + else + { + _logger.LogWarning("Failed to process light client finality update from gossip. Ignoring..."); + } + } + + private void HandleLightClientOptimisticUpdate(byte[] update) + { + var denebFinalizedPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(syncProtocol.DenebLightClientStore.FinalizedHeader.Beacon.Slot)); + var denebCurrentPeriod = AltairHelpers.ComputeSyncCommitteePeriod(Phase0Helpers.ComputeEpochAtSlot(Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime))); + var decompressedData = Snappy.Decode(update); + var currentSlot = Phase0Helpers.ComputeCurrentSlot(syncProtocol.Options.GenesisTime); + var lightClientOptimisticUpdate = DenebLightClientOptimisticUpdate.Deserialize(decompressedData, syncProtocol.Options.Preset); + + _logger.LogInformation("Received light client optimistic update from gossip for slot {Slot}", lightClientOptimisticUpdate.SignatureSlot); + + if (denebFinalizedPeriod + 1 < denebCurrentPeriod) + return; + + var oldOptimisticHeader = syncProtocol.DenebLightClientStore.OptimisticHeader; + var result = DenebProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.DenebLightClientStore, lightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (result) + { + _logger.LogInformation("Processed light client optimistic update from gossip"); + if (!DenebHelpers.ShouldForwardLightClientOptimisticUpdate(lightClientOptimisticUpdate, oldOptimisticHeader, syncProtocol)) + return; + + LightClientFinalityUpdate!.Publish(update); + _logger.LogInformation("Forwarded light client optimistic update to gossip"); + + syncProtocol.CurrentLightClientOptimisticUpdate = lightClientOptimisticUpdate; + liteDbService.Store(nameof(DenebLightClientOptimisticUpdate), lightClientOptimisticUpdate); + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + } + else + { + _logger.LogWarning("Failed to process light client optimistic update from gossip. Ignoring..."); + } + } } \ No newline at end of file diff --git a/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs b/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs index 0df3d37..55cd933 100644 --- a/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs +++ b/src/Lantern.Beacon/Networking/Libp2pProtocols/Mplex/MplexProtocol.cs @@ -65,12 +65,18 @@ protected override async Task ConnectAsync(IChannel downChannel, IChannelFactory foreach (var upChannel in peerState.InitiatorChannels.Values) { - _ = upChannel.Channel?.CloseAsync(); + if (upChannel.Channel != null) + { + await upChannel.Channel.CloseAsync(); + } } foreach (var upChannel in peerState.ReceiverChannels.Values) { - _ = upChannel.Channel?.CloseAsync(); + if(upChannel.Channel != null) + { + await upChannel.Channel.CloseAsync(); + } } peerState.InitiatorChannels.Clear(); @@ -422,7 +428,6 @@ private async Task WriteMessageAsync(IChannel channel, MplexMessage message) var headerSize = VarInt.GetSizeInBytes(header); var lengthSize = VarInt.GetSizeInBytes((ulong)message.Data.Length); var totalSize = headerSize + lengthSize + (int)message.Data.Length; - var buffer = new byte[totalSize]; var offset = 0; diff --git a/src/Lantern.Beacon/Networking/LightClientProtocols.cs b/src/Lantern.Beacon/Networking/LightClientProtocols.cs index 38b82d7..f023c5b 100644 --- a/src/Lantern.Beacon/Networking/LightClientProtocols.cs +++ b/src/Lantern.Beacon/Networking/LightClientProtocols.cs @@ -2,10 +2,10 @@ namespace Lantern.Beacon.Networking; public static class LightClientProtocols { - public const string LightClientBootstrap = "/eth2/beacon_chain/req/light_client_bootstrap/1/ssz_snappy"; - public const string LightClientFinalityUpdate = "/eth2/beacon_chain/req/light_client_finality_update/1/ssz_snappy"; - public const string LightClientOptimisticUpdate = "/eth2/beacon_chain/req/light_client_optimistic_update/1/ssz_snappy"; - public const string LightClientUpdatesByRange = "/eth2/beacon_chain/req/light_client_updates_by_range/1/ssz_snappy"; + private const string LightClientBootstrap = "/eth2/beacon_chain/req/light_client_bootstrap/1/ssz_snappy"; + private const string LightClientFinalityUpdate = "/eth2/beacon_chain/req/light_client_finality_update/1/ssz_snappy"; + private const string LightClientOptimisticUpdate = "/eth2/beacon_chain/req/light_client_optimistic_update/1/ssz_snappy"; + private const string LightClientUpdatesByRange = "/eth2/beacon_chain/req/light_client_updates_by_range/1/ssz_snappy"; public static readonly IReadOnlyList All = new List { diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/GoodbyeProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/GoodbyeProtocol.cs index 04b4ba7..e254536 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/GoodbyeProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/GoodbyeProtocol.cs @@ -76,42 +76,53 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context) { - _logger?.LogDebug("Listening for goodbye response from {PeerId}", context.RemotePeer.Address); - - var receivedData = new List(); - - await foreach (var readOnlySequence in downChannel.ReadAllAsync()) - { - receivedData.Add(readOnlySequence.ToArray()); - } - - var flatData = receivedData.SelectMany(x => x).ToArray(); - var result = ReqRespHelpers.DecodeRequest(flatData); - - if (result == null) + _logger?.LogInformation("Listening for goodbye response from {PeerId}", context.RemotePeer.Address); + + try { - _logger?.LogError("Failed to decode ping request from {PeerId}", context.RemotePeer.Address.Get()); - await downChannel.CloseAsync(); - return; - } - - var goodbyeResponse = Goodbye.Deserialize(result); - - _logger?.LogInformation("Received goodbye response from {PeerId} with reason {Reason}", context.RemotePeer.Address.Get(), (GoodbyeReasonCodes)goodbyeResponse.Reason); - - var goodbye = Goodbye.CreateFrom(goodbyeResponse.Reason); - var sszData = Goodbye.Serialize(goodbye); - var payload = ReqRespHelpers.EncodeResponse(sszData, ResponseCodes.Success); - var rawData = new ReadOnlySequence(payload); - - await downChannel.WriteAsync(rawData); + var receivedData = new List(); - _logger?.LogDebug("Sent goodbye response to {PeerId} with reason {Reason}", context.RemotePeer.Address.Get(), (GoodbyeReasonCodes)goodbyeResponse.Reason); + await foreach (var readOnlySequence in downChannel.ReadAllAsync()) + { + receivedData.Add(readOnlySequence.ToArray()); + } - if (peerState.LivePeers.ContainsKey(context.RemotePeer.Address.GetPeerId()!)) + var flatData = receivedData.SelectMany(x => x).ToArray(); + var result = ReqRespHelpers.DecodeRequest(flatData); + + if (result == null) + { + _logger?.LogError("Failed to decode ping request from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); + return; + } + + var goodbyeResponse = Goodbye.Deserialize(result); + + _logger?.LogInformation("Received goodbye response from {PeerId} with reason {Reason}", + context.RemotePeer.Address.Get(), (GoodbyeReasonCodes)goodbyeResponse.Reason); + + var goodbye = Goodbye.CreateFrom(goodbyeResponse.Reason); + var sszData = Goodbye.Serialize(goodbye); + var payload = ReqRespHelpers.EncodeResponse(sszData, ResponseCodes.Success); + var rawData = new ReadOnlySequence(payload); + + await downChannel.WriteAsync(rawData); + + _logger?.LogDebug("Sent goodbye response to {PeerId} with reason {Reason}", + context.RemotePeer.Address.Get(), (GoodbyeReasonCodes)goodbyeResponse.Reason); + + if (peerState.LivePeers.ContainsKey(context.RemotePeer.Address.GetPeerId()!)) + { + peerState.LivePeers.TryRemove(context.RemotePeer.Address.GetPeerId()!, out var peer); + _logger?.LogInformation("Removed peer {PeerId} from live peers", context.RemotePeer.Address.Get()); + } + + } + catch (Exception ex) { - peerState.LivePeers.TryRemove(context.RemotePeer.Address.GetPeerId()!, out var peer); - _logger?.LogInformation("Removed peer {PeerId} from live peers", context.RemotePeer.Address.Get()); + _logger?.LogError(ex, "Error while listening for goodbye request from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); } } } \ No newline at end of file diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientBootstrapProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientBootstrapProtocol.cs index 662ff7d..3d7a2b9 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientBootstrapProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientBootstrapProtocol.cs @@ -64,31 +64,63 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact { case ForkType.Deneb: var denebLightClientBootstrap = DenebLightClientBootstrap.Deserialize(result.Item3, syncProtocol.Options.Preset); - syncProtocol.InitialiseStoreFromDenebBootstrap(syncProtocol.Options.TrustedBlockRoot, denebLightClientBootstrap); - syncProtocol.SetActiveFork(ForkType.Deneb); - liteDbService.Store(nameof(DenebLightClientBootstrap), denebLightClientBootstrap); - _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + var denebResult = syncProtocol.InitialiseStoreFromDenebBootstrap(syncProtocol.Options.TrustedBlockRoot, denebLightClientBootstrap); + + if (denebResult) + { + syncProtocol.SetActiveFork(ForkType.Deneb); + liteDbService.Store(nameof(DenebLightClientBootstrap), denebLightClientBootstrap); + _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } + else + { + _logger?.LogError("Failed to process light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } break; case ForkType.Capella: var capellaLightClientBootstrap = CapellaLightClientBootstrap.Deserialize(result.Item3, syncProtocol.Options.Preset); - syncProtocol.InitialiseStoreFromCapellaBootstrap(syncProtocol.Options.TrustedBlockRoot, capellaLightClientBootstrap); - syncProtocol.SetActiveFork(ForkType.Capella); - liteDbService.Store(nameof(CapellaLightClientBootstrap), capellaLightClientBootstrap); - _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + var capellaResult = syncProtocol.InitialiseStoreFromCapellaBootstrap(syncProtocol.Options.TrustedBlockRoot, capellaLightClientBootstrap); + + if (capellaResult) + { + syncProtocol.SetActiveFork(ForkType.Capella); + liteDbService.Store(nameof(CapellaLightClientBootstrap), capellaLightClientBootstrap); + _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } + else + { + _logger?.LogError("Failed to process light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } break; case ForkType.Bellatrix: var bellatrixLightClientBootstrap = AltairLightClientBootstrap.Deserialize(result.Item3, syncProtocol.Options.Preset); - syncProtocol.InitialiseStoreFromAltairBootstrap(syncProtocol.Options.TrustedBlockRoot, bellatrixLightClientBootstrap); - syncProtocol.SetActiveFork(ForkType.Bellatrix); - liteDbService.Store(nameof(AltairLightClientBootstrap), bellatrixLightClientBootstrap); - _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + var bellatrixResult = syncProtocol.InitialiseStoreFromAltairBootstrap(syncProtocol.Options.TrustedBlockRoot, bellatrixLightClientBootstrap); + + if (bellatrixResult) + { + syncProtocol.SetActiveFork(ForkType.Bellatrix); + liteDbService.Store(nameof(AltairLightClientBootstrap), bellatrixLightClientBootstrap); + _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } + else + { + _logger?.LogError("Failed to process light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } break; case ForkType.Altair: var altairLightClientBootstrap = AltairLightClientBootstrap.Deserialize(result.Item3, syncProtocol.Options.Preset); - syncProtocol.InitialiseStoreFromAltairBootstrap(syncProtocol.Options.TrustedBlockRoot, altairLightClientBootstrap); - syncProtocol.SetActiveFork(ForkType.Altair); - liteDbService.Store(nameof(AltairLightClientBootstrap), altairLightClientBootstrap); - _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + var altairResult = syncProtocol.InitialiseStoreFromAltairBootstrap(syncProtocol.Options.TrustedBlockRoot, altairLightClientBootstrap); + + if (altairResult) + { + syncProtocol.SetActiveFork(ForkType.Altair); + liteDbService.Store(nameof(AltairLightClientBootstrap), altairLightClientBootstrap); + _logger?.LogInformation("Processed light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } + else + { + _logger?.LogError("Failed to process light client bootstrap from {PeerId} for fork {ForkType}", context.RemotePeer.Address.Get(), forkType); + } break; case ForkType.Phase0: _logger?.LogError("Received light client bootstrap response with unexpected fork type from {PeerId}", context.RemotePeer.Address.Get()); @@ -147,13 +179,13 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa } else { - _logger?.LogDebug("Sending light client bootstrap response to {PeerId}", - context.RemotePeer.Address.Get()); var sszData = DenebLightClientBootstrap.Serialize(response, syncProtocol.Options.Preset); var encodedResponse = ReqRespHelpers.EncodeResponse(sszData, forkDigest, ResponseCodes.Success); var rawData = new ReadOnlySequence(encodedResponse); await downChannel.WriteAsync(rawData); + _logger?.LogDebug("Sent light client bootstrap response to {PeerId}", + context.RemotePeer.Address.Get()); } } catch (Exception ex) diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientFinalityUpdateProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientFinalityUpdateProtocol.cs index 6eef3e6..79cce98 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientFinalityUpdateProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientFinalityUpdateProtocol.cs @@ -58,27 +58,59 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact { case ForkType.Deneb: var denebLightClientFinalityUpdate = DenebLightClientFinalityUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - DenebProcessors.ProcessLightClientFinalityUpdate(syncProtocol.DenebLightClientStore, denebLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + var denebResult = DenebProcessors.ProcessLightClientFinalityUpdate(syncProtocol.DenebLightClientStore, denebLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (denebResult) + { + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Capella: var capellaLightClientFinalityUpdate = CapellaLightClientFinalityUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - CapellaProcessors.ProcessLightClientFinalityUpdate(syncProtocol.CapellaLightClientStore, capellaLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + var capellaResult = CapellaProcessors.ProcessLightClientFinalityUpdate(syncProtocol.CapellaLightClientStore, capellaLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (capellaResult) + { + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Bellatrix: var bellatrixLightClientFinalityUpdate = AltairLightClientFinalityUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - AltairProcessors.ProcessLightClientFinalityUpdate(syncProtocol.AltairLightClientStore, bellatrixLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + var bellatrixResult = AltairProcessors.ProcessLightClientFinalityUpdate(syncProtocol.AltairLightClientStore, bellatrixLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (bellatrixResult) + { + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Altair: var altairLightClientFinalityUpdate = AltairLightClientFinalityUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - AltairProcessors.ProcessLightClientFinalityUpdate(syncProtocol.AltairLightClientStore, altairLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + var altairResult = AltairProcessors.ProcessLightClientFinalityUpdate(syncProtocol.AltairLightClientStore, altairLightClientFinalityUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (altairResult) + { + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + _logger?.LogInformation("Processed light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Phase0: _logger?.LogError("Received light client finality response with unexpected fork type from {PeerId}", context.RemotePeer.Address.Get()); @@ -87,7 +119,7 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact } catch (Exception e) { - _logger?.LogError(e, "Failed to receieve light client finality update from {PeerId}", context.RemotePeer.Address.Get()); + _logger?.LogError(e, "Failed to receive light client finality update from {PeerId}", context.RemotePeer.Address.Get()); await downChannel.CloseAsync(); } } @@ -112,12 +144,12 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa } else { - _logger?.LogInformation("Sending light client finality update response to {PeerId}", context.RemotePeer.Address.Get()); var sszData = DenebLightClientFinalityUpdate.Serialize(response, syncProtocol.Options.Preset); var encodedResponse = ReqRespHelpers.EncodeResponse(sszData, forkDigest, ResponseCodes.Success); var rawData = new ReadOnlySequence(encodedResponse); await downChannel.WriteAsync(rawData); + _logger?.LogInformation("Sent light client finality update response to {PeerId}", context.RemotePeer.Address.Get()); } } catch (Exception ex) diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientOptimisticUpdateProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientOptimisticUpdateProtocol.cs index fb9d5fe..bfc4357 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientOptimisticUpdateProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientOptimisticUpdateProtocol.cs @@ -57,27 +57,59 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact { case ForkType.Deneb: var denebLightClientOptimisticUpdate = DenebLightClientOptimisticUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - DenebProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.DenebLightClientStore, denebLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + var denebResult = DenebProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.DenebLightClientStore, denebLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (denebResult) + { + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Capella: var capellaLightClientOptimisticUpdate = CapellaLightClientOptimisticUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - CapellaProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.CapellaLightClientStore, capellaLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(CapellaLightClientStore), syncProtocol.CapellaLightClientStore); - _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + var capellaResult = CapellaProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.CapellaLightClientStore, capellaLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (capellaResult) + { + liteDbService.StoreOrUpdate(nameof(CapellaLightClientStore), syncProtocol.CapellaLightClientStore); + _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Bellatrix: var bellatrixLightClientOptimisticUpdate = AltairLightClientOptimisticUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - AltairProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.AltairLightClientStore, bellatrixLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); - _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + var bellatrixResult = AltairProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.AltairLightClientStore, bellatrixLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (bellatrixResult) + { + liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); + _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Altair: var altairLightClientOptimisticUpdate = AltairLightClientOptimisticUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - AltairProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.AltairLightClientStore, altairLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); - _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + var altairResult = AltairProcessors.ProcessLightClientOptimisticUpdate(syncProtocol.AltairLightClientStore, altairLightClientOptimisticUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (altairResult) + { + liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); + _logger?.LogInformation("Processed light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Phase0: _logger?.LogError("Received light client optimistic update response with unexpected fork type from {PeerId}", context.RemotePeer.Address.Get()); @@ -86,7 +118,7 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact } catch (Exception e) { - _logger?.LogError(e, "Failed to receieve light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); + _logger?.LogError(e, "Failed to receive light client optimistic update from {PeerId}", context.RemotePeer.Address.Get()); await downChannel.CloseAsync(); } } @@ -111,12 +143,13 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa } else { - _logger?.LogInformation("Sending light client optimistic update response to {PeerId}", context.RemotePeer.Address.Get()); var sszData = DenebLightClientOptimisticUpdate.Serialize(response, syncProtocol.Options.Preset); var encodedResponse = ReqRespHelpers.EncodeResponse(sszData, forkDigest, ResponseCodes.Success); var rawData = new ReadOnlySequence(encodedResponse); await downChannel.WriteAsync(rawData); + + _logger?.LogInformation("Sent light client optimistic update response to {PeerId}", context.RemotePeer.Address.Get()); } } catch (Exception ex) diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientUpdatesByRangeProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientUpdatesByRangeProtocol.cs index 9689b73..fc4dd81 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientUpdatesByRangeProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/LightClientUpdatesByRangeProtocol.cs @@ -80,31 +80,64 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact { case ForkType.Deneb: var denebLightClientUpdate = DenebLightClientUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - DenebProcessors.ProcessLightClientUpdate(syncProtocol.DenebLightClientStore, denebLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.Store(nameof(DenebLightClientUpdate), denebLightClientUpdate); - liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); - _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + var denebResult = DenebProcessors.ProcessLightClientUpdate(syncProtocol.DenebLightClientStore, denebLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (denebResult) + { + liteDbService.Store(nameof(DenebLightClientUpdate), denebLightClientUpdate); + liteDbService.StoreOrUpdate(nameof(DenebLightClientStore), syncProtocol.DenebLightClientStore); + + _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Capella: var capellaLightClientUpdate = CapellaLightClientUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - CapellaProcessors.ProcessLightClientUpdate(syncProtocol.CapellaLightClientStore, capellaLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.Store(nameof(CapellaLightClientUpdate), capellaLightClientUpdate); - liteDbService.StoreOrUpdate(nameof(CapellaLightClientStore), syncProtocol.CapellaLightClientStore); - _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + var capellaResult = CapellaProcessors.ProcessLightClientUpdate(syncProtocol.CapellaLightClientStore, capellaLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (capellaResult) + { + liteDbService.Store(nameof(CapellaLightClientUpdate), capellaLightClientUpdate); + liteDbService.StoreOrUpdate(nameof(CapellaLightClientStore), syncProtocol.CapellaLightClientStore); + _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Bellatrix: var bellatrixLightClientUpdate = AltairLightClientUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - AltairProcessors.ProcessLightClientUpdate(syncProtocol.AltairLightClientStore, bellatrixLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.Store(nameof(AltairLightClientUpdate), bellatrixLightClientUpdate); - liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); - _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + var bellatrixResult = AltairProcessors.ProcessLightClientUpdate(syncProtocol.AltairLightClientStore, bellatrixLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (bellatrixResult) + { + liteDbService.Store(nameof(AltairLightClientUpdate), bellatrixLightClientUpdate); + liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); + _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Altair: var altairLightClientUpdate = AltairLightClientUpdate.Deserialize(result.Item3, syncProtocol.Options.Preset); - AltairProcessors.ProcessLightClientUpdate(syncProtocol.AltairLightClientStore, altairLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); - liteDbService.Store(nameof(AltairLightClientUpdate), altairLightClientUpdate); - liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); - _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + var altairResult = AltairProcessors.ProcessLightClientUpdate(syncProtocol.AltairLightClientStore, altairLightClientUpdate, currentSlot, syncProtocol.Options, syncProtocol.Logger); + + if (altairResult) + { + liteDbService.Store(nameof(AltairLightClientUpdate), altairLightClientUpdate); + liteDbService.StoreOrUpdate(nameof(AltairLightClientStore), syncProtocol.AltairLightClientStore); + _logger?.LogInformation("Processed light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } + else + { + _logger?.LogError("Failed to process light client update response from {PeerId}", context.RemotePeer.Address.Get()); + } break; case ForkType.Phase0: _logger?.LogError("Received light client update response with unexpected fork type from {PeerId}", context.RemotePeer.Address.Get()); @@ -161,13 +194,13 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa } else { - _logger?.LogInformation("Sending light client update response to {PeerId}", - context.RemotePeer.Address.Get()); var sszData = DenebLightClientUpdate.Serialize(response, syncProtocol.Options.Preset); var encodedResponse = ReqRespHelpers.EncodeResponse(sszData, forkDigest, ResponseCodes.Success); var rawData = new ReadOnlySequence(encodedResponse); await downChannel.WriteAsync(rawData); + _logger?.LogInformation("Sent light client update response to {PeerId} for sync period {startPeriod}", + context.RemotePeer.Address.Get(), request.StartPeriod); } } } diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/MetaDataProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/MetaDataProtocol.cs index ba7601c..dab26d0 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/MetaDataProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/MetaDataProtocol.cs @@ -20,66 +20,72 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact { var receivedData = new List(); - await foreach (var readOnlySequence in downChannel.ReadAllAsync()) + try { - receivedData.Add(readOnlySequence.ToArray()); - } + await foreach (var readOnlySequence in downChannel.ReadAllAsync()) + { + receivedData.Add(readOnlySequence.ToArray()); + } - if (receivedData.Count == 0 || receivedData[0] == null || receivedData[0].Length == 0) - { - // Log that we received an empty or null response - _logger?.LogWarning("Received an empty or null response from {PeerId}", context.RemotePeer.Address.Get()); - await downChannel.CloseAsync(); - return; - } + if (receivedData.Count == 0 || receivedData[0] == null || receivedData[0].Length == 0) + { + _logger?.LogWarning("Received an empty or null response from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); + return; + } - var flatData = receivedData.SelectMany(x => x).ToArray(); + var flatData = receivedData.SelectMany(x => x).ToArray(); - if (flatData[0] == (byte)ResponseCodes.ResourceUnavailable || flatData[0] == (byte)ResponseCodes.InvalidRequest || flatData[0] == (byte)ResponseCodes.ServerError) - { - _logger?.LogInformation("Failed to handle metadata response from {PeerId} due to reason {Reason}", context.RemotePeer.Address.Get(), (ResponseCodes)flatData[0]); - await downChannel.CloseAsync(); - return; - } + if (flatData[0] == (byte)ResponseCodes.ResourceUnavailable || flatData[0] == (byte)ResponseCodes.InvalidRequest || flatData[0] == (byte)ResponseCodes.ServerError) + { + _logger?.LogInformation("Failed to handle metadata response from {PeerId} due to reason {Reason}", context.RemotePeer.Address.Get(), (ResponseCodes)flatData[0]); + await downChannel.CloseAsync(); + return; + } + + var result = ReqRespHelpers.DecodeResponse(flatData); + + if(result.Item2 != ResponseCodes.Success) + { + _logger?.LogError("Failed to decode metadata response from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); + return; + } - var result = ReqRespHelpers.DecodeResponse(flatData); + var metaDataResponse = MetaData.Deserialize(result.Item1); - if(result.Item2 != ResponseCodes.Success) + _logger?.LogDebug("Received metadata response from {PeerId} with seq number {SeqNumber} and attnets {Attnets}", + context.RemotePeer.Address.Get(), + metaDataResponse.SeqNumber, + Convert.ToHexString(metaDataResponse.Attnets.Select(b => b ? (byte)1 : (byte)0).ToArray())); + } + catch (Exception e) { - _logger?.LogError("Failed to decode metadata response from {PeerId}", context.RemotePeer.Address.Get()); + _logger?.LogError(e, "Error while dialing for MetaData request from {PeerId}. Exception: {Message}", + context.RemotePeer.Address.Get(), e.Message); await downChannel.CloseAsync(); - return; } - - var metaDataResponse = MetaData.Deserialize(result.Item1); - - _logger?.LogInformation("Received metadata response from {PeerId} with seq number {SeqNumber} and attnets {Attnets}", - context.RemotePeer.Address.Get(), - metaDataResponse.SeqNumber, - Convert.ToHexString(metaDataResponse.Attnets.Select(b => b ? (byte)1 : (byte)0).ToArray())); } public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context) { - _logger?.LogDebug("Listening for MetaData request from {PeerId}", context.RemotePeer.Address); - - var rawData = new ReadOnlySequence(); - + _logger?.LogInformation("Listening for MetaData request from {PeerId}", context.RemotePeer.Address); + try { var metaData = peerState.MetaData; var sszData = MetaData.Serialize(metaData); var payload = ReqRespHelpers.EncodeResponse(sszData, ResponseCodes.Success); - rawData = new ReadOnlySequence(payload); + var rawData = new ReadOnlySequence(payload); + + await downChannel.WriteAsync(rawData); + _logger?.LogInformation("Sent MetaData response to {PeerId}", context.RemotePeer.Address.Get()); } catch (Exception e) { - _logger?.LogError(e, "Failed to encode MetaData request from {PeerId}. Exception: {Message}", + _logger?.LogError(e, "Error while listening for MetaData request from {PeerId}. Exception: {Message}", context.RemotePeer.Address.Get(), e.Message); - return; + await downChannel.CloseAsync(); } - - await downChannel.WriteAsync(rawData); - _logger?.LogInformation("Sent MetaData to {PeerId} with data {Data}", context.RemotePeer.Address.Get(), Convert.ToHexString(rawData.ToArray())); } } \ No newline at end of file diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/PingProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/PingProtocol.cs index efbb7b5..78e3fcb 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/PingProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/PingProtocol.cs @@ -16,42 +16,50 @@ public class PingProtocol(IPeerState peerState, ILoggerFactory? loggerFactory = public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context) { - var ping = Ping.CreateFrom(peerState.MetaData.SeqNumber); - var sszData = Ping.Serialize(ping); - var payload = ReqRespHelpers.EncodeRequest(sszData); - var rawData = new ReadOnlySequence(payload); + try + { + var ping = Ping.CreateFrom(peerState.MetaData.SeqNumber); + var sszData = Ping.Serialize(ping); + var payload = ReqRespHelpers.EncodeRequest(sszData); + var rawData = new ReadOnlySequence(payload); - _logger?.LogInformation("Sending ping to {PeerId} with SeqNumber {Value} and data {Data}", context.RemotePeer.Address.Get(), peerState.MetaData.SeqNumber, Convert.ToHexString(payload)); + _logger?.LogInformation("Sending ping to {PeerId} with SeqNumber {Value} and data {Data}", context.RemotePeer.Address.Get(), peerState.MetaData.SeqNumber, Convert.ToHexString(payload)); - await downChannel.WriteAsync(rawData); - var receivedData = new List(); + await downChannel.WriteAsync(rawData); + var receivedData = new List(); - await foreach (var readOnlySequence in downChannel.ReadAllAsync()) - { - receivedData.Add(readOnlySequence.ToArray()); - } + await foreach (var readOnlySequence in downChannel.ReadAllAsync()) + { + receivedData.Add(readOnlySequence.ToArray()); + } - if (receivedData.Count == 0 || receivedData[0] == null || receivedData[0].Length == 0) - { - // Log that we received an empty or null response - _logger?.LogWarning("Received an empty or null response from {PeerId}", context.RemotePeer.Address.Get()); - await downChannel.CloseAsync(); - return; - } + if (receivedData.Count == 0 || receivedData[0] == null || receivedData[0].Length == 0) + { + // Log that we received an empty or null response + _logger?.LogWarning("Received an empty or null response from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); + return; + } - var flatData = receivedData.SelectMany(x => x).ToArray(); - var result = ReqRespHelpers.DecodeResponse(flatData); + var flatData = receivedData.SelectMany(x => x).ToArray(); + var result = ReqRespHelpers.DecodeResponse(flatData); + + if(result.Item2 != ResponseCodes.Success) + { + _logger?.LogError("Failed to decode ping response from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); + return; + } - if(result.Item2 != ResponseCodes.Success) + var pingResponse = Ping.Deserialize(result.Item1); + + _logger?.LogInformation("Received pong response from {PeerId} with seq number {SeqNumber}", context.RemotePeer.Address.Get(), pingResponse.SeqNumber); + } + catch (Exception ex) { - _logger?.LogError("Failed to decode ping response from {PeerId}", context.RemotePeer.Address.Get()); + _logger?.LogError(ex, "Error while dialing for ping request to {PeerId}", context.RemotePeer.Address.Get()); await downChannel.CloseAsync(); - return; } - - var pingResponse = Ping.Deserialize(result.Item1); - - _logger?.LogInformation("Received pong from {PeerId} with seq number {SeqNumber}", context.RemotePeer.Address.Get(), pingResponse.SeqNumber); } public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context) @@ -64,15 +72,10 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa { await foreach (var readOnlySequence in downChannel.ReadAllAsync()) { - Console.WriteLine("Received data: " + Convert.ToHexString(readOnlySequence.ToArray())); receivedData.Add(readOnlySequence.ToArray()); } - - Console.WriteLine("Finished receiving data..."); var flatData = receivedData.SelectMany(x => x).ToArray(); - - Console.WriteLine("Flat data: " + Convert.ToHexString(flatData)); var result = ReqRespHelpers.DecodeRequest(flatData); if (result == null) @@ -89,11 +92,12 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa await downChannel.WriteAsync(rawData); - _logger?.LogInformation("Sent pong to {PeerId} with SeqNumber {Value} and data {Data}", context.RemotePeer.Address.Get(), peerState.MetaData.SeqNumber, Convert.ToHexString(payload)); + _logger?.LogInformation("Sent pong response to {PeerId}", context.RemotePeer.Address.Get()); } catch (Exception ex) { _logger?.LogError(ex, "Error while listening for ping request from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); } } } \ No newline at end of file diff --git a/src/Lantern.Beacon/Networking/ReqRespProtocols/StatusProtocol.cs b/src/Lantern.Beacon/Networking/ReqRespProtocols/StatusProtocol.cs index cf197cd..f598288 100644 --- a/src/Lantern.Beacon/Networking/ReqRespProtocols/StatusProtocol.cs +++ b/src/Lantern.Beacon/Networking/ReqRespProtocols/StatusProtocol.cs @@ -17,61 +17,69 @@ public class StatusProtocol(ISyncProtocol syncProtocol, ILoggerFactory? loggerFa public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context) { - var forkDigest = BeaconClientUtility.GetForkDigestBytes(syncProtocol.Options); - var finalisedRoot = syncProtocol.CapellaLightClientStore.FinalizedHeader.GetHashTreeRoot(syncProtocol.Options.Preset); - var finalizedEpoch = Phase0Helpers.ComputeEpochAtSlot(syncProtocol.CapellaLightClientStore.FinalizedHeader.Beacon.Slot); - var headRoot = syncProtocol.CapellaLightClientStore.OptimisticHeader.GetHashTreeRoot(syncProtocol.Options.Preset); - var headSlot = syncProtocol.CapellaLightClientStore.OptimisticHeader.Beacon.Slot; - var localStatus = Status.CreateFrom(forkDigest, finalisedRoot, finalizedEpoch, headRoot, headSlot); - var sszData = Status.Serialize(localStatus); - var payload = ReqRespHelpers.EncodeRequest(sszData); - var rawData = new ReadOnlySequence(payload); - - _logger?.LogInformation("Sending status request to {PeerId} with forkDigest={ForkDigest}, finalizedRoot={FinalizedRoot}, finalizedEpoch={FinalizedEpoch}, headRoot={HeadRoot}, headSlot={HeadSlot}", context.RemotePeer.Address.Get(), Convert.ToHexString(localStatus.ForkDigest), Convert.ToHexString(localStatus.FinalizedRoot), localStatus.FinalizedEpoch, Convert.ToHexString(localStatus.HeadRoot), localStatus.HeadSlot); - - await downChannel.WriteAsync(rawData); - var receivedData = new List(); - - await foreach (var readOnlySequence in downChannel.ReadAllAsync()) - { - receivedData.Add(readOnlySequence.ToArray()); - } - - if (receivedData.Count == 0 || receivedData[0] == null || receivedData[0].Length == 0) - { - // Log that we received an empty or null response - _logger?.LogWarning("Received an empty or null response from {PeerId}", context.RemotePeer.Address.Get()); - await downChannel.CloseAsync(); - return; - } - - var flatData = receivedData.SelectMany(x => x).ToArray(); - - if (flatData[0] == (byte)ResponseCodes.ResourceUnavailable || flatData[0] == (byte)ResponseCodes.InvalidRequest || flatData[0] == (byte)ResponseCodes.ServerError) + try { - _logger?.LogInformation("Failed to handle status response from {PeerId} due to reason {Reason}", context.RemotePeer.Address.Get(), (ResponseCodes)flatData[0]); - await downChannel.CloseAsync(); - return; + var forkDigest = BeaconClientUtility.GetForkDigestBytes(syncProtocol.Options); + var finalisedRoot = syncProtocol.CapellaLightClientStore.FinalizedHeader.GetHashTreeRoot(syncProtocol.Options.Preset); + var finalizedEpoch = Phase0Helpers.ComputeEpochAtSlot(syncProtocol.CapellaLightClientStore.FinalizedHeader.Beacon.Slot); + var headRoot = syncProtocol.CapellaLightClientStore.OptimisticHeader.GetHashTreeRoot(syncProtocol.Options.Preset); + var headSlot = syncProtocol.CapellaLightClientStore.OptimisticHeader.Beacon.Slot; + var localStatus = Status.CreateFrom(forkDigest, finalisedRoot, finalizedEpoch, headRoot, headSlot); + var sszData = Status.Serialize(localStatus); + var payload = ReqRespHelpers.EncodeRequest(sszData); + var rawData = new ReadOnlySequence(payload); + + _logger?.LogInformation("Sending status request to {PeerId} with forkDigest={ForkDigest}, finalizedRoot={FinalizedRoot}, finalizedEpoch={FinalizedEpoch}, headRoot={HeadRoot}, headSlot={HeadSlot}", context.RemotePeer.Address.Get(), Convert.ToHexString(localStatus.ForkDigest), Convert.ToHexString(localStatus.FinalizedRoot), localStatus.FinalizedEpoch, Convert.ToHexString(localStatus.HeadRoot), localStatus.HeadSlot); + + await downChannel.WriteAsync(rawData); + var receivedData = new List(); + + await foreach (var readOnlySequence in downChannel.ReadAllAsync()) + { + receivedData.Add(readOnlySequence.ToArray()); + } + + if (receivedData.Count == 0 || receivedData[0] == null || receivedData[0].Length == 0) + { + // Log that we received an empty or null response + _logger?.LogWarning("Received an empty or null response from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); + return; + } + + var flatData = receivedData.SelectMany(x => x).ToArray(); + + if (flatData[0] == (byte)ResponseCodes.ResourceUnavailable || flatData[0] == (byte)ResponseCodes.InvalidRequest || flatData[0] == (byte)ResponseCodes.ServerError) + { + _logger?.LogInformation("Failed to handle status response from {PeerId} due to reason {Reason}", context.RemotePeer.Address.Get(), (ResponseCodes)flatData[0]); + await downChannel.CloseAsync(); + return; + } + + var result = ReqRespHelpers.DecodeResponse(flatData); + + if(result.Item2 != ResponseCodes.Success) + { + _logger?.LogError("Failed to decode status response from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); + return; + } + + var statusResponse = Status.Deserialize(result.Item1); + + _logger?.LogInformation("Received status response from {PeerId} with forkDigest={forkDigest}, finalizedRoot={finalizedRoot}, finalizedEpoch={finalizedEpoch}, headRoot={headRoot}, headSlot={headSlot}", context.RemotePeer.Address.Get(), + Convert.ToHexString(statusResponse.ForkDigest), Convert.ToHexString(statusResponse.FinalizedRoot), statusResponse.FinalizedEpoch, Convert.ToHexString(statusResponse.HeadRoot), statusResponse.HeadSlot); } - - var result = ReqRespHelpers.DecodeResponse(flatData); - - if(result.Item2 != ResponseCodes.Success) + catch (Exception ex) { - _logger?.LogError("Failed to decode status response from {PeerId}", context.RemotePeer.Address.Get()); + _logger?.LogError(ex, "Error while dialing for status request to {PeerId}", context.RemotePeer.Address.Get()); await downChannel.CloseAsync(); - return; } - - var statusResponse = Status.Deserialize(result.Item1); - - _logger?.LogInformation("Received status response from {PeerId} with forkDigest={forkDigest}, finalizedRoot={finalizedRoot}, finalizedEpoch={finalizedEpoch}, headRoot={headRoot}, headSlot={headSlot}", context.RemotePeer.Address.Get(), - Convert.ToHexString(statusResponse.ForkDigest), Convert.ToHexString(statusResponse.FinalizedRoot), statusResponse.FinalizedEpoch, Convert.ToHexString(statusResponse.HeadRoot), statusResponse.HeadSlot); } public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context) { - _logger?.LogDebug("Listening for status request from {PeerId}", context.RemotePeer.Address); + _logger?.LogInformation("Listening for status request from {PeerId}", context.RemotePeer.Address); var receivedData = new List(); @@ -79,7 +87,6 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa { await foreach (var readOnlySequence in downChannel.ReadAllAsync()) { - Console.WriteLine("Received data: " + Convert.ToHexString(readOnlySequence.ToArray())); receivedData.Add(readOnlySequence.ToArray()); } @@ -94,7 +101,7 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa } var statusResponse = Status.Deserialize(result); - _logger?.LogInformation("Received status request from {PeerId} with forkDigest={forkDigest}, finalizedRoot={finalizedRoot}, finalizedEpoch={finalizedEpoch}, headRoot={headRoot}, headSlot={headSlot}", context.RemotePeer.Address.Get(), + _logger?.LogDebug("Received status request from {PeerId} with forkDigest={forkDigest}, finalizedRoot={finalizedRoot}, finalizedEpoch={finalizedEpoch}, headRoot={headRoot}, headSlot={headSlot}", context.RemotePeer.Address.Get(), Convert.ToHexString(statusResponse.ForkDigest), Convert.ToHexString(statusResponse.FinalizedRoot), statusResponse.FinalizedEpoch, Convert.ToHexString(statusResponse.HeadRoot), statusResponse.HeadSlot); var forkDigest = BeaconClientUtility.GetForkDigestBytes(syncProtocol.Options); @@ -109,11 +116,12 @@ public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFa await downChannel.WriteAsync(rawData); - _logger?.LogInformation("Sent status response to {PeerId} with data {Data}", context.RemotePeer.Address.Get(), Convert.ToHexString(payload)); + _logger?.LogInformation("Sent status response to {PeerId}", context.RemotePeer.Address.Get()); } catch (Exception ex) { _logger?.LogError(ex, "Error while listening for status request from {PeerId}", context.RemotePeer.Address.Get()); + await downChannel.CloseAsync(); } } } \ No newline at end of file diff --git a/src/Lantern.Beacon/Storage/LiteDbService.cs b/src/Lantern.Beacon/Storage/LiteDbService.cs index b352578..f0b0d02 100644 --- a/src/Lantern.Beacon/Storage/LiteDbService.cs +++ b/src/Lantern.Beacon/Storage/LiteDbService.cs @@ -7,96 +7,104 @@ namespace Lantern.Beacon.Storage; public sealed class LiteDbService(BeaconClientOptions options, ILoggerFactory loggerFactory) : ILiteDbService, IDisposable { private LiteDatabase? _liteDatabase; + private readonly object _lock = new(); private readonly ILogger _logger = loggerFactory.CreateLogger(); public void Init() { - if (_liteDatabase != null) + lock (_lock) { - throw new InvalidOperationException("LiteDbService already initialized"); - } - - var directoryPath = Path.GetDirectoryName(options.DataDirectoryPath); - - if (!Directory.Exists(directoryPath)) - { - Directory.CreateDirectory(directoryPath); + if (_liteDatabase != null) + { + throw new InvalidOperationException("LiteDbService already initialized"); + } + + var directoryPath = Path.GetDirectoryName(options.DataDirectoryPath); + + if (!Directory.Exists(directoryPath)) + { + Directory.CreateDirectory(directoryPath); + } + + _liteDatabase = new LiteDatabase(options.DataDirectoryPath); + _logger.LogInformation("Data directory initialized with path: {Path}", options.DataDirectoryPath); } - - _liteDatabase = new LiteDatabase(options.DataDirectoryPath); - _logger.LogInformation("Data directory initialized with path: {Path}", options.DataDirectoryPath); } - + public void Store(string collectionName, T item) { - if(_liteDatabase == null) + lock (_lock) { - throw new InvalidOperationException("LiteDbService not initialized"); + if (_liteDatabase == null) + { + throw new InvalidOperationException("LiteDbService not initialized"); + } + + var collection = _liteDatabase.GetCollection(collectionName); + collection.Insert(item); } - - var collection = _liteDatabase.GetCollection(collectionName); - collection.Insert(item); } - + public void StoreOrUpdate(string collectionName, T item) where T : class, IEquatable, new() { - if (_liteDatabase == null) + lock (_lock) { - throw new InvalidOperationException("LiteDbService not initialized"); - } + if (_liteDatabase == null) + { + throw new InvalidOperationException("LiteDbService not initialized"); + } - var collection = _liteDatabase.GetCollection(collectionName); - var existingItem = collection.FindAll().FirstOrDefault(); + var collection = _liteDatabase.GetCollection(collectionName); + var existingItem = collection.FindAll().FirstOrDefault(); - if (existingItem != null) - { - collection.DeleteAll(); - } + if (existingItem != null) + { + collection.DeleteAll(); + } - collection.Insert(item); + collection.Insert(item); + } } - + public T? Fetch(string collectionName) { - if (_liteDatabase == null) + lock (_lock) { - throw new InvalidOperationException("LiteDbService not initialized"); - } + if (_liteDatabase == null) + { + throw new InvalidOperationException("LiteDbService not initialized"); + } - var collection = _liteDatabase.GetCollection(collectionName); - return collection.FindAll().FirstOrDefault(); - } - - public T? FetchByPredicate(string collectionName, Expression> predicate) - { - if(_liteDatabase == null) - { - throw new InvalidOperationException("LiteDbService not initialized"); + var collection = _liteDatabase.GetCollection(collectionName); + return collection.FindAll().FirstOrDefault(); } - - var collection = _liteDatabase.GetCollection(collectionName); - return collection.FindOne(predicate); } - - public bool UpdateItem(string collectionName, T item) where T : new() + + public T? FetchByPredicate(string collectionName, Expression> predicate) { - if (_liteDatabase == null) + lock (_lock) { - throw new InvalidOperationException("LiteDbService not initialized"); - } + if (_liteDatabase == null) + { + throw new InvalidOperationException("LiteDbService not initialized"); + } - var collection = _liteDatabase.GetCollection(collectionName); - return collection.Update(item); + var collection = _liteDatabase.GetCollection(collectionName); + return collection.FindOne(predicate); + } } private void Dispose(bool disposing) { - if (disposing) + lock (_lock) { - _liteDatabase?.Dispose(); + if (disposing) + { + _liteDatabase?.Dispose(); + } } } - + public void Dispose() { Dispose(true); diff --git a/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs b/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs index e474666..47c169d 100644 --- a/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs +++ b/test/Lantern.Beacon.Tests/BeaconClientManagerTests.cs @@ -5,7 +5,6 @@ using Lantern.Discv5.Enr; using Lantern.Discv5.Enr.Entries; using Lantern.Discv5.WireProtocol.Identity; -using Lantern.Discv5.WireProtocol.Utility; using Microsoft.Extensions.Logging; using Moq; using Multiformats.Address; diff --git a/test/Lantern.Beacon.Tests/CoreTests.cs b/test/Lantern.Beacon.Tests/CoreTests.cs index c966309..c965374 100644 --- a/test/Lantern.Beacon.Tests/CoreTests.cs +++ b/test/Lantern.Beacon.Tests/CoreTests.cs @@ -33,8 +33,7 @@ public void DbStoreTest() var loggerFactory = new LoggerFactory(); var db = new LiteDbService(options, loggerFactory); var oldUpdate = DenebLightClientFinalityUpdate.CreateDefault(); - - db.Init(); + db.StoreOrUpdate(nameof(DenebLightClientFinalityUpdate), oldUpdate); } @@ -44,8 +43,6 @@ public void DbReadTest() var options = new BeaconClientOptions(); var loggerFactory = new LoggerFactory(); var db = new LiteDbService(options, loggerFactory); - - db.Init(); var store = db.Fetch(nameof(ForkType)); @@ -62,7 +59,6 @@ public void DbUpdateTest() var newUpdate = DenebLightClientFinalityUpdate.Deserialize(Convert.FromHexString( "70010000ac04000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000d3df3b000000000017000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004802000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000048020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), SizePreset.MainnetPreset); - db.Init(); db.StoreOrUpdate(nameof(DenebLightClientFinalityUpdate), newUpdate); }