From 10d44cf9a8df32c5a216b9ca574d429a977d1b88 Mon Sep 17 00:00:00 2001 From: dania02525 Date: Fri, 19 Jan 2018 00:26:55 -0700 Subject: [PATCH] use genserver for order management --- lib/elixir_exchange/application.ex | 2 +- lib/elixir_exchange/caches/order_cache.ex | 50 -------- .../data/market_order_filler.ex | 65 ---------- lib/elixir_exchange/data/order_data.ex | 63 +-------- lib/elixir_exchange/imports/format_helpers.ex | 23 ++++ lib/elixir_exchange/imports/server_helpers.ex | 121 ++++++++++++++++++ .../{caches => servers}/graph_cache.ex | 0 lib/elixir_exchange/servers/order_server.ex | 64 +++++++++ .../servers/order_supervisor.ex | 22 ++++ .../trade_logic/order_filler.ex | 84 ++++++++++++ .../channels/trading_view.ex | 27 +--- 11 files changed, 320 insertions(+), 201 deletions(-) delete mode 100644 lib/elixir_exchange/caches/order_cache.ex delete mode 100644 lib/elixir_exchange/data/market_order_filler.ex create mode 100644 lib/elixir_exchange/imports/format_helpers.ex create mode 100644 lib/elixir_exchange/imports/server_helpers.ex rename lib/elixir_exchange/{caches => servers}/graph_cache.ex (100%) create mode 100644 lib/elixir_exchange/servers/order_server.ex create mode 100644 lib/elixir_exchange/servers/order_supervisor.ex create mode 100644 lib/elixir_exchange/trade_logic/order_filler.ex diff --git a/lib/elixir_exchange/application.ex b/lib/elixir_exchange/application.ex index a837ef6..85218d6 100644 --- a/lib/elixir_exchange/application.ex +++ b/lib/elixir_exchange/application.ex @@ -13,8 +13,8 @@ defmodule ElixirExchange.Application do # Start the endpoint when the application starts supervisor(ElixirExchangeWeb.Endpoint, []), + supervisor(ElixirExchange.OrderSupervisor, []), worker(ElixirExchange.GraphCache, []), - worker(ElixirExchange.OrderCache, []), worker(ElixirExchange.Cron, [[ %{ module: ElixirExchange.GraphData, diff --git a/lib/elixir_exchange/caches/order_cache.ex b/lib/elixir_exchange/caches/order_cache.ex deleted file mode 100644 index a0f69db..0000000 --- a/lib/elixir_exchange/caches/order_cache.ex +++ /dev/null @@ -1,50 +0,0 @@ -defmodule ElixirExchange.OrderCache do - use GenServer - - def start_link do - GenServer.start_link(__MODULE__, :ok) - end - - def init(_) do - :ets.new(:order_cache, [:set, :public, :named_table]) - init_cache() - {:ok, []} - end - - def update_modified_sell_orders(pair, orders) do - ids = Enum.map(orders, fn(o)-> o.id end) - sells = - sell_orders(pair) - |> Enum.reject(fn(o)-> - Enum.member?(ids, o.id) - end) - - modified_sells = - orders - |> Enum.filter(fn(o)-> - o.status == "open" || o.status == "partially_filled" - end) - - :ets.insert(:graph_cache, {{pair, "sell"}, modified_sells ++ sells}) - end - - def buy_orders(pair) do - {_key, buys} = List.first(:ets.lookup(:graph_cache, {pair, "buy"})) - buys - end - - def sell_orders(pair) do - {_key, sells} = List.first(:ets.lookup(:graph_cache, {pair, "sell"})) - sells - end - - def init_cache do - Application.fetch_env!(:elixir_exchange, :pairs) - |> Enum.each(fn(pair)-> - buys = ElixirExchange.OrderData.query_active_buy_orders(pair) - sells = ElixirExchange.OrderData.query_active_sell_orders(pair) - :ets.insert(:graph_cache, {{pair, "buy"}, buys}) - :ets.insert(:graph_cache, {{pair, "sell"}, sells}) - end) - end -end diff --git a/lib/elixir_exchange/data/market_order_filler.ex b/lib/elixir_exchange/data/market_order_filler.ex deleted file mode 100644 index 729a0da..0000000 --- a/lib/elixir_exchange/data/market_order_filler.ex +++ /dev/null @@ -1,65 +0,0 @@ -defmodule ElixirExchange.MarketOrderFiller do - require Logger - - def fill_market_order(order, sell_orders) do - sorted = - Enum.sort_by(sell_orders, fn(o)-> - [o.price, o.created] - end) - - [order, modified_orders] = do_fill_market_order(order, sorted, []) - [order, modified_orders, new_market_price(modified_orders)] - end - - # the market order was completely filled - def do_fill_market_order(%{unfilled_quantity: 0} = order, _, modified_orders) do - [Map.put(order, :status, "filled"), modified_orders] - end - - # the market order could not be completely filled ???? - def do_fill_market_order(order, [], modified_orders) do - if order.quantity != order.unfilled_quantity do - [Map.put(order, :status, "partially_filled"), modified_orders] - else - [order, modified_orders] - end - end - - def do_fill_market_order(order, [sell | sells], modified_orders) do - if sell.unfilled_quantity >= order.unfilled_quantity do - filled_sell = - if sell.unfilled_quantity - order.unfilled_quantity > 0 do - sell - |> Map.put(:unfilled_quantity, sell.unfilled_quantity - order.unfilled_quantity) - |> Map.put(:status, "partially_filled") - else - sell - |> Map.put(:unfilled_quantity, 0) - |> Map.put(:status, "filled") - end - - filled_order = - order - |> Map.put(:unfilled_quantity, 0) - |> Map.put(:status, "filled") - - do_fill_market_order(filled_order, sells, [filled_sell | modified_orders]) - else - filled_sell = - sell - |> Map.put(:unfilled_quantity, 0) - |> Map.put(:status, "filled") - - filled_order = - order - |> Map.put(:unfilled_quantity, order.unfilled_quantity - sell.unfilled_quantity) - |> Map.put(:status, "partially_filled") - - do_fill_market_order(filled_order, sells, [filled_sell | modified_orders]) - end - end - - defp new_market_price(filled_sell_orders) do - Map.get(List.first(filled_sell_orders), :price) - end -end diff --git a/lib/elixir_exchange/data/order_data.ex b/lib/elixir_exchange/data/order_data.ex index a7412d3..49bc73b 100644 --- a/lib/elixir_exchange/data/order_data.ex +++ b/lib/elixir_exchange/data/order_data.ex @@ -3,7 +3,7 @@ defmodule ElixirExchange.OrderData do ElixirExchange.FakeOrderData.market_price() end - def query_active_buy_orders(pair) do + def query_active_orders(pair, "buy") do ElixirExchange.FakeOrderData.fake_orders |> Enum.filter(fn({_k, o})-> o.side == "buy" && (o.status == "open" || o.status == "partially_filled") && o.type == "limit" @@ -11,7 +11,7 @@ defmodule ElixirExchange.OrderData do |> Enum.map(fn {_k, v}-> v end) end - def query_active_sell_orders(pair) do + def query_active_orders(pair, "sell") do ElixirExchange.FakeOrderData.fake_orders |> Enum.filter(fn({_k, o})-> o.side == "sell" && (o.status == "open" || o.status == "partially_filled") && o.type == "limit" @@ -26,63 +26,4 @@ defmodule ElixirExchange.OrderData do end) |> Enum.map(fn {_k, v}-> v end) end - - def cached_buy_orders(pair) do - ElixirExchange.OrderCache.buy_orders(pair) - end - - def cached_sell_orders(pair) do - ElixirExchange.OrderCache.sell_orders(pair) - end - - # order filling logic - def fill_market_order(order) do - sell_orders = cached_sell_orders(order.pair) - [order, modified_orders, new_market_price] = - ElixirExchange.MarketOrderFiller.fill_market_order(order, sell_orders) - - ElixirExchange.OrderCache.update_modified_sell_orders(order.pair, modified_orders) - - # asyncronously balance limit orders - - broadcast_new_order_data(order.pair, new_market_price) - end - - def balance_limit_orders do - # not impl - end - - def broadcast_new_order_data(pair, price) do - data = %{ - order_data: %{ - buys: collapse_orders(cached_buy_orders(pair)), - sells: collapse_orders(cached_sell_orders(pair)) - }, - market_price: price - } - - ElixirExchangeWeb.Endpoint.broadcast("trading:#{pair}", "update_orders", data) - end - - defp collapse_orders(orders) do - Enum.reduce(orders, %{}, fn(o, acc)-> - existing = Map.get(acc, o.price) - if existing do - order = %{ - price: o.price, - quantity: existing.quantity + o.unfilled_quantity - } - Map.put(acc, o.price, order) - else - order = %{ - price: o.price, - quantity: o.unfilled_quantity - } - Map.put(acc, o.price, order) - end - end) - |> Enum.map(fn {_k, v}-> - v - end) - end end diff --git a/lib/elixir_exchange/imports/format_helpers.ex b/lib/elixir_exchange/imports/format_helpers.ex new file mode 100644 index 0000000..7db48c0 --- /dev/null +++ b/lib/elixir_exchange/imports/format_helpers.ex @@ -0,0 +1,23 @@ +defmodule ElixirExchange.FormatHelpers do + def collapse_orders(orders) do + Enum.reduce(orders, %{}, fn(o, acc)-> + existing = Map.get(acc, o.price) + if existing do + order = %{ + price: o.price, + quantity: existing.quantity + o.unfilled_quantity + } + Map.put(acc, o.price, order) + else + order = %{ + price: o.price, + quantity: o.unfilled_quantity + } + Map.put(acc, o.price, order) + end + end) + |> Enum.map(fn {_k, v}-> + v + end) + end +end diff --git a/lib/elixir_exchange/imports/server_helpers.ex b/lib/elixir_exchange/imports/server_helpers.ex new file mode 100644 index 0000000..13a85f8 --- /dev/null +++ b/lib/elixir_exchange/imports/server_helpers.ex @@ -0,0 +1,121 @@ +defmodule ElixirExchange.ServerHelpers do + import ElixirExchange.FormatHelpers + + def process_alias(pair, side) do + String.to_atom("#{side}_#{pair}") + end + + ## + # + # Notifier + # + ## + + + def notify_market(old_state, new_state) do + market_leader = + old_state + |> Enum.reverse + |> Enum.find(fn(o)-> + o.type == "limit" && (o.status == "filled" || o.status == "partially_filled") + end) + + do_notify_market(new_state, market_leader) + end + + def do_notify_market(state, %{side: "sell"} = market_leader) do + payload = %{ + order_data: %{ + sells: collapse_orders(state) + }, + market_price: market_leader.price + } + + pair = market_leader.pair + + ElixirExchangeWeb.Endpoint.broadcast("trading:#{pair}", "update_orders", payload) + end + + def do_notify_market(state, %{side: "buy"} = market_leader) do + payload = %{ + order_data: %{ + buys: collapse_orders(state) + }, + market_price: market_leader.price + } + + pair = market_leader.pair + + ElixirExchangeWeb.Endpoint.broadcast("trading:#{pair}", "update_orders", payload) + end + + def do_notify_market(_state, _any) do + :noop + end + + + ## + # + # Order filling + # + ## + + def maybe_fill_order(order, []) do + push_order(order) + [] + end + + def maybe_fill_order(%{type: "market"} = order, state) do + do_fill_order(order, state) + end + + def maybe_fill_order(%{type: "limit", side: "sell"} = order, state) do + if List.first(state).price > order.price do + do_fill_order(order, state) + else + push_order(order) + state + end + end + + def maybe_fill_order(%{type: "limit", side: "buy"} = order, state) do + if List.first(state).price < order.price do + do_fill_order(order, state) + else + push_order(order) + state + end + end + + def do_fill_order(order, state) do + [order, mixed_state] = ElixirExchange.OrderFiller.fill_order(order, state) + + new_state = + Enum.reject(mixed_state, fn(o)-> + o.status == "filled" && o.unfilled_quantity == 0 + end) + + notify_market(mixed_state, new_state) + push_order(order) + + new_state + end + + ## + # + # Order Caching + # + ## + + def push_order(%{unfilled_quantity: 0, status: "filled"}) do + :noop + end + + def push_order(%{side: "buy", pair: pair} = order) do + GenServer.cast(process_alias(pair, "buy"), {:push_order, order}) + end + + def push_order(%{side: "sell", pair: pair} = order) do + GenServer.cast(process_alias(pair, "sell"), {:push_order, order}) + end +end diff --git a/lib/elixir_exchange/caches/graph_cache.ex b/lib/elixir_exchange/servers/graph_cache.ex similarity index 100% rename from lib/elixir_exchange/caches/graph_cache.ex rename to lib/elixir_exchange/servers/graph_cache.ex diff --git a/lib/elixir_exchange/servers/order_server.ex b/lib/elixir_exchange/servers/order_server.ex new file mode 100644 index 0000000..070f7f0 --- /dev/null +++ b/lib/elixir_exchange/servers/order_server.ex @@ -0,0 +1,64 @@ +defmodule ElixirExchange.OrderServer do + use GenServer + + import ElixirExchange.ServerHelpers + + ## + # + # Public Api + # + ## + + def buy_orders(pair) do + GenServer.call(process_alias(pair, "buy"), :get_orders) + end + + def sell_orders(pair) do + GenServer.call(process_alias(pair, "sell"), :get_orders) + end + + def fill_order(%{side: side, pair: pair} = order) do + GenServer.call(process_alias(pair, side), {:fill_order, order}) + end + + ## + # + # GenServer Callbacks + # + ## + + def start_link(pair, side) do + GenServer.start_link(__MODULE__, %{pair: pair, side: side}, name: process_alias(pair, side)) + end + + def init(%{pair: pair, side: side}) do + state = + Enum.sort_by(ElixirExchange.OrderData.query_active_orders(pair, side), fn(o)-> + [o.price, o.created] + end) + + {:ok, state} + end + + def handle_call(:get_orders, _from, state) do + {:reply, state, state} + end + + def handle_call({:fill_order, order}, _from, state) do + {:noreply, maybe_fill_order(order, state)} + end + + def handle_cast({:push_order, order}, state) do + sorted_orders = + Enum.sort_by([ order | state], fn(o)-> + case o.type do + "market" -> [0, 0, o.created] + "limit" -> [1, o.price, o.created] + end + end) + + notify_market(sorted_orders, sorted_orders) + + {:noreply, sorted_orders} + end +end diff --git a/lib/elixir_exchange/servers/order_supervisor.ex b/lib/elixir_exchange/servers/order_supervisor.ex new file mode 100644 index 0000000..340e6b1 --- /dev/null +++ b/lib/elixir_exchange/servers/order_supervisor.ex @@ -0,0 +1,22 @@ +defmodule ElixirExchange.OrderSupervisor do + use Supervisor + + def start_link do + Supervisor.start_link(__MODULE__, []) + end + + def init([]) do + children = + Enum.map(Application.fetch_env!(:elixir_exchange, :pairs), fn(pair)-> + sell_process = String.to_atom("sell_#{pair}") + buy_process = String.to_atom("buy_#{pair}") + [ + worker(ElixirExchange.OrderServer, [pair, "sell"], [id: sell_process]), + worker(ElixirExchange.OrderServer, [pair, "buy"], [id: buy_process]) + ] + end) + |> List.flatten() + + supervise(children, strategy: :one_for_one) + end +end diff --git a/lib/elixir_exchange/trade_logic/order_filler.ex b/lib/elixir_exchange/trade_logic/order_filler.ex new file mode 100644 index 0000000..5383c80 --- /dev/null +++ b/lib/elixir_exchange/trade_logic/order_filler.ex @@ -0,0 +1,84 @@ +defmodule ElixirExchange.OrderFiller do + + def fill_order(new, orders) do + do_fill_order(new, orders, []) + end + + defp do_fill_order(new, [], new_orders) do + [new, Enum.reverse(new_orders)] + end + + defp do_fill_order(%{unfilled_quantity: 0} = new, [order | orders], new_orders) do + do_fill_order(new, orders, [order | new_orders]) + end + + defp do_fill_order(new, [order | orders], new_orders) do + [new, order] = match_order(new, order) + + do_fill_order(new, orders, [order | new_orders]) + end + + defp match_order(new, order) do + case new.type do + "market" -> do_match_order(new, order) + "limit" -> match_limit_order(new, order) + end + end + + defp match_limit_order(new, %{type: "market"} = order) do + match_order(new, order) + end + + defp match_limit_order(%{side: "sell"} = new, order) do + if new.price <= order.price do + do_match_order(new, order) + else + [new, order] + end + end + + defp match_limit_order(%{side: "buy"} = new, order) do + if new.price >= order.price do + do_match_order(new, order) + else + [new, order] + end + end + + defp do_match_order(new, order) do + conditions = [ + order.unfilled_quantity >= new.unfilled_quantity, + order.unfilled_quantity - new.unfilled_quantity > 0 + ] + + updated_order = + case conditions do + [true, true] -> + order + |> Map.put(:unfilled_quantity, order.unfilled_quantity - new.unfilled_quantity) + |> Map.put(:status, "partially_filled") + [true, false] -> + order + |> Map.put(:unfilled_quantity, 0) + |> Map.put(:status, "filled") + [false, _] -> + order + |> Map.put(:unfilled_quantity, 0) + |> Map.put(:status, "filled") + end + + updated_new = + case conditions do + [true, _] -> + new + |> Map.put(:unfilled_quantity, 0) + |> Map.put(:status, "filled") + [false, _] -> + new + |> Map.put(:unfilled_quantity, new.unfilled_quantity - order.unfilled_quantity) + |> Map.put(:status, "partially_filled") + end + + [updated_new, updated_order] + end +end diff --git a/lib/elixir_exchange_web/channels/trading_view.ex b/lib/elixir_exchange_web/channels/trading_view.ex index 6a44c68..3781881 100644 --- a/lib/elixir_exchange_web/channels/trading_view.ex +++ b/lib/elixir_exchange_web/channels/trading_view.ex @@ -1,6 +1,7 @@ defmodule ElixirExchangeWeb.TradingView do use Phoenix.Channel require Logger + import ElixirExchange.FormatHelpers def join("trading:" <> pair, _message, socket) do if Enum.member?(Application.fetch_env!(:elixir_exchange, :pairs), pair) do @@ -17,8 +18,8 @@ defmodule ElixirExchangeWeb.TradingView do market_price: ElixirExchange.OrderData.market_price(pair), graph_data: ElixirExchange.GraphData.cached_history(pair), order_data: %{ - buys: collapse_orders(ElixirExchange.OrderData.cached_buy_orders(pair)), - sells: collapse_orders(ElixirExchange.OrderData.cached_sell_orders(pair)), + buys: collapse_orders(ElixirExchange.OrderServer.buy_orders(pair)), + sells: collapse_orders(ElixirExchange.OrderServer.sell_orders(pair)), my_orders: my_orders(socket, pair) } } @@ -30,28 +31,6 @@ defmodule ElixirExchangeWeb.TradingView do :ok end - defp collapse_orders(orders) do - Enum.reduce(orders, %{}, fn(o, acc)-> - existing = Map.get(acc, o.price) - if existing do - order = %{ - price: o.price, - quantity: existing.quantity + o.unfilled_quantity - } - Map.put(acc, o.price, order) - else - order = %{ - price: o.price, - quantity: o.unfilled_quantity - } - Map.put(acc, o.price, order) - end - end) - |> Enum.map(fn {_k, v}-> - v - end) - end - defp my_orders(socket, pair) do if socket.assigns[:current_user] do ElixirExchange.OrderData.orders_by_user(pair, socket.assigns[:current_user])