Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Feb 10, 2025
1 parent 6eb8f1f commit e051ffe
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 197 deletions.
109 changes: 30 additions & 79 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph,
log.Debugf("Populating in-memory channel graph, this might " +
"take a while...")

err := g.ForEachNodeCacheable(
func(tx kvdb.RTx, node GraphCacheNode) error {
g.graphCache.AddNodeFeatures(node)
err := g.ForEachNodeCacheable(func(node route.Vertex,
vector *lnwire.FeatureVector) error {

return nil
},
)
g.graphCache.AddNodeFeatures(node, vector)

return nil
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -758,8 +758,8 @@ func (c *ChannelGraph) ForEachNode(
// graph, executing the passed callback with each node encountered. If the
// callback returns an error, then the transaction is aborted and the iteration
// stops early.
func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
GraphCacheNode) error) error {
func (c *ChannelGraph) ForEachNodeCacheable(cb func(route.Vertex,
*lnwire.FeatureVector) error) error {

traversal := func(tx kvdb.RTx) error {
// First grab the nodes bucket which stores the mapping from
Expand All @@ -778,7 +778,7 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
}

nodeReader := bytes.NewReader(nodeBytes)
cacheableNode, err := deserializeLightningNodeCacheable(
node, features, err := deserializeLightningNodeCacheable( //nolint:lll

Check failure on line 781 in graph/db/graph.go

View workflow job for this annotation

GitHub Actions / lint code

the line is 94 characters long, which exceeds the maximum of 80 characters. (ll)
nodeReader,
)
if err != nil {
Expand All @@ -787,7 +787,7 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,

// Execute the callback, the transaction will abort if
// this returns an error.
return cb(tx, cacheableNode)
return cb(node, features)
})
}

Expand Down Expand Up @@ -887,13 +887,9 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
r := &batch.Request{
Update: func(tx kvdb.RwTx) error {
if c.graphCache != nil {
cNode := newGraphCacheNode(
c.graphCache.AddNodeFeatures(
node.PubKeyBytes, node.Features,
)
err := c.graphCache.AddNode(tx, cNode)
if err != nil {
return err
}
}

return addLightningNode(tx, node)
Expand Down Expand Up @@ -3045,50 +3041,6 @@ func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx,
return node, nil
}

// graphCacheNode is a struct that wraps a LightningNode in a way that it can be
// cached in the graph cache.
type graphCacheNode struct {
pubKeyBytes route.Vertex
features *lnwire.FeatureVector
}

// newGraphCacheNode returns a new cache optimized node.
func newGraphCacheNode(pubKey route.Vertex,
features *lnwire.FeatureVector) *graphCacheNode {

return &graphCacheNode{
pubKeyBytes: pubKey,
features: features,
}
}

// PubKey returns the node's public identity key.
func (n *graphCacheNode) PubKey() route.Vertex {
return n.pubKeyBytes
}

// Features returns the node's features.
func (n *graphCacheNode) Features() *lnwire.FeatureVector {
return n.features
}

// ForEachChannel iterates through all channels of this node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
func (n *graphCacheNode) ForEachChannel(tx kvdb.RTx,
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {

return nodeTraversal(tx, n.pubKeyBytes[:], nil, cb)
}

var _ GraphCacheNode = (*graphCacheNode)(nil)

// HasLightningNode determines if the graph has a vertex identified by the
// target node identity public key. If the node exists in the database, a
// timestamp of when the data for the node was lasted updated is returned along
Expand Down Expand Up @@ -4048,60 +4000,59 @@ func fetchLightningNode(nodeBucket kvdb.RBucket,
return deserializeLightningNode(nodeReader)
}

func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) {
// Always populate a feature vector, even if we don't have a node
// announcement and short circuit below.
node := newGraphCacheNode(
route.Vertex{},
lnwire.EmptyFeatureVector(),
)
func deserializeLightningNodeCacheable(r io.Reader) (route.Vertex,
*lnwire.FeatureVector, error) {

var nodeScratch [8]byte
var (
pubKey route.Vertex
features = lnwire.EmptyFeatureVector()
nodeScratch [8]byte
)

// Skip ahead:
// - LastUpdate (8 bytes)
if _, err := r.Read(nodeScratch[:]); err != nil {
return nil, err
return pubKey, nil, err
}

if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil {
return nil, err
if _, err := io.ReadFull(r, pubKey[:]); err != nil {
return pubKey, nil, err
}

// Read the node announcement flag.
if _, err := r.Read(nodeScratch[:2]); err != nil {
return nil, err
return pubKey, nil, err
}
hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])

// The rest of the data is optional, and will only be there if we got a
// node announcement for this node.
if hasNodeAnn == 0 {
return node, nil
return pubKey, features, nil
}

// We did get a node announcement for this node, so we'll have the rest
// of the data available.
var rgb uint8
if err := binary.Read(r, byteOrder, &rgb); err != nil {
return nil, err
return pubKey, nil, err
}
if err := binary.Read(r, byteOrder, &rgb); err != nil {
return nil, err
return pubKey, nil, err
}
if err := binary.Read(r, byteOrder, &rgb); err != nil {
return nil, err
return pubKey, nil, err
}

if _, err := wire.ReadVarString(r, 0); err != nil {
return nil, err
return pubKey, nil, err
}

if err := node.features.Decode(r); err != nil {
return nil, err
if err := features.Decode(r); err != nil {
return pubKey, nil, err
}

return node, nil
return pubKey, features, nil
}

func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
Expand Down
48 changes: 5 additions & 43 deletions graph/db/graph_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,10 @@ import (

"github.com/btcsuite/btcd/btcutil"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)

// GraphCacheNode is an interface for all the information the cache needs to know
// about a lightning node.
type GraphCacheNode interface {
// PubKey is the node's public identity key.
PubKey() route.Vertex

// Features returns the node's p2p features.
Features() *lnwire.FeatureVector

// ForEachChannel iterates through all channels of a given node,
// executing the passed callback with an edge info structure and the
// policies of each end of the channel. The first edge policy is the
// outgoing edge *to* the connecting node, while the second is the
// incoming edge *from* the connecting node. If the callback returns an
// error, then the iteration is halted with the error propagated back up
// to the caller.
ForEachChannel(kvdb.RTx,
func(kvdb.RTx, *models.ChannelEdgeInfo,
*models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error
}

// DirectedChannel is a type that stores the channel information as seen from
// one side of the channel.
type DirectedChannel struct {
Expand Down Expand Up @@ -124,35 +101,20 @@ func (c *GraphCache) Stats() string {
}

// AddNodeFeatures adds a graph node and its features to the cache.
func (c *GraphCache) AddNodeFeatures(node GraphCacheNode) {
nodePubKey := node.PubKey()
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
features *lnwire.FeatureVector) {

nodePubKey := node

// Only hold the lock for a short time. The `ForEachChannel()` below is
// possibly slow as it has to go to the backend, so we can unlock
// between the calls. And the AddChannel() method will acquire its own
// lock anyway.
c.mtx.Lock()
c.nodeFeatures[nodePubKey] = node.Features()
c.nodeFeatures[nodePubKey] = features
c.mtx.Unlock()
}

// AddNode adds a graph node, including all the (directed) channels of that
// node.
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
c.AddNodeFeatures(node)

return node.ForEachChannel(
tx, func(tx kvdb.RTx, info *models.ChannelEdgeInfo,
outPolicy *models.ChannelEdgePolicy,
inPolicy *models.ChannelEdgePolicy) error {

c.AddChannel(info, outPolicy, inPolicy)

return nil
},
)
}

// AddChannel adds a non-directed channel, meaning that the order of policy 1
// and policy 2 does not matter, the directionality is extracted from the info
// and policy flags automatically. The policy will be set as the outgoing policy
Expand Down
40 changes: 8 additions & 32 deletions graph/db/graph_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -41,23 +40,6 @@ func (n *node) Features() *lnwire.FeatureVector {
return n.features
}

func (n *node) ForEachChannel(tx kvdb.RTx,
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {

for idx := range n.edgeInfos {
err := cb(
tx, n.edgeInfos[idx], n.outPolicies[idx],
n.inPolicies[idx],
)
if err != nil {
return err
}
}

return nil
}

// TestGraphCacheAddNode tests that a channel going from node A to node B can be
// cached correctly, independent of the direction we add the channel as.
func TestGraphCacheAddNode(t *testing.T) {
Expand Down Expand Up @@ -85,21 +67,15 @@ func TestGraphCacheAddNode(t *testing.T) {
ChannelFlags: lnwire.ChanUpdateChanFlags(channelFlagB),
ToNode: nodeA,
}
node := &node{
pubKey: nodeA,
features: lnwire.EmptyFeatureVector(),
edgeInfos: []*models.ChannelEdgeInfo{{
ChannelID: 1000,
// Those are direction independent!
NodeKey1Bytes: pubKey1,
NodeKey2Bytes: pubKey2,
Capacity: 500,
}},
outPolicies: []*models.ChannelEdgePolicy{outPolicy1},
inPolicies: []*models.ChannelEdgePolicy{inPolicy1},
}
cache := NewGraphCache(10)
require.NoError(t, cache.AddNode(nil, node))
cache.AddNodeFeatures(nodeA, lnwire.EmptyFeatureVector())
cache.AddChannel(&models.ChannelEdgeInfo{
ChannelID: 1000,
// Those are direction independent!
NodeKey1Bytes: pubKey1,
NodeKey2Bytes: pubKey2,
Capacity: 500,
}, outPolicy1, inPolicy1)

var fromChannels, toChannels []*DirectedChannel
_ = cache.ForEachChannel(nodeA, func(c *DirectedChannel) error {
Expand Down
Loading

0 comments on commit e051ffe

Please sign in to comment.