Skip to content
This repository has been archived by the owner on May 3, 2019. It is now read-only.

Commit

Permalink
Merge explorer branch
Browse files Browse the repository at this point in the history
  • Loading branch information
hrobeers committed Dec 26, 2016
2 parents 1734209 + 7e27654 commit d91384c
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 43 deletions.
21 changes: 3 additions & 18 deletions apps/explorer/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,8 @@ use Mix.Config
# if you want to provide default values for your application for
# 3rd-party users, it should be done in your "mix.exs" file.

# You can configure for your application as:
#
# config :explorer, key: :value
#
# And access this configuration in your application as:
#
# Application.get_env(:explorer, :key)
#
# Or configure a 3rd-party app:
#
# config :logger, level: :info
#
config :explorer,
reload_interval: 20*1000,
start_height: -200 # First block to parse (negative means relative to last block at startup)

# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
# Configuration from the imported file will override the ones defined
# here (which is why it is important to import them last).
#
# import_config "#{Mix.env}.exs"
2 changes: 1 addition & 1 deletion apps/explorer/lib/explorer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Mercator.Explorer do
# Define workers and child supervisors to be supervised
children = [
# Starts a worker by calling: Mercator.Explorer.Worker.start_link(arg1, arg2, arg3)
# worker(Mercator.Explorer.Worker, [arg1, arg2, arg3]),
worker(Mercator.Explorer.Repo, [])
]

# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
Expand Down
225 changes: 225 additions & 0 deletions apps/explorer/lib/repo.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
defmodule Mercator.Explorer.Repo do
use GenServer
require Logger

alias Bitcoin.Protocol.Types.Script
alias BitcoinTool.Address
alias Mercator.RPC

defp reload_interval, do: Application.get_env(:explorer, :reload_interval)
defp start_height(block_cnt) do
cnfg = Application.get_env(:explorer, :start_height)
if (cnfg < 0) do
block_cnt + cnfg
else
cnfg
end
end

@tasksup_name String.to_atom(Atom.to_string(__MODULE__) <> ".TaskSup")

## Client API

@doc """
Starts the Explorer repository.
"""
def start_link do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

## Server Callbacks

def init(:ok) do
# init the Repo
init(:retry, true)
end

defp init(:retry, log) do
try do
# Check the connection
block_cnt = :rpc |> Gold.getblockcount!
Logger.info("Explorer.Repo: RPC connection initialized (reload_interval: " <> Integer.to_string(reload_interval) <> ")")
# Init the ETS tables
:ets.new(:pkh_index, [:set, :public, :named_table])
:ets.new(:sh_index, [:set, :public, :named_table])
:ets.new(:op_return, [:set, :public, :named_table])
# Set initial parsing state
store(start_height(block_cnt), :low_cnt, :pkh_index)
store(start_height(block_cnt), :high_cnt, :pkh_index)
# Init the task supervisor
Task.Supervisor.start_link(name: @tasksup_name)
# Initial blockchain parse (TODO: persistent storage)
parse_new_blocks(1)
{:ok, %{connected: true, parsing: false, start_time: DateTime.to_unix(DateTime.utc_now)}}
rescue
_ ->
if log, do: Logger.warn("Explorer.Repo: Failed to establish rpc connection. Will retry every second.")
# Block synchronously until connection is established
:timer.sleep(1000)
init(:retry, false)
end
end

defp parse_new_blocks(timeout) do
Process.send_after(self(), :parse_new, timeout)
end
def handle_info(:parse_new, state) do
new_state =
case :rpc |> Gold.getblockcount do
{:ok, block_cnt} ->
# Log reconnection if wasn't connected
unless Map.get(state, :connected), do: Logger.info("Explorer.Repo: rpc connection re-established")
state = state |> Map.put(:connected, true)

# Parse new blocks when they arrived
high_cnt = retrieve(:high_cnt, :pkh_index)
cond do
high_cnt == block_cnt -> Logger.info("Explorer.Repo: up to date")
state.parsing == false -> parse_blocks!(high_cnt, block_cnt)
true -> nil
end

# Return state
state
{:error, _} ->
# TODO: log error
if Map.get(state, :connected), do: Logger.warn("PeerAssets.Repo: rpc connection lost")
state |> Map.put(:connected, false)# no connection, just ignore until restored
end

parse_new_blocks(reload_interval)
{:noreply, new_state}
end

def handle_info({_ref, %{height: height} }, state) do
# TODO register parsed block height
{:noreply, state}
end

def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
# ignore
{:noreply, state}
end

def handle_cast({:parse_blocks, block, low, high}, state) do
if (high-low > 1000) do
range = Integer.to_string(low) <> " - " <> Integer.to_string(high)
progress = Float.to_string(Float.round((high-block.height)/(high-low)*100, 2)) <> "%"
Logger.info("Explorer.Repo: Parsing blocks in range: " <> range <> " progress: " <> progress)
end
block |> parse_blocks!(low, high)
{:noreply, state |> Map.put(:parsing, true)}
end

def handle_cast({:parsing_done, low, high}, state) do
if (high-low > 1000) do
range = Integer.to_string(low) <> " - " <> Integer.to_string(high)
Logger.info("Explorer.Repo: Parsing blocks in range: " <> range <> " progress: 100%")
end

high |> store(:high_cnt, :pkh_index)
{:noreply, state |> Map.put(:parsing, false)}
end

## Private

defp retrieve(key, table) do
case :ets.lookup(table, key) do
[{_key, result}] -> result
[] -> []
end
end

defp store(value, key, table) do
table
|> :ets.insert({key, value})
end

defp add_to_db({:pkh, pkh}, txn, _block) do
txn_id = txn.id |> Base.decode16!(case: :lower)
[txn_id | retrieve(pkh, :pkh_index)]
|> store(pkh, :pkh_index)
end
defp add_to_db({:sh, sh}, txn, _block) do
txn_id = txn.id |> Base.decode16!(case: :lower)
[txn_id | retrieve(sh, :sh_index)]
|> store(sh, :sh_index)
end
defp add_to_db({:op_return, data}, txn, block) do
txn_id = txn.id |> Base.decode16!(case: :lower)
:op_return |> :ets.insert({txn_id, %{height: block.height, data: data}})
end
defp add_to_db({:coinbase, _script}, _txn, _block), do: nil
defp add_to_db({:empty}, _txn, _block), do: nil
defp add_to_db({:error, reason, inoutput}, txn, _block) do
Logger.error """
Explorer: #{reason}:
txn_id: #{txn.id}
#{inspect(inoutput)}
"""
end

defp parse_blocks!(low, high) do
hash = :rpc |> Gold.getblockhash!(high)
block = :rpc |> Gold.getblock!(hash)
GenServer.cast(__MODULE__, {:parse_blocks, block, low, high})
end

defp parse_blocks!(block, low, high) do
block |> process_block

prev_block = :rpc |> Gold.getblock!(block.previousblockhash)

cond do
# Done when low is reached
prev_block.height == low -> GenServer.cast(__MODULE__, {:parsing_done, low, high})
# Put every 100th block on message queue as backpressure
prev_block.height |> is_multiple_of?(100) -> GenServer.cast(__MODULE__, {:parse_blocks, prev_block, low, high})
# Process next block directly
true -> prev_block |> parse_blocks!(low, high)
end
end

defp process_block(block) do
@tasksup_name
|> Task.Supervisor.async(fn () ->
txns = block.txns
|> Enum.map(fn(txn_id) ->
txn = txn_id |> RPC.gettransaction!

outputs = txn.outputs
|> Enum.map(&(parse_script(&1)))

inputs = txn.inputs
|> Enum.map(&(parse_script(&1)))

%{id: txn_id, outputs: outputs, inputs: inputs}
end)

# Update the database
txns
|> Enum.each(fn(txn) ->
txn.outputs
|> Enum.each(&(add_to_db(&1, txn, block)))
txn.inputs
|> Enum.each(&(add_to_db(&1, txn, block)))
end)

%{height: block.height}
end)
end

defp parse_script(inoutput) do
parsed = inoutput |> Script.parse_address
case parsed do
{:ok, addr} -> {:pkh, Address.raw(addr)}
other -> other
end
end

defp is_multiple_of?(to_test, base) do
div = to_test/base
(Float.ceil(div) == div)
end

end
7 changes: 4 additions & 3 deletions apps/explorer/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ defmodule Mercator.Explorer.Mixfile do
lockfile: "../../mix.lock",
elixir: "~> 1.3",
build_embedded: Mix.env == :prod,
start_permanent: false, # Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps]
end

# Configuration for the OTP application
#
# Type "mix help compile.app" for more information
def application do
[applications: [:logger],
[applications: [:logger, :rpc],
mod: {Mercator.Explorer, []}]
end

Expand All @@ -36,6 +36,7 @@ defmodule Mercator.Explorer.Mixfile do
#
# Type "mix help deps" for more examples and options
defp deps do
[{:exrm, "~> 1.0"}]
[{:rpc, in_umbrella: true},
{:exrm, "~> 1.0"}]
end
end
52 changes: 52 additions & 0 deletions apps/rpc/lib/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Mercator.RPC.Cache do
use GenServer

## Client API

@doc """
Starts the RPC Cache.
"""
def start_link(size) do
GenServer.start_link(__MODULE__, {:ok, %{size: size}}, name: __MODULE__)
end

def call(id, fun) do
case :ets.lookup(:txn_cache, id) do
[{id, result}] -> result
[] ->
result = fun.(id)
GenServer.call(__MODULE__, {:register, id, result})
result
end
end

## Server Callbacks

def init({:ok, %{size: size}}) do
:ets.new(:txn_cache, [:set, :protected, :named_table, read_concurrency: true])
{:ok, %{size: size,
total: 0,
queue: :queue.new}}
end

def handle_call({:register, id, result}, _from, state) do
full = state.total == state.size
:ets.insert(:txn_cache, {id, result})

new_queue = case full do
false -> state.queue
true ->
{{:value, to_remove}, q} = :queue.out(state.queue)
:ets.delete(:txn_cache, to_remove)
q
end

new_state = %{
size: state.size,
total: state.total + (if (full) do 0 else 1 end),
queue: :queue.in(id, new_queue)
}

{:reply, :ok, new_state}
end
end
Loading

0 comments on commit d91384c

Please sign in to comment.