Skip to content

Commit

Permalink
Merge pull request #3 from pushex-project/transform_fn
Browse files Browse the repository at this point in the history
Transform fn
  • Loading branch information
sb8244 authored May 4, 2019
2 parents ad656fc + f9336c9 commit 8618ed7
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 20 deletions.
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,43 @@ You can customize the `ShardSupervisor` and `Handler` by specifying the size of
is useful if you want to have multiple Supervisors in your application. This is documented in the hexdocs.

The `Handler` will unsubscribe from all subscribed metrics when it terminates.

## Performance

The default `:telemetry` execution will run in the process of the caller. This means that no binaries are
copied. `TelemetryAsync`, however, will copy binaries (potentially large maps) due to crossing a process boundary.
Using synchronous handlers is probably useful for many people and you should go asynchronous only if you are
okay with the memory implications of it. In theory it will allow for higher throughput to your main processes (business requests)
and offload metrics to be async.

A way to help alleviate binary copying is provided. You are able to set `transform_fn` option on the `Handler` process.
This option will run the provided function for *every* execution it receives, before it crosses the process boundary.
You can return a tuple containing the new measurements and metadata like `{measurements, metadata}` and these will be
provided to `:telemetry.execute`. You must always provide a match, so the follow pattern is encouraged:

```elixir
defmodule TestTransform do
def transform(
[:metric_i_want_to_transform],
measurements,
%{some_metadata: %{nested: meta}}
) do
{
Map.take(measurements, [:key_i_care_about]),
%{nested: meta}
}
end

def transform([:removes, :everything], _a, _b) do
{%{}, %{}}
end

def transform(_, a, b) do
{a, b}
end
end
```

This will allow you to utilize pattern matching on the metric names you care about (without `:async` added). You
can modify the payload or completely remove it by setting it to empty maps. A default case should be provided to
be an identity function.
34 changes: 25 additions & 9 deletions lib/telemetry_async/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ defmodule TelemetryAsync.Handler do
* metrics -(required) Must be provided. This is a list of telemetry metric names. They must be lists of atoms, like telemetry accepts
* pool_size - (optional) The size of the ShardSupervisor pool. This defaults to the number of schedulers
* prefix - (optional) An atom that is used to name the individual Shards. Defaults to `TelemetryAsync.Shard`
* transform_fn - (optional) A function/3 that accepts the metric name (without async prepended), measurements, metadata and
returns a tuple `{measurements, metadata}` which will be executed async. This allows smaller data to cross
the process boundary. Like `:telemetry` recommends, it is recommended to provide a `&Module.function/3` capture
rather than providing an anonymous function.
The prefix and pool_size should match a ShardSupervisor started with the same options or the telemetry events will not be re-broadcast.
"""
Expand All @@ -31,19 +35,23 @@ defmodule TelemetryAsync.Handler do
metrics = Keyword.fetch!(opts, :metrics)
pool_size = Keyword.get(opts, :pool_size, ShardSupervisor.default_pool_size())
prefix = Keyword.get(opts, :prefix, Shard.default_prefix())
names = attach_metrics(metrics, pool_size, prefix)
transform_fn = Keyword.get(opts, :transform_fn)
names = attach_metrics(metrics, pool_size, prefix, transform_fn)

Process.flag(:trap_exit, true)

{:ok, %{names: names, opts: opts}}
{:ok, %{names: names, opts: opts, transform_fn: transform_fn}}
end

@doc false
def handler(metric, measurements, metadata, %{pool_size: pool_size, prefix: prefix}) do
ShardSupervisor.random_shard(pool_size: pool_size, prefix: prefix)
|> Shard.execute(fn ->
:telemetry.execute([:async | metric], measurements, metadata)
end)
def handler(metric, measurements, metadata, config = %{transform_fn: transform_fn})
when is_function(transform_fn, 3) do
{measurements, metadata} = transform_fn.(metric, measurements, metadata)
exec_handler(metric, measurements, metadata, config)
end

def handler(metric, measurements, metadata, config) do
exec_handler(metric, measurements, metadata, config)
end

@doc false
Expand All @@ -53,14 +61,22 @@ defmodule TelemetryAsync.Handler do
end)
end

defp attach_metrics(metrics, pool_size, prefix) do
defp exec_handler(metric, measurements, metadata, %{pool_size: pool_size, prefix: prefix}) do
ShardSupervisor.random_shard(pool_size: pool_size, prefix: prefix)
|> Shard.execute(fn ->
:telemetry.execute([:async | metric], measurements, metadata)
end)
end

defp attach_metrics(metrics, pool_size, prefix, transform_fn) do
Enum.map(metrics, fn metric ->
name = [__MODULE__ | [prefix | metric]] |> Module.concat()

:ok =
:telemetry.attach(name, metric, &__MODULE__.handler/4, %{
pool_size: pool_size,
prefix: prefix
prefix: prefix,
transform_fn: transform_fn
})

name
Expand Down
39 changes: 30 additions & 9 deletions test/integration.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
IO.puts("start")

defmodule TestTransform do
def transform([:test], _a, _b) do
{%{data: true}, %{transformed: true}}
end

def transform([:other, :test], _a, _b) do
{%{other: true}, %{test: true}}
end

def transform(_, a, b) do
{a, b}
end
end

metrics = [
[:test],
[:other, :test],
Expand All @@ -17,19 +31,26 @@ Supervisor.start_link([{TelemetryAsync.Handler, pool_size: 4, prefix: :steve, me
strategy: :one_for_one
)

Supervisor.start_link([{TelemetryAsync.ShardSupervisor, pool_size: 4, prefix: :transformed}],
strategy: :one_for_one
)

Supervisor.start_link(
[
{TelemetryAsync.Handler,
pool_size: 4, prefix: :transformed, metrics: metrics, transform_fn: &TestTransform.transform/3}
],
strategy: :one_for_one
)

Supervisor.start_link([TelemetryAsync.ShardSupervisor], strategy: :one_for_one)
Supervisor.start_link([{TelemetryAsync.Handler, metrics: metrics}], strategy: :one_for_one)

:telemetry.attach(
:test,
[:async, :test],
fn a, b, c, d ->
IO.inspect({a, b, c, d})
end,
nil
)
:telemetry.attach(:test, [:async, :test], fn a, b, c, d -> IO.inspect({a, b, c, d}) end, nil)
:telemetry.attach(:other_test, [:async, :other, :test], fn a, b, c, d -> IO.inspect({a, b, c, d}) end, nil)

:telemetry.execute([:test], %{}, %{})
:telemetry.execute([:test], %{a: 1}, %{b: 2})
:telemetry.execute([:other, :test], %{a: 1}, %{b: 2})

Process.sleep(100)

Expand Down
39 changes: 37 additions & 2 deletions test/telemetry_async/handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ defmodule TelemetryAsync.HandlerTest do
assert b == %{a: 1}
assert c == %{b: 2}
assert d == nil
Logger.info(test)
Logger.info("ok")
end,
nil
) == :ok

assert capture_log(fn ->
assert :telemetry.execute(metric, %{a: 1}, %{b: 2}) == :ok
Process.sleep(50)
end) =~ to_string(test)
end) =~ "ok"
end

test "the metric is not dispatched if there is no shard supervisor", %{test: test} do
Expand Down Expand Up @@ -95,5 +95,40 @@ defmodule TelemetryAsync.HandlerTest do

assert :telemetry.list_handlers(metric) |> length() == 0
end

test "a transformation function can be provided", %{test: test} do
metric = [HandlerTest_6]
assert {:ok, _sup_pid} = ShardSupervisor.start_link(prefix: test)

assert {:ok, _pid} =
Handler.start_link(
metrics: [metric],
prefix: test,
transform_fn: fn a, b, c ->
assert a == metric
assert b == %{a: 1, extra: true}
assert c == %{b: 2, extra: true}
{%{a: 1}, %{b: 2}}
end
)

assert :telemetry.attach(
test,
[:async | metric],
fn a, b, c, d ->
assert a == [:async | metric]
assert b == %{a: 1}
assert c == %{b: 2}
assert d == nil
Logger.info("ok")
end,
nil
) == :ok

assert capture_log(fn ->
assert :telemetry.execute(metric, %{a: 1, extra: true}, %{b: 2, extra: true}) == :ok
Process.sleep(50)
end) =~ "ok"
end
end
end

0 comments on commit 8618ed7

Please sign in to comment.