#!/usr/bin/env python3
from inotify import constants
from inotify.adapters import Inotify
+ import os
from pyln.client import Plugin
+ import pika
from sqlalchemy import create_engine
from sqlalchemy import desc
from sqlalchemy.orm import sessionmaker
@@ -139,28 +141,117 @@ def tail(self):
            continue


+ def encode_varint(value):
+     """Encode a varint value"""
+     result = bytearray()
+     while value >= 128:
+         result.append((value & 0x7F) | 0x80)
+         value >>= 7
+     result.append(value)
+     return bytes(result)
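+     # e.g. encode_varint(300) == b'\xac\x02': low 7 bits first, with the
+     # continuation bit 0x80 set on every byte except the last.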
+
+
+ def field_prefix(index: int, wire_type: int) -> bytes:
+     """The T part of the TLV for protobuf encoded fields.
+     Bits 0-2 are the wire type, the higher bits the field index; the whole tag is varint-encoded.
+     0 VARINT int32, int64, uint32, uint64, sint32, sint64, bool, enum
+     1 I64    fixed64, sfixed64, double
+     2 LEN    string, bytes, embedded messages, packed repeated fields
+     3 SGROUP group start (deprecated)
+     4 EGROUP group end (deprecated)
+     5 I32    fixed32, sfixed32, float"""
+     return encode_varint(index << 3 | wire_type)
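+     # e.g. field_prefix(1, 2) == b'\x0a': field 1, wire type LEN (1 << 3 | 2 == 0x0a)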
+
+
+ def length_delimited(data: bytes) -> bytes:
+     """The LV part of the TLV for protobuf encoded fields."""
+     if not data:
+         return b'\x00'
+     return encode_varint(len(data)) + data
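+     # e.g. length_delimited(b'hi') == b'\x02hi'; empty or None data encodes as a zero length byte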
+
+
+ def serialize(msg: bytes, node_id: str, network: str) -> bytes:
+     # from GL proto/internal.proto:
+     # message GossipMessage {
+     #   // The raw message as seen on the wire.
+     #   bytes raw = 1;
+     #
+     #   // For private messages such as local addition of a channel we
+     #   // want to restrict to the node that originated the message.
+     #   bytes node_id = 2;
+     #
+     #   // Which network was the client configured to follow?
+     #   Network network = 3;
+     #
+     #   // Which peer of the node sent this message?
+     #   bytes peer_id = 4;
+     # }
+     network_encoding = {"bitcoin": 0, "testnet": 1, "regtest": 2, "signet": 3}
+     if network in network_encoding:
+         active_network = network_encoding[network]
+     else:
+         active_network = 2
+     output = bytearray()
+     output.extend(field_prefix(1, 2))  # raw message tag
+     output.extend(length_delimited(msg))  # raw msg field
+     output.extend(field_prefix(2, 2))  # node_id tag
+     output.extend(length_delimited(None))  # leave this empty - all public.
+     output.extend(field_prefix(3, 0))  # network tag (enum, wire type VARINT)
+     output.extend(encode_varint(active_network))  # network field, varint-encoded
+     output.extend(field_prefix(4, 2))  # peer_id tag
+     if node_id:
+         # Add our node_id if we have it (so we know who to blame.)
+         output.extend(length_delimited(node_id.encode("utf-8")))
+     else:
+         output.extend(length_delimited(None))  # our node id not available
+
+     return bytes(output)
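+     # e.g. serialize(b'\x01\x00', None, "regtest") ==
+     #     b'\x0a\x02\x01\x00\x12\x00\x18\x02\x22\x00'
+     # (raw bytes, empty node_id, network enum 2, empty peer_id)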
+
+
class Flusher(Thread):
    def __init__(self, engine):
        Thread.__init__(self)
        self.engine = engine
        self.session_maker = sessionmaker(bind=engine)
        self.session = None
+         self.RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
+         self.connection = None
+         my_info = plugin.rpc.getinfo()
+         if "id" in my_info:
+             self.node_id = my_info["id"]
+         else:
+             self.node_id = None
+         if "network" in my_info:
+             self.network = my_info["network"]
+         else:
+             self.network = None
+
+     def rabbitmq_connect(self):
+         params = pika.URLParameters(self.RABBITMQ_URL)
+         self.connection = pika.BlockingConnection(params)  # default, localhost
+         self.channel = self.connection.channel()
+         plugin.log(f"message queue connected to {params.host}:{params.port}")
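+         # RABBITMQ_URL is a standard AMQP URL, e.g. "amqp://guest:guest@localhost:5672/%2F".
+         # The 'router.gossip' fanout exchange is assumed to exist already; if not, it
+         # could be declared here via self.channel.exchange_declare(
+         #     exchange='router.gossip', exchange_type='fanout').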

    def run(self):
        logging.info("Starting flusher")
        ft = FileTailer('gossip_store')
        last_flush = time.time()
+         total = 0

        self.session = self.session_maker()
        for i, e in enumerate(ft.tail()):
            self.store(e)
+             self.publish(e)

            if last_flush < time.time() - 10:
                self.session.commit()
                self.session = self.session_maker()
                last_flush = time.time()

-         logging.warn("Filetailer exited...")
+         plugin.log("Filetailer exited...", level="warn")
+         if self.connection:
+             self.connection.close()
+             plugin.log("Rabbitmq connection closed.", level="warn")

    def store(self, raw: bytes) -> None:
        try:
@@ -180,7 +271,39 @@ def store(self, raw: bytes) -> None:

                self.session.merge(cls.from_gossip(msg, raw))
        except Exception as e:
-             logging.warn(f"Exception parsing gossip message: {e}")
+             logging.warning(f"Exception parsing gossip message: {e}")
+
+     def publish(self, raw: bytes) -> None:
+         """Serialize and publish a gossip message to a rabbitmq exchange."""
+         if not self.RABBITMQ_URL:
+             return
+
+         try:
+             msg = gossipd.parse(raw)
+             if msg is None:
+                 return
+         except Exception as e:
+             logging.warning(f"Could not parse gossip message: {e}")
+             return
+
+         if not self.connection or not self.connection.is_open:
+             try:
+                 plugin.log("connecting to message queue")
+                 self.rabbitmq_connect()
+             except Exception as e:
+                 raise Exception(f"rabbitmq connection failed: {e}")
+
+         for msg_type in [gossipd.ChannelUpdate,
+                          gossipd.ChannelAnnouncement,
+                          gossipd.NodeAnnouncement]:
+             if isinstance(msg, msg_type):
+                 self.channel.basic_publish(exchange='router.gossip',
+                                            # unused by fanout exchange
+                                            routing_key='',
+                                            body=serialize(raw, self.node_id,
+                                                           self.network))
+                 return
+


@plugin.init()