-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
157 lines (142 loc) · 5.67 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
require('dotenv').config()
const express = require('express')
const app = express()
const sse = require('./src/sse')
const txs = require('./src/tx')
const zmq = require('zeromq')
const mingo = require('mingo')
const { v4: uuid } = require('uuid')
const NodeCache = require('node-cache')
// Register the ./src/sse middleware (the route handlers below call res.sseSetup /
// res.sseSend / res.sseHeartbeat — presumably provided by it), render views with EJS
// and serve ./public as static files.
app.use(sse)
app.set('view engine', 'ejs')
app.use(express.static('public'))

// Open SSE responses, one pool per stream format, keyed by connection fingerprint (uuid).
// Plain objects, not arrays: entries are only ever added, enumerated (Object.keys) and
// deleted by string key, so array semantics (length, index order) never applied.
const connections = {
  raw: { pool: {} },
  parsed: { pool: {} }
}

// Short-lived caches: entries expire after 60 s, expiry is checked every 65 s.
// obj_cache holds parsed transactions and raw_cache holds raw transaction hex; their
// "set" events are what fan data out to SSE clients.
const obj_cache = new NodeCache({ deleteOnExpire: true, stdTTL: 60, checkperiod: 65 })
const raw_cache = new NodeCache({ deleteOnExpire: true, stdTTL: 60, checkperiod: 65 })
// NOTE(review): the two block caches are not written to by the code visible in this file.
const rblock_cache = new NodeCache({ deleteOnExpire: true, stdTTL: 60, checkperiod: 65 })
const oblock_cache = new NodeCache({ deleteOnExpire: true, stdTTL: 60, checkperiod: 65 })

// ZMQ endpoint of the node; falls back to 127.0.0.1:28332 when the env vars are unset or empty.
const host = process.env.node_zmq_host || '127.0.0.1'
const port = process.env.node_zmq_port || '28332'
/**
 * Subscribe to the node's ZMQ `rawtx` topic and feed every transaction into the caches.
 *
 * For each `rawtx` message the payload is hex-encoded and parsed with `txs.parse_tx`;
 * the hex is then stored in `raw_cache` under a fresh uuid and the parsed document in
 * `obj_cache` under `doc.tx.h`.
 *
 * Failure handling:
 *  - a message that cannot be parsed/stored is logged and skipped; the subscription stays up;
 *  - a socket-level failure is logged and a new subscription is scheduled after a short
 *    delay (instead of re-entering immediately, which could spin in a tight loop);
 *  - the socket is always closed on the way out so a retry does not leak the old one.
 *
 * @returns {Promise<void>} settles when the subscription loop ends; never rejects.
 */
let node_listen = async function () {
  const RECONNECT_DELAY_MS = 1000
  let sock
  try {
    sock = new zmq.Subscriber()
    sock.connect('tcp://' + host + ':' + port)
    sock.subscribe('rawtx')
    console.log('[INFO]', 'Connected to ZMQ at ' + host + ':' + port)
    for await (const [topic, message] of sock) {
      // Only `rawtx` is subscribed. NOTE(review): a former `rawblock` branch was dropped
      // here — it was unreachable (no subscription), relied on a `bitcoin` binding that
      // this file never defines, and discarded its result.
      if (topic.toString() !== 'rawtx') continue

      const hex = message.toString('hex')
      try {
        const doc = await txs.parse_tx(hex)
        raw_cache.set(uuid(), hex)
        obj_cache.set(doc.tx.h, doc)
      } catch (err) {
        // One bad transaction must not tear down the whole feed.
        console.error('[ERROR]', 'Failed to process rawtx message', err)
      }
    }
  } catch (err) {
    console.error(err)
    setTimeout(node_listen, RECONNECT_DELAY_MS)
  } finally {
    if (sock && !sock.closed) sock.close()
  }
}
// GET /channel/<base64>: render the channel view pre-filled with the query that is
// base64-encoded in the path segment captured after /channel/.
app.get(/^\/channel\/(.+)/, (req, res) => {
  const code = Buffer.from(req.params[0], 'base64').toString()
  res.render('channel', { bitserve_url: '/stream/', code })
})
// GET /channel: render the channel view with a default match-everything query.
app.get('/channel', (req, res) => {
  const defaultQuery = { v: 3, q: { find: {} } }
  const code = JSON.stringify(defaultQuery, null, 2)
  res.render('channel', { bitserve_url: '/stream/', code })
})
// GET /: the channel page is the landing page.
app.get('/', (req, res) => {
  res.redirect('/channel')
})
// GET /stream/raw: open an SSE stream that receives every value written to raw_cache
// (raw transaction hex). The response is registered in the raw pool under a fresh
// fingerprint and removed again when the request closes.
app.get('/stream/raw', function (req, res) {
  // Total connected users = parsed pool + raw pool, matching the other stream routes
  // (previously the raw pool was added to itself, so the logged number was wrong).
  const userCount = function () {
    return Object.keys(connections.parsed.pool).length + Object.keys(connections.raw.pool).length
  }

  const fingerprint = uuid()
  res.$fingerprint = fingerprint
  res.sseSetup()
  res.sseSend({ type: 'open', data: {} })
  connections.raw.pool[fingerprint] = res
  console.log('[RAW]', '🥳 [SSE_JOIN]', fingerprint, '(now ' + userCount() + ' users)')

  req.on('close', function () {
    delete connections.raw.pool[res.$fingerprint]
    console.log('[RAW]', '🚪 [SSE_LEAVE]', res.$fingerprint, '(now ' + userCount() + ' users)')
  })
})
// GET /stream: open an unfiltered SSE stream of parsed transactions. The response is
// registered in the parsed pool under a fresh fingerprint and removed when the request closes.
app.get('/stream', (req, res) => {
  const countUsers = () => {
    const parsedCount = Object.keys(connections.parsed.pool).length
    const rawCount = Object.keys(connections.raw.pool).length
    return parsedCount + rawCount
  }

  const id = uuid()
  res.$fingerprint = id
  res.sseSetup()
  res.sseSend({ type: 'open', data: {} })
  connections.parsed.pool[id] = res
  console.log('[PARSED]', '🥳 [SSE_JOIN]', id, `(now ${countUsers()} users)`)

  req.on('close', () => {
    delete connections.parsed.pool[res.$fingerprint]
    console.log('[PARSED]', '🚪 [SSE_LEAVE]', res.$fingerprint, `(now ${countUsers()} users)`)
  })
})
// GET /stream/<base64>: open an SSE stream of parsed transactions filtered by a query.
// The path segment after /stream/ is a base64-encoded JSON document whose `q.find` is
// used as the mingo filter when transactions are broadcast.
app.get(/^\/stream\/(.+)/, function (req, res) {
  const decoded = Buffer.from(req.params[0], 'base64').toString()

  // The query is client-supplied. Validate it before registering the connection:
  // an unparseable query would otherwise fail later, for every broadcast transaction.
  try {
    const find = JSON.parse(decoded).q.find
    if (find === null || typeof find !== 'object') {
      throw new TypeError('q.find must be an object')
    }
    new mingo.Query(find)
  } catch (err) {
    res.status(400).json({ error: 'Invalid query: expected base64-encoded JSON with a "q.find" object' })
    return
  }

  const userCount = function () {
    return Object.keys(connections.parsed.pool).length + Object.keys(connections.raw.pool).length
  }

  const fingerprint = uuid()
  res.$query = decoded
  res.$fingerprint = fingerprint
  res.sseSetup()
  res.sseSend({ type: 'open', data: {} })
  connections.parsed.pool[fingerprint] = res
  console.log('[QUERY]', '🥳 [SSE_JOIN]', fingerprint, '(now ' + userCount() + ' users)')

  req.on('close', function () {
    delete connections.parsed.pool[res.$fingerprint]
    console.log('[QUERY]', '🚪 [SSE_LEAVE]', res.$fingerprint, '(now ' + userCount() + ' users)')
  })
})
/**
 * Broadcast one event to every open SSE connection of a pool.
 *
 * Connections in the `parsed` pool that carry a `$query` (JSON text with a `q.find`
 * filter) only receive `data` when the mingo query built from that filter matches it.
 * All other connections receive every event.
 *
 * The compiled filter is cached on the connection as `$filter`, so the query text is
 * parsed once per connection rather than once per event. A connection whose query cannot
 * be parsed or compiled is logged once and then skipped. A failure while delivering to
 * one connection is logged and does not stop delivery to the others.
 *
 * @param {string} format - pool to broadcast to: a key of `connections` ('raw' or 'parsed').
 * @param {string} type - event type placed in the SSE payload.
 * @param {*} data - event body; sent wrapped in a one-element array.
 * @returns {Promise<void>}
 */
let send_to = async function (format, type, data) {
  for (const [fingerprint, conn] of Object.entries(connections[format].pool)) {
    try {
      if (format === 'parsed' && conn.$query) {
        if (conn.$filter === undefined) {
          // Mark as unusable first: if parsing/compiling throws, the marker stays and
          // the connection is skipped on later events without logging again.
          conn.$filter = null
          conn.$filter = new mingo.Query(JSON.parse(conn.$query).q.find)
        }
        if (conn.$filter === null || !conn.$filter.test(data)) continue
      }
      conn.sseSend({ type: type, data: [data] })
    } catch (err) {
      console.error('[ERROR]', 'Failed to deliver "' + type + '" event to', fingerprint, err)
    }
  }
}
// Every parsed transaction written to obj_cache is pushed to the "parsed" SSE pool.
obj_cache.on('set', (_key, doc) => {
  send_to('parsed', 'mempool', doc)
})

// Every raw transaction written to raw_cache is pushed to the "raw" SSE pool.
raw_cache.on('set', (_key, hex) => {
  send_to('raw', 'mempool', hex)
})
// Send a heartbeat every 15 seconds to keep the connections alive.
// Parsed connections are pinged first, then raw ones. A failure on one connection is
// logged and does not prevent the remaining connections from being pinged. The callbacks
// are synchronous: sseHeartbeat is not awaited, so wrapping it in async functions only
// produced floating promises.
setInterval(function () {
  for (const format of ['parsed', 'raw']) {
    for (const [fingerprint, conn] of Object.entries(connections[format].pool)) {
      try {
        conn.sseHeartbeat()
      } catch (err) {
        console.error('[ERROR]', 'Heartbeat failed for', fingerprint, err)
      }
    }
  }
}, 15000)
// Start the HTTP server on the configured port; once it is listening, log a banner and
// begin consuming the ZMQ feed.
app.listen(process.env.app_port, function onListening() {
  const banner = `${process.env.app_name} listening on localhost:${process.env.app_port}`
  console.log('[INFO]', banner)
  node_listen()
})