diff --git a/lib/nebulex/adapters/multilevel.ex b/lib/nebulex/adapters/multilevel.ex index a85b366f..5bc66f2a 100644 --- a/lib/nebulex/adapters/multilevel.ex +++ b/lib/nebulex/adapters/multilevel.ex @@ -616,12 +616,18 @@ defmodule Nebulex.Adapters.Multilevel do end defp maybe_replicate({value, [level_meta | [_ | _] = levels]}, key, :inclusive) do - ttl = with_dynamic_cache(level_meta, :ttl, [key]) || :infinity - :ok = - Enum.each(levels, fn l_meta -> - _ = with_dynamic_cache(l_meta, :put, [key, value, [ttl: ttl]]) - end) + case with_dynamic_cache(level_meta, :ttl, [key]) do + nil -> + # the cache entry expired between the `get` and `ttl` calls + # don't replicate the entry + :ok + + ttl -> + Enum.each(levels, fn l_meta -> + _ = with_dynamic_cache(l_meta, :put, [key, value, [ttl: ttl]]) + end) + end value end diff --git a/test/nebulex/adapters/multilevel_inclusive_test.exs b/test/nebulex/adapters/multilevel_inclusive_test.exs index 2ce57930..6916eb5a 100644 --- a/test/nebulex/adapters/multilevel_inclusive_test.exs +++ b/test/nebulex/adapters/multilevel_inclusive_test.exs @@ -9,8 +9,10 @@ defmodule Nebulex.Adapters.MultilevelInclusiveTest do alias Nebulex.Adapters.Local.Generation alias Nebulex.Cache.Cluster + alias Nebulex.TestCache.DelayedReadAdapter alias Nebulex.TestCache.Multilevel alias Nebulex.TestCache.Multilevel.{L1, L2, L3} + alias Nebulex.TestCache.MultilevelWithDelay @gc_interval :timer.hours(1) @@ -155,6 +157,39 @@ defmodule Nebulex.Adapters.MultilevelInclusiveTest do end end + describe "delayed multilevel" do + setup_with_dynamic_cache(MultilevelWithDelay, :multilevel_inclusive_with_delay, + model: :inclusive, + levels: [ + {MultilevelWithDelay.L1, + name: :multilevel_inclusive_with_delay_l1, + gc_interval: @gc_interval, + backend: :shards, + partitions: 2}, + {MultilevelWithDelay.L2, + name: :multilevel_inclusive_with_delay_l2, + gc_interval: @gc_interval, + backend: :shards, + partitions: 2} + ] + ) + + test "does not replicate the data if the cache expires during replication" do + # reading from L2 will take 500ms + DelayedReadAdapter.put_read_delay(500) + + # since we call both `get` and `ttl` the total read time will be 1000ms + :ok = MultilevelWithDelay.put(:key, :data, ttl: 700, level: 2) + + # the key should expire between the `get` and `tl` calls, so the data + # should be returned but not replicated + assert MultilevelWithDelay.get(:key) == :data + assert MultilevelWithDelay.get(:key, level: 1) == nil + + assert MultilevelWithDelay.ttl(:key) == nil + end + end + describe "distributed levels" do test "return cluster nodes" do assert Cluster.get_nodes(:multilevel_inclusive_l2) == [node()] diff --git a/test/support/test_cache.ex b/test/support/test_cache.ex index d0761c80..e7148c66 100644 --- a/test/support/test_cache.ex +++ b/test/support/test_cache.ex @@ -113,6 +113,114 @@ defmodule Nebulex.TestCache do end end + defmodule DelayedReadAdapter do + @moduledoc false + + require Nebulex.Adapters.Local + + @behaviour Nebulex.Adapter + @behaviour Nebulex.Adapter.Entry + @behaviour Nebulex.Adapter.Queryable + + @fallback_adapter Nebulex.Adapters.Local + + @impl true + defmacro __before_compile__(opts) do + quote do + require unquote(@fallback_adapter) + + unquote(@fallback_adapter).__before_compile__(unquote(Macro.escape(opts))) + end + end + + @impl true + defdelegate init(opts), to: @fallback_adapter + + @impl true + def get(adapter_meta, key, opts) do + delay() + @fallback_adapter.get(adapter_meta, key, opts) + end + + @impl true + def get_all(adapter_meta, list, opts) do + delay() + @fallback_adapter.get_all(adapter_meta, list, opts) + end + + @impl true + defdelegate put(adapter_meta, key, value, ttl, on_write, opts), to: @fallback_adapter + + @impl true + defdelegate put_all(adapter_meta, entries, ttl, on_write, opts), to: @fallback_adapter + + @impl true + defdelegate delete(adapter_meta, key, opts), to: @fallback_adapter + + @impl true + defdelegate take(adapter_meta, key, opts), to: @fallback_adapter + + @impl true + def has_key?(adapter_meta, key) do + delay() + @fallback_adapter.has_key?(adapter_meta, key) + end + + @impl true + def ttl(adapter_meta, key) do + delay() + @fallback_adapter.ttl(adapter_meta, key) + end + + @impl true + defdelegate expire(adapter_meta, key, ttl), to: @fallback_adapter + + @impl true + defdelegate touch(adapter_meta, key), to: @fallback_adapter + + @impl true + defdelegate update_counter(adapter_meta, key, amount, ttl, default, opts), to: @fallback_adapter + + @impl true + defdelegate execute(adapter_meta, command, args, opts), to: @fallback_adapter + + @impl true + defdelegate stream(adapter_meta, query, opts), to: @fallback_adapter + + @read_delay_key {__MODULE__, :read_delay} + + def put_read_delay(delay) when is_integer(delay) do + Process.put(@read_delay_key, delay) + end + + defp delay do + delay = Process.get(@read_delay_key, 1000) + + Process.sleep(delay) + end + end + + defmodule MultilevelWithDelay do + @moduledoc false + use Nebulex.Cache, + otp_app: :nebulex, + adapter: Nebulex.Adapters.Multilevel + + defmodule L1 do + @moduledoc false + use Nebulex.Cache, + otp_app: :nebulex, + adapter: Nebulex.Adapters.Local + end + + defmodule L2 do + @moduledoc false + use Nebulex.Cache, + otp_app: :nebulex, + adapter: Nebulex.TestCache.DelayedReadAdapter + end + end + ## Mocks defmodule AdapterMock do