-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy path19-idiomatic-elixir.Rmd
467 lines (343 loc) · 17.9 KB
/
19-idiomatic-elixir.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# Idiomatic OTP
## Objectives
- the concept
- initial implementation
- idiomatic solution
## The concept
In this chapter, we will look into building an OHLC(open-high-low-close) indicator. We will discuss different potential implementations, which will give us an excellent example to understand the idiomatic usage of the OTP framework.
The OHLC indicator consists of four prices that could be used to draw a candle on the chart:
```{r, fig.align="center", out.width="100%", echo=FALSE}
knitr::include_graphics("images/chapter_19_01_ohlc_candle.jpg")
```
The OHLC indicators get created by collecting the first price (the "open" price), lowest price(the "low" price), highest price(the "high" price) and the last price(the "close" price) within a specific timeframe like 1 minute. Our code will need to be able to generate those for multiple timeframes at once - 1m, 5m, 15m, 1h, 4h and 24h.
## Initial implementation
What we could do is to have multiple GenServer processes subscribed to the trade events topic in the PubSub where each one would update their own OHLC numbers:
```{r, fig.align="center", out.width="100%", echo=FALSE}
knitr::include_graphics("images/chapter_19_02_multi_workers.jpg")
```
Let's start by creating a new application inside our umbrella called "indicators"(run below inside terminal):
```{r, engine = 'bash', eval = FALSE}
$ cd apps
$ mix new indicator --sup
```
We can now create a new directory called "ohlc" inside the "/apps/indicator/lib/indicator" directory, where we will create the GenServer "worker.ex" file:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc/worker.ex
defmodule Indicator.Ohlc.Worker do
use GenServer
def start_link({symbol, duration}) do
GenServer.start_link(__MODULE__, {symbol, duration})
end
def init({symbol, duration}) do
{:ok, {symbol, duration}}
end
end
```
\newpage
As each worker will need to subscribe to the PubSub's `"TRADE_EVENTS:#{symbol}"` topic, we can update the `init/1` function to do that:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc/worker.ex
# add those at the top of the worker module
require Logger
@logger Application.compile_env(:core, :logger)
@pubsub_client Application.compile_env(:core, :pubsub_client)
...
# updated `init/1` function
def init({symbol, duration}) do
symbol = String.upcase(symbol)
@logger.info("Initializing a new OHLC worker(#{duration} minutes) for #{symbol}")
@pubsub_client.subscribe(
Core.PubSub,
"TRADE_EVENTS:#{symbol}"
)
{:ok, {symbol, duration}}
end
```
Following the pattern established by the `Naive.Trader`, we use the module's attributes(with values based on the configuration) instead of hardcoded module names.
Additionally, we've used the `Core.PubSub` and `Phoenix.PubSub`(indirectly) modules so we need to add them to the dependencies list of the `indicator` application:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/mix.exs
defp deps do
[
{:core, in_umbrella: true}, # <= added
{:phoenix_pubsub, "~> 2.0"} # <= added
...
```
As we subscribed to the PubSub, we need to provide a callback that will handle the incoming trade events:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc/worker.ex
# add this at the top
alias Core.Struct.TradeEvent
def handle_info(%TradeEvent{} = trade_event, ohlc) do
{:noreply, Indicator.Ohlc.process(ohlc, trade_event)}
end
```
To avoid mixing our business logic with the GenServer boilerplate(as discussed in the last chapter), we will place it in a new module. First, we need to create a new file `/apps/indicator/lib/indicator/ohlc.ex` and the `Indicator.Ohlc` module inside it:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
defmodule Indicator.Ohlc do
end
```
The `Indicator.Ohlc` module will define a struct that the `Indicator.Ohlc.Worker` will use as a state:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
@enforce_keys [
:symbol,
:start_time,
:duration
]
defstruct [
:symbol,
:start_time,
:duration,
:open,
:high,
:low,
:close
]
```
Most of the above are self-descriptive, besides the `start_time`(a Unix timestamp) and the `duration`(the timeframe in minutes).
We can now progress to the implementation of the `process/2` function, but instead of focusing on it, we will start from the "deepest"/"bottom" function that it will rely on and work our way up. As we discussed in the last chapter, we will try to maximize the amount of pure code.
We can imagine that the bottom function will be involved in merging of the trade event's data with the current OHLC data. Two things can happen, either:
- trade event's `trade_time` is **within** the current OHLC timeframe. Trade event's price will get merged to the current OHLC
- trade event's `trade_time` is **outside** the current OHLC timeframe. The current(now old) OHLC will be returned together with a new OHLC based on the trade event's price
\newpage
Here's the implementation:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
def merge_price(%__MODULE__{} = ohlc, price, trade_time) do
if within_current_timeframe(ohlc.start_time, ohlc.duration, trade_time) do
{nil, %{ohlc | low: min(ohlc.low, price), high: max(ohlc.high, price), close: price}}
else
{ohlc, generate_ohlc(ohlc.symbol, ohlc.duration, price, trade_time)}
end
end
```
together with the `within_current_timeframe/3` and `generate_ohlc/3` function:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
def within_current_timeframe(start_time, duration, trade_time) do
end_time = start_time + duration * 60
trade_time = div(trade_time, 1000)
start_time <= trade_time && trade_time < end_time
end
def generate_ohlc(symbol, duration, price, trade_time) do
start_time = div(div(div(trade_time, 1000), 60), duration) * duration * 60
%__MODULE__{
symbol: symbol,
start_time: start_time,
duration: duration,
open: price,
high: price,
low: price,
close: price
}
end
```
Moving up the call chain, besides merging the trade event's data into OHLC data, we need to deal with the initial state of the worker(the `{symbol, duration}` tuple). Let's add a `process/2` function with two clauses handling each of those cases:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
# add the below line at the top of the module
alias Core.Struct.TradeEvent
def process(%__MODULE__{} = ohlc, %TradeEvent{} = trade_event) do
{old_ohlc, new_ohlc} = merge_price(ohlc, trade_event.price, trade_event.trade_time)
maybe_broadcast(old_ohlc)
new_ohlc
end
def process({symbol, duration}, %TradeEvent{} = trade_event) do
generate_ohlc(symbol, duration, trade_event.price, trade_event.trade_time)
end
```
The second clause takes care of the initial state of the OHLC worker(happens upon receiving the first trade event - once in the lifetime of the worker process).
The first clause handles all the other trade events.
This order of clauses could appear weird, but it makes a lot of sense as Elixir will try pattern match clauses from the top, and in our case, the first(top) clause will be used for almost all of the calls.
### Maybe functions
Let's get back to the first clause of the `process/2` function as it uses another Elixir/Erlang pattern that we didn't discuss before - the `maybe_do_x` functions.
In case of the incoming trade event's `trade_time` **outside** of the current OHLC's timeframe, we would like to **broadcast** the current OHLC and return a new OHLC(based on the trade event's data). Otherwise(trade event's `trade_time` **within** the current OHLC), the trade event's `price` is merged into the current OHLC, and it's **not** broadcasted.
This sounds very similar to the if-else, but it's most of the time achieved using the pattern matching:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
# add below lines at the top of the module
require Logger
@pubsub_client Application.compile_env(:core, :pubsub_client)
defp maybe_broadcast(nil), do: :ok
defp maybe_broadcast(%__MODULE__{} = ohlc) do
Logger.debug("Broadcasting OHLC: #{inspect(ohlc)}")
@pubsub_client.broadcast(
Core.PubSub,
"OHLC:#{ohlc.symbol}",
ohlc
)
end
```
By using a separate function, we avoided branching using if-else inside the `process/2` function. The idea is not necessarily to avoid if statements but to keep the code at the consistent level of abstraction, so it's easier to understand. Inside the `process/2` function, we can understand what's going on just by reading the function names inside - there's no "logic".
Sometimes people advise that "code should be like well written prose" and `maybe_do_x` functions are one of the ways to achieve this "nirvana" state.
### Testing
At this moment, our code should compile, and we should already be able to test it. First, let's change the logging level to `debug` inside `config/config.exs` to see what OHLC structs got broadcasted:
```{r, engine = 'elixir', eval = FALSE}
# /config/config.exs
config :logger,
level: :debug # <= updated
```
We can now progress with testing:
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
iex(1)> Streamer.start_streaming("XRPUSDT")
...
iex(2)> Indicator.Ohlc.Worker.start_link({"XRPUSDT", 1})
{:ok, #PID<0.447.0>}
...
22:45:00.335 [debug] Broadcasting OHLC: %Indicator.Ohlc{close: "0.63880000", duration: 1,
high: "0.63890000", low: "0.63840000", open: "0.63860000", start_time: 1644014640,
symbol: "XRPUSDT"}
```
The above test confirms that we can manually start a single OHLC worker that will aggregate data over a single minute timeframe.
### Supervision
To supervise multiple `Indicators.Worker`s we will use the `DynamicSupervisor`.
We need to update the Indicators application's supervision tree to start a dynamic supervisor:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/application.ex
children = [
{DynamicSupervisor, strategy: :one_for_one, name: Indicator.DynamicSupervisor}
]
```
\newpage
We can now add `aggregate_ohlcs/1` function that will start all workers:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator.ex
def aggregate_ohlcs(symbol) do
[1, 5, 15, 60, 4 * 60, 24 * 60]
|> Enum.each(
&DynamicSupervisor.start_child(
Indicator.DynamicSupervisor,
{Indicator.Ohlc.Worker, {symbol, &1}}
)
)
end
```
We can now start multiple OHLC workers just by running a single function:
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
iex(1)> Streamer.start_streaming("XRPUSDT")
...
iex(2)> Indicator.aggregate_ohlcs("XRPUSDT")
{:ok, #PID<0.447.0>}
```
The above calls will start six OHLC worker processes supervised under the `Indicator.DynamicSupervisor` process.
We could continue with this exercise, add a Registry to be able to stop the indicators and a database to be able to autostart them. Those improvements would get the "indicators" application in line with the other applications, but that's not the goal of this chapter.
## Idiomatic solution
What we will focus on is the **usage of processes**(in our case the GenServers) in our solution. We've split our logic between multiple processes, each aggregating a single timeframe. All of those processes work in the same way. They subscribe to the PubSub topic, merge the incoming data into OHLC structs, and potentially broadcast them. The only difference is the timeframe that they use for merging data.
The solution feels very clean, but also we are using multiple processes to aggregate data for each symbol. In a situation like this, we should always ask ourselves:
- do we need that many processes?
- are we using them to achieve parallelism? Is it required?
- could we reorganize our code to use fewer processes?
The main idea is to keep our code as simple as possible and only use processes when it's absolutely required.
\newpage
Instead of a single worker being responsible for aggregation within a single timeframe, we could restructure our code to make it responsible for all timeframes of the symbol.
This improvement would:
- make our merging code a little bit complex(it would merge multiple timeframes at once)
- decrease the amount of PubSub subscribers by six (instead of six processes per symbol, we would have just one)
- potentially decrease the amount of broadcasted messages (we could group OHLC structs into a single message)
- allow a single worker single worker to decide what to do with all current OHLCs in case of errors, **without** need for any additional symbol supervision level(like `one_for_all`)
- make it **impossible** to aggregate nor broadcast OHLCS of the same symbol in **parallel**
We can see that the solution based on processes could get really complex really quickly(multiple processes per symbol, growing supervision tree, multiple subscriptions, multiple messages etc).
Just to be clear - processes **do** have their place, they are a really powerful tool, but as with every tool, the responsibility lies on us developers to use them wisely - they **must not** be used for code organization.
Enough theory - let's look into how we could update our `Indicator.Ohlc` module to be able to deal with multiple timeframes at once.
Initially, we could be tempted to update the `process/2` function:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
def process([_ | _] = ohlcs, %TradeEvent{} = trade_event) do
results =
ohlcs
|> Enum.map(&merge_price(&1, trade_event.price, trade_event.trade_time))
results |> Enum.map(&maybe_broadcast(elem(&1, 0)))
results |> Enum.map(&elem(&1, 1))
end
def process(symbol, %TradeEvent{} = trade_event) do
[1, 5, 15, 60, 4 * 60, 24 * 60]
|> Enum.map(
&generate_ohlc(
symbol,
&1,
trade_event.price,
trade_event.trade_time
)
)
end
```
We added some logic to both clauses. At this moment, it maybe doesn't look that bad, but it's a great place to mix even more logic with dirty code. This is a great example where we should stop and rethink how we could maximize the amount of pure code.
Instead of expanding the `process/2` function, we could built on top of the `merge_price/3` and `generate_ohlc/4` functions, which deal with a single OHLC data. We could add plural versions of those functions that will understand that we now deal with multiple OHLC structs:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc.ex
def merge_prices(ohlcs, price, trade_time) do
results =
ohlcs
|> Enum.map(&merge_price(&1, price, trade_time))
{
results |> Enum.map(&elem(&1, 0)) |> Enum.filter(& &1),
results |> Enum.map(&elem(&1, 1))
}
end
def generate_ohlcs(symbol, price, trade_time) do
[1, 5, 15, 60, 4 * 60, 24 * 60]
|> Enum.map(
&generate_ohlc(
symbol,
&1,
price,
trade_time
)
)
end
```
Now the `process/2` function got back to its almost original shape:
```{r, engine = 'elixir', eval = FALSE}
def process([_ | _] = ohlcs, %TradeEvent{} = trade_event) do
{old_ohlcs, new_ohlcs} = merge_prices(ohlcs, trade_event.price, trade_event.trade_time)
old_ohlcs |> Enum.each(&maybe_broadcast/1)
new_ohlcs
end
def process(symbol, %TradeEvent{} = trade_event) do
generate_ohlcs(symbol, trade_event.price, trade_event.trade_time)
end
```
By introducing the `merge_prices/3` and `generate_ohlcs/3` functions, we were able to keep the dirty part of our code small(it grew only to accommodate multiple broadcasts). The new functions are pure and can be easily tested.
As we modified the interface of the `process/2` function (by removing the `duration`), we need to update the `Indicator.Ohlc.Worker` module to be blissfully not aware that we have different durations:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator/ohlc/worker.ex
def start_link(symbol) do # <= duration removed
GenServer.start_link(__MODULE__, symbol) # <= duration removed
end
def init(symbol) do # <= duration removed
symbol = String.upcase(symbol)
@logger.debug("Initializing new a OHLC worker for #{symbol}") # <= duration skipped
...
{:ok, symbol} # <= duration removed
end
```
The final update will be to simplify the `Indicator.aggregate_ohlcs/1` function just to start a single OHLC worker:
```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/lib/indicator.ex
def aggregate_ohlcs(symbol) do
DynamicSupervisor.start_child(
Indicator.DynamicSupervisor,
{Indicator.Ohlc.Worker, symbol}
)
end
```
We can now test our implementation by running the following once again:
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
iex(1)> Streamer.start_streaming("XRPUSDT")
...
iex(2)> Indicator.Ohlc.Worker.start_link("XRPUSDT")
...
23:23:00.416 [info] Broadcasting OHLC: %Indicator.Ohlc{close: "0.68230000", duration: 1,
high: "0.68330000", low: "0.68130000", open: "0.68160000", start_time: 1644189720,
symbol: "XRPUSDT"}
```
The logged message(s) confirms that our OHLC aggregator logic works as expected using a single process.
In the case of the OHLC aggregator, we've seen that there are some trade-off between running multiple processes(simpler code) vs single process(more complex merging) - it could be unclear what's the benefit of limiting the usage of processes(especially that they are "almost free").
I totally understand. Please treat this chapter as an introduction to the idiomatic Elixir. In the next chapter, we will apply this concept when refactoring the Naive trading strategy to see its practical benefits.
[Note] Please remember to run the `mix format` to keep things nice and tidy.
The source code for this chapter can be found on [GitHub](https://github.com/Cinderella-Man/hands-on-elixir-and-otp-cryptocurrency-trading-bot-source-code/tree/chapter_19)