Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SIGNAL-5504: add optional timer based work #28

Merged
merged 58 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
37c37cd
ADD: throttle and timed new module - just adjusted the moduledoc
seungjinstord Dec 12, 2023
9330942
UPDATE: loop_interval added
seungjinstord Dec 12, 2023
6594fa4
ADD: test setup for the new module
seungjinstord Dec 12, 2023
f0be036
ADD: helper function to return the tuple with maybe the timeout value
seungjinstord Dec 12, 2023
659389d
ADD: text for when Horde is used
seungjinstord Dec 12, 2023
344d9f4
UPDATE: populate the initial lifecycle flow
seungjinstord Dec 12, 2023
cc2ab67
UPDATE: throttle function to run the GenServer.cast if child already …
seungjinstord Dec 12, 2023
39bf772
UPDATE: use functions for name / key generation
seungjinstord Dec 12, 2023
6270b91
UPDATE: unquote loop_interval
seungjinstord Dec 12, 2023
5e2ddb0
UPDATE: pull out non-macro related function out
seungjinstord Dec 12, 2023
5521346
ADD: first test
seungjinstord Dec 12, 2023
fc235e9
REMOVE: IO.inspect
seungjinstord Dec 12, 2023
b30349d
UPDATE: fix handle_continue pattern matching
seungjinstord Dec 12, 2023
724892f
UPDATE: spec
seungjinstord Dec 12, 2023
e29499d
UPDATE: expand test and finalize the inbox timeout
seungjinstord Dec 12, 2023
483d0bd
UPDATE: formatting
seungjinstord Dec 12, 2023
53d135d
ADD: test for throttlingn behavior intact
seungjinstord Dec 13, 2023
e98e2fe
ADD: logic to perform throttle if the throttle message comes in AFTER…
seungjinstord Dec 13, 2023
4f330f1
UPDATE: rewrite the tests so that it uses assert_receive
seungjinstord Dec 13, 2023
9fb43c3
REMOVE: files unused
seungjinstord Dec 13, 2023
bd55b7d
ADD: tests for maybe_add_inbox_timeout_and_update_work_status/1
seungjinstord Dec 13, 2023
9fc7394
REMOVE: unused functions
seungjinstord Dec 13, 2023
6aef813
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 13, 2023
b2ce5ed
UPDATE: sentence clarification and bullets to start with lowercase
seungjinstord Dec 13, 2023
5234894
ADD: test for confirming no additional trigger without loop_interval set
seungjinstord Dec 13, 2023
e2c7b32
ADD: moduledoc on how to run it initially
seungjinstord Dec 13, 2023
7fd21d8
UPDATE: moduledoc
seungjinstord Dec 13, 2023
f8a57ad
UPDATE: moduledoc
seungjinstord Dec 13, 2023
13f018a
UPDATE: moduledoc
seungjinstord Dec 13, 2023
8020ffc
UPDATE: moduledoc
seungjinstord Dec 13, 2023
9616343
UDPATE: try parallel flag
seungjinstord Dec 13, 2023
983968f
REMOVE: partition flag
seungjinstord Dec 13, 2023
a384550
REMOVE: unnecessary comment
seungjinstord Dec 14, 2023
df117cc
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
392ab69
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
8253fd1
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
4a6a9f2
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
1990706
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
ab547c4
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
d3e98d8
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
57d9f26
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 14, 2023
ff1de5e
UPDATE: use existing api of Process.send_after and GenServer timeout …
seungjinstord Dec 14, 2023
32f6942
UPDATE: moduledoc
seungjinstord Dec 14, 2023
3ae1594
UPDATE: moduledoc
seungjinstord Dec 14, 2023
be65361
ADD: telemetry for when timeout is triggered
seungjinstord Dec 14, 2023
4297705
UPDATE: moduledoc test
seungjinstord Dec 14, 2023
24d7708
UPDATE: moduledoc
seungjinstord Dec 14, 2023
5569ab7
UPDATE: moduledoc
seungjinstord Dec 14, 2023
f8a87a2
UPDATE: moduledoc
seungjinstord Dec 14, 2023
e6425fb
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 15, 2023
16be37c
UPDATE: move up the loop_interval type check to right after options p…
seungjinstord Dec 15, 2023
c3faca0
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 15, 2023
5c9a9e5
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 15, 2023
d0b3ca1
Update lib/buffy/throttle_and_timed.ex
seungjinstord Dec 15, 2023
dc1484e
UPDATE: alias usage
seungjinstord Dec 15, 2023
062337b
ADD: test for telemetry timeout
seungjinstord Dec 15, 2023
adbaea0
REMOVE: unused variable
seungjinstord Dec 15, 2023
7cc07d5
Merge branch 'main' into SIGNAL-5504-add-optional-timer-based-work
seungjinstord Dec 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
run: mix compile --warnings-as-errors

- name: Test
run: mix coveralls.github
run: mix coveralls.github --parallel
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to add this due to related error in previous runs.

env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

Expand Down
381 changes: 381 additions & 0 deletions lib/buffy/throttle_and_timed.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,381 @@
# credo:disable-for-this-file Credo.Check.Refactor.LongQuoteBlocks
defmodule Buffy.ThrottleAndTimed do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we could come up with a slightly better name for this to describe the use case but I am struggling to land on one. Maybe ThrottleWithTimeout, or ThrottleWithLoop? I think we can keep it as is for now and iterate on it later though unless we can come up with a better option without spending too much time on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThrottleWithLoopInterval?

@moduledoc """
This is a variation on the `Buffy.Throttle` behavior.

It keeps the following functionality:
- wait for a specified amount of time before
invoking the work function. If the function is called again before the time has
elapsed, it's a no-op.

Key difference between `Buffy.Throttle` and `Buffy.ThrottleAndTimed`:
- it will not be terminated once the timer is done, but kept alive
- internally, the existing timer behavior is done via state rather than handling `{:error, {:already_started, pid}}` output of `GenServer.start_link`.
- See note on Horde about state.
- it rquires `:loop_interval` field value (set by config) to trigger work repeatedly based on a empty inbox timeout interval,
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved
that is based on [GenServer's timeout feature](https://hexdocs.pm/elixir/1.12/GenServer.html#module-timeouts).
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved

Main reason for these changes is sometimes there's a need to fall back to a time-interval triggered work, when there aren't any triggers to
start the work. Requirement of this means the process should exist and not get terminated immediately after a successfully throttled work execution.

### In other words, we keep the throttle mechanism:

Once the timer has expired, the function will be called,
and any subsequent calls will start a new timer.

```text
call call call call call
| call | call | call | call |
| | | | | | | | |
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Timer 1 │ │ Timer 2 │ │ Timer 3 │ │ Timer 4 │
└─────────| └─────────┘ └─────────┘ └─────────┘
| | | |
| | | Forth function invocation
| | Third function invocation
| Second function invocation
First function invocation
```

### With the optionally enabled trigger, ending up in this lifecycle:

```mermaid
graph TB
A[Start Buffy] -->|start_link| B(Init Buffy)
B --> |initial handle_continue| W(Do throttled work)
S(Messages sent to Buffy) --> |message to trigger work| D{Can Throttle?}
D --> |YES| W
D --> |NO| C(Ignore message as throttle already scheduled)
S --> |empty inbox timeout interval| P(Do immediate work)
W --> |set message inbox timeout| S
P --> |set message inbox timeout| S
```

### Note on Horde based usage
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved

Under Horde, the state unfortunately doesn't get synced up automatically - that requires explicit tooling.
Therefore state will be "reset" to the initial state when process boots up. This is not a big issue as the initial state is to
set a throttled run of `handle_throttle`.

### How to start timed interval triggers when your application boots up

By design this will not run when your application starts. If there's a need to start the inbox timeout,
then create a child spec for the application Supervisor (typically in `application.ex`)
for a Task module, that runs how many instances of `throttle/1` as necessary.
Example implementation is:

```
# application.ex
def start(_type, _args) do
...
children = [
...
{true,
Supervisor.child_spec(
{Task,
fn ->
for x <- 1..10, do: MyModuleUsingThrottleAndTimed.throttle(some: "value", x: x)
end},
id: MyModuleUsingThrottleAndTimedInit,
restart: :temporary
)}
]
...
```
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved

## Example Usage

You'll first need to create a module that will be used to throttle.

defmodule MyTask do
use Buffy.ThrottleAndTimed,
throttle: :timer.minutes(2)
loop_timeout: :timer.minutes(2)

def handle_throttle(args) do
# Do something with args
end
end

Next, you can use the `throttle/1` function with the registered module.

iex> MyTask.throttle(args)
:ok

## Options

- `:registry_module` (`atom`) - Optional. A module that implements the `Registry` behaviour. If you are running in a distributed instance, you can set this value to `Horde.Registry`. Defaults to `Registry`.

- `:registry_name` (`atom`) - Optional. The name of the registry to use. Defaults to the built in Buffy registry, but if you are running in a distributed instance you can set this value to a named `Horde.Registry` process. Defaults to `Buffy.Registry`.

- `:restart` (`:permanent` | `:temporary` | `:transient`) - Optional. The restart strategy to use for the GenServer. Defaults to `:temporary`.

- `:supervisor_module` (`atom`) - Optional. A module that implements the `DynamicSupervisor` behaviour. If you are running in a distributed instance, you can set this value to `Horde.DynamicSupervisor`. Defaults to `DynamicSupervisor`.

- `:supervisor_name` (`atom`) - Optional. The name of the dynamic supervisor to use. Defaults to the built in Buffy dynamic supervisor, but if you are running in a distributed instance you can set this value to a named `Horde.DynamicSupervisor` process. Defaults to `Buffy.DynamicSupervisor`.

- :throttle (`non_neg_integer`) - Required. The amount of time to wait before invoking the function. This value is in milliseconds.

- `:loop_interval` (`atom`) - Required. The amount of time that this process will wait while inbox is empty until sending a `:timeout` message (handle via `handle_info`). Resets if message comes in. In milliseconds.

## Using with Horde

If you are running Elixir in a cluster, you can utilize `Horde` to only run one of your throttled functions at a time. To do this, you'll need to set the `:registry_module` and `:supervisor_module` options to `Horde.Registry` and `Horde.DynamicSupervisor` respectively. You'll also need to set the `:registry_name` and `:supervisor_name` options to the name of the Horde registry and dynamic supervisor you want to use.

defmodule MyThrottler do
use Buffy.ThrottleAndTimed,
registry_module: Horde.Registry,
registry_name: MyApp.HordeRegistry,
supervisor_module: Horde.DynamicSupervisor,
supervisor_name: MyApp.HordeDynamicSupervisor,
throttle: :timer.minutes(2),
loop_timeout: :timer.minutes(10)

def handle_throttle(args) do
# Do something with args
end
end

## Telemetry

These are the events that are called by the `Buffy.ThrottleAndTimed` module:

- `[:buffy, :throttle, :throttle]` - Emitted when the `throttle/1` function is called.
- `[:buffy, :throttle, :timeout]` - Emitted when inbox timeout is triggered.
- `[:buffy, :throttle, :handle, :start]` - Emitted at the start of the `handle_throttle/1` function.
- `[:buffy, :throttle, :handle, :stop]` - Emitted at the end of the `handle_throttle/1` function.
- `[:buffy, :throttle, :handle, :exception]` - Emitted when an error is raised in the `handle_throttle/1` function.
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved

All of these events will have the following metadata:

- `:args` - The arguments passed to the `throttle/1` function.
- `:key` - A hash of the passed arguments used to deduplicate the throttled function.
- `:module` - The module using `Buffy.ThrottleAndTimed`.

With the additional metadata for `[:buffy, :throttle, :handle, :stop]`:

- `:result` - The return value of the `handle_throttle/1` function.

"""
require Logger
alias Buffy.ThrottleAndTimed
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved

@typedoc """
A list of arbitrary arguments that are used for the `c:handle_throttle/1`
function.
"""
@type args :: term()

@typedoc """
A unique key for debouncing. This is used for GenServer uniqueness and is
generated from hashing all of the args.
"""
@type key :: term()

@typedoc """
Internal state that `Buffy.ThrottleAndTimed` keeps.
"""
@type state :: %{
key: key(),
args: args(),
timer_ref: reference()
}

@doc """
A function to call the throttle. This will start
and wait the configured `throttle` time before calling the `c:handle_throttle/1`
function.
"""
@callback throttle(args :: args()) :: :ok | {:error, term()}

@doc """
The function called after the throttle has completed. This function will
receive the arguments passed to the `throttle/1` function.
"""
@callback handle_throttle(args()) :: any()

defmacro __using__(opts) do
registry_module = Keyword.get(opts, :registry_module, Registry)
registry_name = Keyword.get(opts, :registry_name, Buffy.Registry)
restart = Keyword.get(opts, :restart, :temporary)
supervisor_module = Keyword.get(opts, :supervisor_module, DynamicSupervisor)
supervisor_name = Keyword.get(opts, :supervisor_name, Buffy.DynamicSupervisor)
throttle = Keyword.fetch!(opts, :throttle)
kinson marked this conversation as resolved.
Show resolved Hide resolved
loop_interval = Keyword.fetch!(opts, :loop_interval)

quote do
@behaviour Buffy.ThrottleAndTimed

use GenServer, restart: unquote(restart)

require Logger

@doc false
@spec start_link({ThrottleAndTimed.key(), ThrottleAndTimed.args()}) :: :ignore | {:ok, pid} | {:error, term()}
def start_link({key, args}) do
name = key_to_name(key)

with {:error, {:already_started, pid}} <- GenServer.start_link(__MODULE__, {key, args}, name: name) do
:ignore
end
end
Comment on lines +219 to +226
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should investigate a way for us to use Buffy.Throttle here and override the functions we need with this implementation's specifics to make iterating on the underlying Throttle features easier and reduce having to implement in multiple places 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure that adding that layer of abstraction would help here. I'd be concerned with the readability of this module due to the fact that it is already somewhat complex and implements a macro.

Furthermore, these modules may share a similar name but they are fundamentally different in that Buffy.Throttle executes the task and stops the server immediately whereas Buffy.ThrottleAndTimed is a long-running process.

Maybe I don't have the same vision for how we could layer these two together though, so if you have something in mind I could definitely be persuaded!


@doc """
Starts debouncing the given `t:Buffy.ThrottleAndTimed.key()` for the
module set `throttle` time. Returns a tuple containing `:ok`
and the `t:pid()` of the throttle process.

## Examples

iex> throttle(:my_function_arg)
{:ok, #PID<0.123.0>}

"""
@impl Buffy.ThrottleAndTimed
@spec throttle(Buffy.ThrottleAndTimed.args()) :: :ok | {:error, term()}
def throttle(args) do
key = args_to_key(args)

:telemetry.execute(
[:buffy, :throttle, :throttle],
%{count: 1},
%{args: args, key: key, module: __MODULE__}
)

case unquote(supervisor_module).start_child(unquote(supervisor_name), {__MODULE__, {key, args}}) do
{:ok, pid} ->
:ok

:ignore ->
# already started; Trigger throttle for that process
key |> key_to_name |> GenServer.cast(:throttle)

result ->
result
end
end

defp args_to_key(args), do: args |> :erlang.term_to_binary() |> :erlang.phash2()

defp key_to_name(key) do
{:via, unquote(registry_module), {unquote(registry_name), {__MODULE__, key}}}
end

@doc """
The function that runs after throttle has completed. This function will
be called with the `t:Buffy.ThrottleAndTimed.key()` and can return anything. The
return value is ignored. If an error is raised, it will be logged and
ignored.

## Examples

A simple example of implementing the `c:Buffy.ThrottleAndTimed.handle_throttle/1`
callback:

def handle_throttle(args) do
# Do some work
end

Handling errors in the `c:Buffy.ThrottleAndTimed.handle_throttle/1` callback:

def handle_throttle(args) do
# Do some work
rescue
e ->
# Do something with a raised error
end

"""
@impl Buffy.ThrottleAndTimed
@spec handle_throttle(Buffy.ThrottleAndTimed.args()) :: any()
def handle_throttle(_args) do
raise RuntimeError,
message: "You must implement the `handle_throttle/1` function in your module."
end

defoverridable handle_throttle: 1

@doc false
@impl GenServer
@spec init({ThrottleAndTimed.key(), ThrottleAndTimed.args()}) :: {:ok, Buffy.ThrottleAndTimed.state()}
def init({key, args}) do
{:ok, schedule_throttle_and_update_state(%{key: key, args: args, timer_ref: nil})}
end

@doc """
Function to invoke the throttle logic if process already exists.
It will only schedule a throttle if `timer_ref` is `nil`.

"""
@impl GenServer
@spec handle_cast(:throttle, Buffy.ThrottleAndTimed.state()) :: {:noreply, Buffy.ThrottleAndTimed.state()}
def handle_cast(:throttle, %{timer_ref: nil} = state) do
{:noreply, schedule_throttle_and_update_state(state)}
end

def handle_cast(:throttle, state) do
{:noreply, state}
end

defp schedule_throttle_and_update_state(state) do
timer_ref = Process.send_after(self(), :execute_throttle_callback, unquote(throttle))
%{state | timer_ref: timer_ref}
end

@doc false
@impl GenServer
@spec handle_info(:timeout | :execute_throttle_callback, Buffy.ThrottleAndTimed.state()) ::
{:noreply, Buffy.ThrottleAndTimed.state(), {:continue, :do_work}}
def handle_info(:timeout, %{key: key, args: args} = state) do
:telemetry.execute(
[:buffy, :throttle, :timeout],
%{count: 1},
%{args: args, key: key, module: __MODULE__}
)

{:noreply, state, {:continue, :do_work}}
end

def handle_info(:execute_throttle_callback, state) do
{:noreply, state, {:continue, :do_work}}
end

@doc false
@impl GenServer
@spec handle_continue(do_work :: atom(), Buffy.ThrottleAndTimed.state()) ::
{:noreply, Buffy.ThrottleAndTimed.state()} | {:noreply, Buffy.ThrottleAndTimed.state(), timeout()}
def handle_continue(:do_work, %{key: key, args: args} = state) do
:telemetry.span(
[:buffy, :throttle, :handle],
%{args: args, key: key, module: __MODULE__},
fn ->
result = handle_throttle(args)
{result, %{args: args, key: key, module: __MODULE__, result: result}}
end
)

new_state = %{state | timer_ref: nil}
maybe_add_inbox_timeout({:noreply, new_state})
rescue
e ->
Logger.error("Error in throttle: #{inspect(e)}")
new_state = %{state | timer_ref: nil}
maybe_add_inbox_timeout({:noreply, new_state})
end

defp maybe_add_inbox_timeout({return_signal, state} = return_tuple) do
loop_interval = unquote(loop_interval)

if is_number(loop_interval) do
{return_signal, state, loop_interval}
else
Logger.error(
"Error parsing :loop_interval - value is not a number, will ignore. Got: #{inspect(loop_interval)}"
)

return_tuple
end
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
end
Loading