diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b2c46fd..f90b31e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -144,7 +144,7 @@ jobs: run: mix compile --warnings-as-errors - name: Test - run: mix coveralls.github + run: mix coveralls.github --parallel env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/lib/buffy/throttle_and_timed.ex b/lib/buffy/throttle_and_timed.ex new file mode 100644 index 0000000..003df74 --- /dev/null +++ b/lib/buffy/throttle_and_timed.ex @@ -0,0 +1,372 @@ +# credo:disable-for-this-file Credo.Check.Refactor.LongQuoteBlocks +defmodule Buffy.ThrottleAndTimed do + @moduledoc """ + This is a variation on the `Buffy.Throttle` behavior. + + It keeps the following functionality: + - wait for a specified amount of time before + invoking the work function. If the function is called again before the time has + elapsed, it's a no-op. + + Key difference between `Buffy.Throttle` and `Buffy.ThrottleAndTimed`: + - it will not be terminated once the timer is done, but kept alive + - internally, the existing timer behavior is done via state rather than handling `{:error, {:already_started, pid}}` output of `GenServer.start_link`. + - See note on Horde about state. + - it requires `:loop_interval` field value (set by config) to trigger work repeatedly based on a empty inbox timeout interval, + that is based on [GenServer's timeout feature](https://hexdocs.pm/elixir/1.15/GenServer.html#module-timeouts). + + Main reason for these changes is sometimes there's a need to fall back to a time-interval triggered work, when there aren't any triggers to + start the work. Requirement of this means the process should exist and not get terminated immediately after a successfully throttled work execution. + + ### In other words, we keep the throttle mechanism: + + Once the timer has expired, the function will be called, + and any subsequent calls will start a new timer. + + ```text + call call call call call + | call | call | call | call | + | | | | | | | | | + ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ + │ Timer 1 │ │ Timer 2 │ │ Timer 3 │ │ Timer 4 │ + └─────────| └─────────┘ └─────────┘ └─────────┘ + | | | | + | | | Forth function invocation + | | Third function invocation + | Second function invocation + First function invocation + ``` + + ### With the optionally enabled trigger, ending up in this lifecycle: + + ```mermaid + graph TB + A[Start Buffy] -->|start_link| B(Init Buffy) + B --> |initial handle_continue| W(Do throttled work) + S(Messages sent to Buffy) --> |message to trigger work| D{Can Throttle?} + D --> |YES| W + D --> |NO| C(Ignore message as throttle already scheduled) + S --> |empty inbox timeout interval| P(Do immediate work) + W --> |set message inbox timeout| S + P --> |set message inbox timeout| S + ``` + + ### Note on usage with Horde + + Under Horde, the state unfortunately doesn't get synced up automatically - that requires explicit tooling. + Therefore state will be "reset" to the initial state when process boots up. This is not a big issue as the initial state is to + set a throttled run of `handle_throttle`. + + ### How to start timed interval triggers when your application boots up + + By design this will not run when your application starts. If there's a need to start the inbox timeout, + then create a child spec for the application Supervisor (typically in `application.ex`) + for a Task module, that runs how many instances of `throttle/1` as necessary. + Example implementation is: + + ``` + # application.ex + def start(_type, _args) do + ... + children = [ + ... + {true, + Supervisor.child_spec( + {Task, + fn -> + for x <- 1..10, do: MyModuleUsingThrottleAndTimed.throttle(some: "value", x: x) + end}, + id: MyModuleUsingThrottleAndTimedInit, + restart: :temporary + )} + ] + ... + ``` + + ## Example Usage + + You'll first need to create a module that will be used to throttle. + + defmodule MyTask do + use Buffy.ThrottleAndTimed, + throttle: :timer.minutes(2) + loop_timeout: :timer.minutes(2) + + def handle_throttle(args) do + # Do something with args + end + end + + Next, you can use the `throttle/1` function with the registered module. + + iex> MyTask.throttle(args) + :ok + + ## Options + + - `:registry_module` (`atom`) - Optional. A module that implements the `Registry` behaviour. If you are running in a distributed instance, you can set this value to `Horde.Registry`. Defaults to `Registry`. + + - `:registry_name` (`atom`) - Optional. The name of the registry to use. Defaults to the built in Buffy registry, but if you are running in a distributed instance you can set this value to a named `Horde.Registry` process. Defaults to `Buffy.Registry`. + + - `:restart` (`:permanent` | `:temporary` | `:transient`) - Optional. The restart strategy to use for the GenServer. Defaults to `:temporary`. + + - `:supervisor_module` (`atom`) - Optional. A module that implements the `DynamicSupervisor` behaviour. If you are running in a distributed instance, you can set this value to `Horde.DynamicSupervisor`. Defaults to `DynamicSupervisor`. + + - `:supervisor_name` (`atom`) - Optional. The name of the dynamic supervisor to use. Defaults to the built in Buffy dynamic supervisor, but if you are running in a distributed instance you can set this value to a named `Horde.DynamicSupervisor` process. Defaults to `Buffy.DynamicSupervisor`. + + - :throttle (`non_neg_integer`) - Required. The amount of time to wait before invoking the function. This value is in milliseconds. + + - `:loop_interval` (`atom`) - Required. The amount of time that this process will wait while inbox is empty until sending a `:timeout` message (handle via `handle_info`). Resets if message comes in. In milliseconds. + + ## Using with Horde + + If you are running Elixir in a cluster, you can utilize `Horde` to only run one of your throttled functions at a time. To do this, you'll need to set the `:registry_module` and `:supervisor_module` options to `Horde.Registry` and `Horde.DynamicSupervisor` respectively. You'll also need to set the `:registry_name` and `:supervisor_name` options to the name of the Horde registry and dynamic supervisor you want to use. + + defmodule MyThrottler do + use Buffy.ThrottleAndTimed, + registry_module: Horde.Registry, + registry_name: MyApp.HordeRegistry, + supervisor_module: Horde.DynamicSupervisor, + supervisor_name: MyApp.HordeDynamicSupervisor, + throttle: :timer.minutes(2), + loop_timeout: :timer.minutes(10) + + def handle_throttle(args) do + # Do something with args + end + end + + ## Telemetry + + These are the events that are called by the `Buffy.ThrottleAndTimed` module: + + - `[:buffy, :throttle, :throttle]` - Emitted when the `throttle/1` function is called. + - `[:buffy, :throttle, :timeout]` - Emitted when inbox timeout is triggered. + - `[:buffy, :throttle, :handle, :start]` - Emitted at the start of the `handle_throttle/1` function. + - `[:buffy, :throttle, :handle, :stop]` - Emitted at the end of the `handle_throttle/1` function. + - `[:buffy, :throttle, :handle, :exception]` - Emitted when an error is raised in the `handle_throttle/1` function. + + All of these events will have the following metadata: + + - `:args` - The arguments passed to the `throttle/1` function. + - `:key` - A hash of the passed arguments used to deduplicate the throttled function. + - `:module` - The module using `Buffy.ThrottleAndTimed`. + + With the additional metadata for `[:buffy, :throttle, :handle, :stop]`: + + - `:result` - The return value of the `handle_throttle/1` function. + + """ + require Logger + alias Buffy.ThrottleAndTimed + + @typedoc """ + A list of arbitrary arguments that are used for the `c:handle_throttle/1` + function. + """ + @type args :: term() + + @typedoc """ + A unique key for debouncing. This is used for GenServer uniqueness and is + generated from hashing all of the args. + """ + @type key :: term() + + @typedoc """ + Internal state that `Buffy.ThrottleAndTimed` keeps. + """ + @type state :: %{ + key: key(), + args: args(), + timer_ref: reference() + } + + @doc """ + A function to call the throttle. This will start + and wait the configured `throttle` time before calling the `c:handle_throttle/1` + function. + """ + @callback throttle(args :: args()) :: :ok | {:error, term()} + + @doc """ + The function called after the throttle has completed. This function will + receive the arguments passed to the `throttle/1` function. + """ + @callback handle_throttle(args()) :: any() + + defmacro __using__(opts) do + registry_module = Keyword.get(opts, :registry_module, Registry) + registry_name = Keyword.get(opts, :registry_name, Buffy.Registry) + restart = Keyword.get(opts, :restart, :temporary) + supervisor_module = Keyword.get(opts, :supervisor_module, DynamicSupervisor) + supervisor_name = Keyword.get(opts, :supervisor_name, Buffy.DynamicSupervisor) + throttle = Keyword.fetch!(opts, :throttle) + loop_interval = Keyword.fetch!(opts, :loop_interval) + + unless is_number(loop_interval) do + # credo:disable-for-next-line Credo.Check.Readability.NestedFunctionCalls + raise ArgumentError, "expected :loop_interval to be a number, received: #{inspect(loop_interval)}" + end + + quote do + @behaviour ThrottleAndTimed + + use GenServer, restart: unquote(restart) + + require Logger + + @doc false + @spec start_link({ThrottleAndTimed.key(), ThrottleAndTimed.args()}) :: :ignore | {:ok, pid} | {:error, term()} + def start_link({key, args}) do + name = key_to_name(key) + + with {:error, {:already_started, pid}} <- GenServer.start_link(__MODULE__, {key, args}, name: name) do + :ignore + end + end + + @doc """ + Starts debouncing the given `t:Buffy.ThrottleAndTimed.key()` for the + module set `throttle` time. Returns a tuple containing `:ok` + and the `t:pid()` of the throttle process. + + ## Examples + + iex> throttle(:my_function_arg) + {:ok, #PID<0.123.0>} + + """ + @impl ThrottleAndTimed + @spec throttle(ThrottleAndTimed.args()) :: :ok | {:error, term()} + def throttle(args) do + key = args_to_key(args) + + :telemetry.execute( + [:buffy, :throttle, :throttle], + %{count: 1}, + %{args: args, key: key, module: __MODULE__} + ) + + case unquote(supervisor_module).start_child(unquote(supervisor_name), {__MODULE__, {key, args}}) do + {:ok, pid} -> + :ok + + :ignore -> + # already started; Trigger throttle for that process + key |> key_to_name |> GenServer.cast(:throttle) + + result -> + result + end + end + + defp args_to_key(args), do: args |> :erlang.term_to_binary() |> :erlang.phash2() + + defp key_to_name(key) do + {:via, unquote(registry_module), {unquote(registry_name), {__MODULE__, key}}} + end + + @doc """ + The function that runs after throttle has completed. This function will + be called with the `t:Buffy.ThrottleAndTimed.key()` and can return anything. The + return value is ignored. If an error is raised, it will be logged and + ignored. + + ## Examples + + A simple example of implementing the `c:Buffy.ThrottleAndTimed.handle_throttle/1` + callback: + + def handle_throttle(args) do + # Do some work + end + + Handling errors in the `c:Buffy.ThrottleAndTimed.handle_throttle/1` callback: + + def handle_throttle(args) do + # Do some work + rescue + e -> + # Do something with a raised error + end + + """ + @impl ThrottleAndTimed + @spec handle_throttle(ThrottleAndTimed.args()) :: any() + def handle_throttle(_args) do + raise RuntimeError, + message: "You must implement the `handle_throttle/1` function in your module." + end + + defoverridable handle_throttle: 1 + + @doc false + @impl GenServer + @spec init({ThrottleAndTimed.key(), ThrottleAndTimed.args()}) :: {:ok, ThrottleAndTimed.state()} + def init({key, args}) do + {:ok, schedule_throttle_and_update_state(%{key: key, args: args, timer_ref: nil})} + end + + @doc """ + Function to invoke the throttle logic if process already exists. + It will only schedule a throttle if `timer_ref` is `nil`. + + """ + @impl GenServer + @spec handle_cast(:throttle, ThrottleAndTimed.state()) :: {:noreply, ThrottleAndTimed.state()} + def handle_cast(:throttle, %{timer_ref: nil} = state) do + {:noreply, schedule_throttle_and_update_state(state)} + end + + def handle_cast(:throttle, state) do + {:noreply, state} + end + + defp schedule_throttle_and_update_state(state) do + timer_ref = Process.send_after(self(), :execute_throttle_callback, unquote(throttle)) + %{state | timer_ref: timer_ref} + end + + @doc false + @impl GenServer + @spec handle_info(:timeout | :execute_throttle_callback, ThrottleAndTimed.state()) :: + {:noreply, ThrottleAndTimed.state(), {:continue, :do_work}} + def handle_info(:timeout, %{key: key, args: args} = state) do + :telemetry.execute( + [:buffy, :throttle, :timeout], + %{count: 1}, + %{args: args, key: key, module: __MODULE__} + ) + + {:noreply, state, {:continue, :do_work}} + end + + def handle_info(:execute_throttle_callback, state) do + {:noreply, state, {:continue, :do_work}} + end + + @doc false + @impl GenServer + @spec handle_continue(do_work :: atom(), ThrottleAndTimed.state()) :: + {:noreply, ThrottleAndTimed.state()} | {:noreply, ThrottleAndTimed.state(), timeout()} + def handle_continue(:do_work, %{key: key, args: args} = state) do + :telemetry.span( + [:buffy, :throttle, :handle], + %{args: args, key: key, module: __MODULE__}, + fn -> + result = handle_throttle(args) + {result, %{args: args, key: key, module: __MODULE__, result: result}} + end + ) + + new_state = %{state | timer_ref: nil} + {:noreply, new_state, unquote(loop_interval)} + rescue + e -> + Logger.error("Error in throttle: #{inspect(e)}") + new_state = %{state | timer_ref: nil} + {:noreply, new_state, unquote(loop_interval)} + end + end + end +end diff --git a/test/buffy/throttle_and_timed_test.exs b/test/buffy/throttle_and_timed_test.exs new file mode 100644 index 0000000..6f30d58 --- /dev/null +++ b/test/buffy/throttle_and_timed_test.exs @@ -0,0 +1,264 @@ +defmodule Buffy.ThrottleAndTimedTest do + use ExUnit.Case, async: true + use ExUnitProperties + + import ExUnit.CaptureLog + + defmodule MyDynamicSupervisor do + use DynamicSupervisor + + def start_link(init_arg) do + DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + @impl DynamicSupervisor + def init(_init_arg) do + DynamicSupervisor.init(strategy: :one_for_one) + end + end + + defmodule MySlowThrottler do + use Buffy.ThrottleAndTimed, + throttle: 100, + supervisor_module: DynamicSupervisor, + supervisor_name: MyDynamicSupervisor, + loop_interval: 200 + + def handle_throttle(:raise) do + raise RuntimeError, message: ":raise" + end + + def handle_throttle(:error) do + :error + end + + def handle_throttle(%{test_pid: test_pid} = args) do + send(test_pid, {:ok, args, System.monotonic_time()}) + :ok + end + end + + defmodule MyZeroThrottler do + use Buffy.ThrottleAndTimed, + throttle: 0, + supervisor_module: DynamicSupervisor, + supervisor_name: MyDynamicSupervisor, + loop_interval: 100 + + def handle_throttle(:raise) do + raise RuntimeError, message: ":raise" + end + + def handle_throttle(:error) do + :error + end + + def handle_throttle(%{test_pid: test_pid} = args) do + send(test_pid, {:ok, args, System.monotonic_time()}) + :ok + end + end + + describe "loop_interval type check" do + test "should raise if loop_interval is not a number nor a nil" do + assert_raise ArgumentError, fn -> + defmodule MyWrongThrottler do + use Buffy.ThrottleAndTimed, + throttle: 100, + loop_interval: "300", + supervisor_module: DynamicSupervisor, + supervisor_name: MyDynamicSupervisor + + def handle_throttle(%{test_pid: test_pid} = args) do + send(test_pid, {:ok, args, System.monotonic_time()}) + :ok + end + end + end + end + end + + describe "handle_info(:timeout)" do + defmodule MyTimedThrottler do + use Buffy.ThrottleAndTimed, + throttle: 10, + loop_interval: 100, + supervisor_module: DynamicSupervisor, + supervisor_name: MyDynamicSupervisor + + def handle_throttle(%{test_pid: test_pid} = args) do + send(test_pid, {:ok, args, System.monotonic_time()}) + :ok + end + end + + defmodule MyTimedSlowThrottler do + use Buffy.ThrottleAndTimed, + throttle: 100, + loop_interval: 300, + supervisor_module: DynamicSupervisor, + supervisor_name: MyDynamicSupervisor + + def handle_throttle(%{test_pid: test_pid} = args) do + send(test_pid, {:ok, args, System.monotonic_time()}) + :ok + end + end + + setup do + start_supervised!({MyDynamicSupervisor, []}) + :ok + end + + test "should trigger if no message in inbox for loop_interval" do + prev = System.monotonic_time() + DynamicSupervisor.count_children(MyDynamicSupervisor) + test_pid = self() + MyTimedThrottler.throttle(%{test_pid: test_pid, prev: prev}) + + # Initial throttle is 10 msec so should receive within 20 msec + assert_receive {:ok, %{prev: ^prev}, now}, 200 + assert System.convert_time_unit(now - prev, :native, :millisecond) < 20 + + # Inbox timeout triggers at 100 msec so should receive "around" that time + assert_receive {:ok, %{prev: ^prev}, now2}, 200 + diff = System.convert_time_unit(now2 - now, :native, :millisecond) + assert :erlang.abs(diff - 100) < 10 + + # Confirm another inbox timeout triggered + assert_receive {:ok, %{prev: ^prev}, now3}, 200 + diff = System.convert_time_unit(now3 - now2, :native, :millisecond) + assert :erlang.abs(diff - 100) < 10 + end + + test "should throttle all incoming triggers when work is already scheduled" do + DynamicSupervisor.count_children(MyDynamicSupervisor) + test_pid = self() + # trigger throttle + MyTimedSlowThrottler.throttle(%{test_pid: test_pid}) + + # trigger more throttle + for _ <- 1..10 do + Task.async(fn -> + MyTimedSlowThrottler.throttle(%{test_pid: test_pid}) + end) + end + + # assert throttled work done + assert_receive {:ok, _, now}, 200 + + # refute any other work was done + refute_receive {:ok, _, _now}, 200 + + # check inbox timeout triggered + assert_receive {:ok, _, now2}, 400 + diff = System.convert_time_unit(now2 - now, :native, :millisecond) + assert :erlang.abs(diff - 300) < 10 + end + + test "should reset inbox timeout if throttle request comes in" do + test_pid = self() + # trigger initial throttle + MyTimedSlowThrottler.throttle(%{test_pid: test_pid}) + + # assert throttled work done + assert_receive {:ok, _, now}, 200 + + # now in inbox timeout waiting period and work scheduled via inbox timeout logic + # trigger a throttle + MyTimedSlowThrottler.throttle(%{test_pid: test_pid}) + + # assert the trigger happend within the throttle interval and not the inbox timeout loop interval + assert_receive {:ok, _, now2}, 400 + diff = System.convert_time_unit(now2 - now, :native, :millisecond) + assert :erlang.abs(diff - 100) < 10 + end + end + + describe "handle_throttle/1" do + setup do + start_supervised!({MyDynamicSupervisor, []}) + :ok + end + + test "calls handle_throttle/1" do + check all args <- StreamData.term() do + assert :ok = MyZeroThrottler.throttle(%{args: args, test_pid: self()}) + assert_receive {:ok, _, _} + end + end + + test "throttles handle_throttle/1" do + test_pid = self() + for _ <- 1..200, do: MySlowThrottler.throttle(%{test_pid: test_pid}) + assert_receive {:ok, _, _}, 200 + refute_receive {:ok, _, _}, 200 + end + end + + describe ":telemetry" do + setup do + :telemetry_test.attach_event_handlers(self(), [ + [:buffy, :throttle, :throttle], + [:buffy, :throttle, :timeout], + [:buffy, :throttle, :handle, :start], + [:buffy, :throttle, :handle, :stop], + [:buffy, :throttle, :handle, :exception] + ]) + + start_supervised!({MyDynamicSupervisor, []}) + + :ok + end + + test "emits [:buffy, :throttle, :throttle]" do + MyZeroThrottler.throttle(:foo) + + assert_receive {[:buffy, :throttle, :throttle], _ref, %{count: 1}, + %{ + args: :foo, + key: _, + module: MyZeroThrottler + }} + end + + test "emits [:buffy, :throttle, :timeout]" do + args = %{test_pid: self()} + MyZeroThrottler.throttle(args) + + assert_receive {[:buffy, :throttle, :timeout], _ref, %{count: 1}, + %{ + args: ^args, + key: _, + module: MyZeroThrottler + }}, + 150 + end + + test "emits [:buffy, :throttle, :handle, :start]" do + MyZeroThrottler.throttle(:starting) + + assert_receive {[:buffy, :throttle, :handle, :start], _ref, %{}, + %{ + args: :starting, + key: _, + module: MyZeroThrottler + }} + end + + test "emits [:buffy, :throttle, :handle, :exception]" do + MyZeroThrottler.throttle(:raise) + + assert capture_log(fn -> + assert_receive {[:buffy, :throttle, :handle, :exception], _ref, %{duration: _}, + %{ + args: :raise, + key: _, + kind: :error, + reason: %RuntimeError{message: ":raise"}, + module: MyZeroThrottler + }} + end) + end + end +end