# Introduce PubSub as a communication method
## Objectives
- consider why introducing PubSub communication would be beneficial
- implement the PubSub communication between the `Streamer.Binance` and the `Naive.Trader`(s)
## Design
First, let's look at the current situation:
```{r, fig.align="center", out.width="30%", echo=FALSE}
knitr::include_graphics("images/chapter_03_01_current_situation.png")
```
We started with the Binance streamer calling the `send_event/1` function on the `Naive` module. The `Naive` module then calls the trader process using GenServer's `cast/2` function (via its registered name).
\newpage
The next step in extending our trading strategy will be to scale it to run multiple `Naive.Trader` processes in parallel. To be able to do that, we will need to stop registering the `trader` process with a name (as only one process can be registered under a single name).
```{r, fig.align="center", out.width="30%", echo=FALSE}
knitr::include_graphics("images/chapter_03_02_current_situation_failed.png")
```
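The naming limitation can be reproduced in isolation. The sketch below is not the chapter's trader: `RegisteredTraderSketch` and its event counter are hypothetical stand-ins, and only the `name: :trader` registration and the cast via that name mirror the design described above.

```elixir
defmodule RegisteredTraderSketch do
  # Hypothetical stand-in for a trader; it only counts the events it receives.
  use GenServer

  def start_link(symbol) do
    # Registering under a fixed name is what limits us to a single process.
    GenServer.start_link(__MODULE__, symbol, name: :trader)
  end

  # Mirrors the idea of a facade function casting to the registered name.
  def send_event(event), do: GenServer.cast(:trader, event)

  @impl true
  def init(symbol), do: {:ok, %{symbol: symbol, events: 0}}

  @impl true
  def handle_cast(_event, state) do
    {:noreply, %{state | events: state.events + 1}}
  end
end

{:ok, first} = RegisteredTraderSketch.start_link("XRPUSDT")

# The cast reaches the only process registered as :trader.
:ok = RegisteredTraderSketch.send_event(%{price: "0.29462000"})
%{events: 1} = :sys.get_state(first)

# A second process cannot take the same name, so it never starts.
{:error, {:already_started, ^first}} = RegisteredTraderSketch.start_link("ETHUSDT")
```

The second `start_link/1` call returns an error tuple pointing at the first process, which is why a fixed registered name rules out running several traders side by side.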
The second issue with that design is that the `Streamer` needs to be aware of all processes that are interested in the streamed data and explicitly push that information to them.
To fix those issues we will invert the design and introduce a PubSub mechanism:
```{r, fig.align="center", out.width="25%", echo=FALSE}
knitr::include_graphics("images/chapter_03_03_phoenix_pubsub.png")
```
The streamer will broadcast trade events to a PubSub topic, and any process interested in that data can subscribe to the topic and receive the broadcast messages.
There's no coupling between the `Streamer` and `Naive` apps anymore.
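To make the inversion concrete without any dependencies, here is a minimal sketch in which Elixir's built-in `Registry` (with duplicate keys) stands in for the PubSub. It is not how this chapter implements it: `SketchPubSub`, `SketchBroadcaster`, and the `:trader_1`/`:trader_2` labels are made up for illustration.

```elixir
# Sketch only: Elixir's built-in Registry stands in for Phoenix.PubSub here.
{:ok, _} = Registry.start_link(keys: :duplicate, name: SketchPubSub)

defmodule SketchBroadcaster do
  # Sends `message` to every process registered under `topic`.
  def broadcast(topic, message) do
    Registry.dispatch(SketchPubSub, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

parent = self()

# Two independent subscribers; the broadcaster knows nothing about them.
for id <- [:trader_1, :trader_2] do
  spawn_link(fn ->
    {:ok, _} = Registry.register(SketchPubSub, "TRADE_EVENTS:XRPUSDT", [])
    send(parent, {:subscribed, id})

    receive do
      event -> send(parent, {:received, id, event})
    end
  end)
end

# Wait until both subscribers are registered before broadcasting.
for id <- [:trader_1, :trader_2] do
  receive do
    {:subscribed, ^id} -> :ok
  after
    1_000 -> raise "subscriber #{id} did not register in time"
  end
end

SketchBroadcaster.broadcast("TRADE_EVENTS:XRPUSDT", %{symbol: "XRPUSDT", price: "0.29462000"})

# Both subscribers report the same event back to the parent.
for id <- [:trader_1, :trader_2] do
  receive do
    {:received, ^id, event} -> IO.inspect({id, event})
  after
    1_000 -> raise "subscriber #{id} did not receive the event"
  end
end
```

The broadcaster only knows the topic name. Adding a third subscriber requires no change on the broadcasting side, which is the property the design above relies on.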
\newpage
We can now introduce multiple traders that will subscribe to the topic and they will receive messages from the PubSub:
```{r, fig.align="center", out.width="100%", out.height="35%", echo=FALSE}
knitr::include_graphics("images/chapter_03_04_naive_trader_group.png")
```
Going even further down the line, we can picture that the system could include other processes interested in the streamed data. An example could be a process that saves all streamed information to the database to be used for backtesting later on:
```{r, fig.align="center", out.width="100%", out.height="45%", echo=FALSE}
knitr::include_graphics("images/chapter_03_05_other_services.png")
```
\newpage
## Implementation
We will start by adding the [`Phoenix.PubSub`](https://github.com/phoenixframework/phoenix_pubsub) library to both the `Streamer` and `Naive` apps (as both will be using it: the `Streamer` app as a broadcaster and the `Naive` app as a subscriber).
Scrolling down through its readme on GitHub, we can see that we need to add `:phoenix_pubsub` to the list of dependencies:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/mix.exs & /apps/naive/mix.exs
defp deps do
  [
    ...
    {:phoenix_pubsub, "~> 2.0"},
    ...
  ]
end
```
Remember to place it so that the list keeps its alphabetical order. The second step in the readme says that we need to add PubSub as a child of our app. We need to decide where to put it, and `Streamer` sounds like a good starting point. We will modify the `/apps/streamer/lib/streamer/application.ex` module by appending the PubSub to its list of children:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/application.ex
def start(_type, _args) do
  children = [
    {
      Phoenix.PubSub,
      name: Streamer.PubSub, adapter_name: Phoenix.PubSub.PG2
    }
  ]
  ...
end
```
We pass the `:adapter_name` option to instruct PubSub to use the [`pg`](http://erlang.org/doc/man/pg.html) adapter, which will give us distributed process groups.
We will now modify the streamer to broadcast a message to a PubSub topic instead of using the `Naive` module's function:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
defp process_event(...) do
  ...
  Phoenix.PubSub.broadcast(
    Streamer.PubSub,
    "TRADE_EVENTS:#{trade_event.symbol}",
    trade_event
  )
end
```
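The topic is an ordinary string that is compared exactly, so a subscriber whose topic differs in case or prefix will simply never receive anything. One way to guard against that is to build the topic in a single place. The helper below is hypothetical and not part of the chapter's code: `TopicSketch` and `trade_events/1` are made-up names, and upcasing the symbol is an assumption about how one might normalise it.

```elixir
defmodule TopicSketch do
  # Hypothetical helper, not part of the chapter's code: builds the topic
  # name in one place so broadcaster and subscribers cannot drift apart.
  @prefix "TRADE_EVENTS"

  def trade_events(symbol) when is_binary(symbol) do
    # Assumption for this sketch: symbols are normalised to upper case.
    "#{@prefix}:#{String.upcase(symbol)}"
  end
end

# Both spellings end up on the same topic.
IO.inspect(TopicSketch.trade_events("xrpusdt"))
IO.inspect(TopicSketch.trade_events("XRPUSDT"))
```

With such a helper, both the broadcasting and the subscribing side would call the same function instead of interpolating the string by hand.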
\newpage
Inside the trader, on init, we need to subscribe to the `"TRADE_EVENTS:#{symbol}"` PubSub topic:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/trader.ex
def init(...) do
  ...
  Phoenix.PubSub.subscribe(
    Streamer.PubSub,
    "TRADE_EVENTS:#{symbol}"
  )
  ...
end
```
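The subscribe and broadcast calls can also be tried outside the umbrella project. The script below is a sketch under a few assumptions: it uses `Mix.install/1` (Elixir 1.12 or newer, with network access to fetch the package), `SketchApp.PubSub` is a made-up name, and it relies on the library's default adapter settings rather than the options used in this chapter.

```elixir
# Standalone script sketch; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:phoenix_pubsub, "~> 2.0"}])

# SketchApp.PubSub is a made-up name used only in this example.
{:ok, _sup} =
  Supervisor.start_link(
    [{Phoenix.PubSub, name: SketchApp.PubSub}],
    strategy: :one_for_one
  )

topic = "TRADE_EVENTS:XRPUSDT"

# The calling process becomes a subscriber of the topic.
:ok = Phoenix.PubSub.subscribe(SketchApp.PubSub, topic)

# Every subscriber of the topic gets the message as-is in its mailbox.
:ok =
  Phoenix.PubSub.broadcast(
    SketchApp.PubSub,
    topic,
    %{symbol: "XRPUSDT", price: "0.29462000"}
  )

receive do
  %{symbol: symbol, price: price} -> IO.puts("got #{symbol} @ #{price}")
after
  1_000 -> IO.puts("no message received")
end
```

Here a single process plays both roles. In the application, the broadcast happens in the streamer and the subscription in each trader.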
Next, we need to convert all `handle_cast` callbacks to `handle_info` inside our `Trader` module, as PubSub delivers broadcasts to subscribers as plain messages rather than through `GenServer.cast/2`.
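The shape of that conversion can be sketched with a stripped-down GenServer. `HandleInfoSketch` and its `last_price` state are hypothetical and much simpler than the real trader. A plain `send/2` stands in for the PubSub delivering a broadcast.

```elixir
defmodule HandleInfoSketch do
  # Hypothetical trader stand-in that only remembers the last price seen.
  use GenServer

  def start_link(symbol), do: GenServer.start_link(__MODULE__, symbol)

  @impl true
  def init(symbol), do: {:ok, %{symbol: symbol, last_price: nil}}

  # Before: def handle_cast(%{price: price}, state), triggered by GenServer.cast/2.
  # After: the same clause as handle_info/2, triggered by any plain message.
  @impl true
  def handle_info(%{price: price}, state) do
    {:noreply, %{state | last_price: price}}
  end
end

{:ok, pid} = HandleInfoSketch.start_link("XRPUSDT")

# A plain send/2 stands in for the PubSub delivering a broadcast.
send(pid, %{price: "0.29462000"})

# :sys.get_state/1 is handled after the message above, so the state is updated.
IO.inspect(:sys.get_state(pid))
```

Only the callback name changes. The pattern in the function head and the returned `{:noreply, state}` tuple stay the same.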
The final change will be to remove the `send_event` function from the `Naive`
module as it's no longer required.
Our update is now finished so we can start an iex session to see how it works.
First, we will start a streamer process that will broadcast messages to PubSub. Next, we will start trading on the same symbol. On init, the trader will subscribe to the PubSub topic and then make a full trade cycle.
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.483.0>}
iex(2)> Naive.Trader.start_link(%{symbol: "XRPUSDT", profit_interval: "-0.01"})
23:46:37.482 [info] Initializing new trader for XRPUSDT
{:ok, #PID<0.474.0>}
23:46:55.179 [info] Placing BUY order for XRPUSDT @ 0.29462000, quantity: 100
23:46:55.783 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.29225),
quantity: 100.00000000
23:46:56.029 [info] Trade finished, trader will now exit
```
This shows that the new trader process successfully subscribed to the PubSub, received the broadcast messages, placed buy/sell orders, and terminated after the full trade cycle finished.
[Note] Please remember to run `mix format` to keep things nice and tidy.
The source code for this chapter can be found on [GitHub](https://github.com/Cinderella-Man/hands-on-elixir-and-otp-cryptocurrency-trading-bot-source-code/tree/chapter_03).