Skip to content

Commit

Permalink
Merge pull request #23 from bbalser/fault_tolerance
Browse files Browse the repository at this point in the history
Manager will now wait 2 seconds before dying
  • Loading branch information
Brian Balser authored Jul 1, 2019
2 parents 69e043e + 806586d commit c6a2a9d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
22 changes: 18 additions & 4 deletions lib/elsa/group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ defmodule Elsa.Group.Manager do
end

def init(opts) do
Process.flag(:trap_exit, true)

state = %State{
brokers: Keyword.fetch!(opts, :brokers),
group: Keyword.fetch!(opts, :group),
Expand All @@ -83,12 +85,12 @@ defmodule Elsa.Group.Manager do
workers: %{}
}

{:ok, client_pid} = Elsa.Util.start_client(state.brokers, state.name)

{:ok, %{state | client_pid: client_pid}, {:continue, :start_coordinator}}
{:ok, state, {:continue, :start_coordinator}}
end

def handle_continue(:start_coordinator, state) do
{:ok, client_pid} = Elsa.Util.start_client(state.brokers, state.name)

{:ok, group_coordinator_pid} =
:brod_group_coordinator.start_link(state.name, state.group, state.topics, state.config, __MODULE__, self())

Expand All @@ -98,7 +100,10 @@ defmodule Elsa.Group.Manager do

Registry.put_meta(registry(state.name), :group_coordinator, group_coordinator_pid)

{:noreply, %{state | group_coordinator_pid: group_coordinator_pid}}
{:noreply, %{state | client_pid: client_pid, group_coordinator_pid: group_coordinator_pid}}
catch
:exit, reason ->
wait_and_stop(reason, state)
end

def handle_cast({:process_assignments, generation_id, assignments}, state) do
Expand Down Expand Up @@ -135,4 +140,13 @@ defmodule Elsa.Group.Manager do

{:noreply, %{state | workers: new_workers}}
end

def handle_info({:EXIT, _from, reason}, state) do
wait_and_stop(reason, state)
end

defp wait_and_stop(reason, state) do
Process.sleep(2_000)
{:stop, reason, state}
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Elsa.MixProject do
{:brod, "~> 3.7"},
{:divo, "~> 1.1", only: [:dev, :test, :integration], override: true},
{:divo_kafka, "~> 0.1.0", only: [:dev, :test, :integration]},
{:placebo, "~> 1.2", only: [:dev, :test]},
{:placebo, "~> 1.2.2", only: [:dev, :test]},
{:checkov, "~> 0.4.0", only: [:test, :integration]},
{:ex_doc, "~> 0.20.2", only: [:dev]},
{:dialyxir, "~> 0.5", only: [:dev], runtime: false}
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
"patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm"},
"placebo": {:hex, :placebo, "1.2.1", "303ebb597279fd77b1f6d38ec377be4932f73246351f181d6128d0a58d3f74e9", [:mix], [{:meck, "~> 0.8.9", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"placebo": {:hex, :placebo, "1.2.2", "a3d47906b01844bfd04ab0351a605620619fdb8f011225e406696f96a88ff380", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"snappyer": {:hex, :snappyer, "1.2.4", "6d739c534cd2339633127a2b40279be71f149e5842c5363a4d88e66efb7c1fec", [:make, :rebar, :rebar3], [], "hexpm"},
"supervisor3": {:hex, :supervisor3, "1.1.8", "5cf95c95342b589ec8d74689eea0646c0a3eb92820241e0c2d0ca4c104df92bc", [:make, :rebar, :rebar3], [], "hexpm"},
}

0 comments on commit c6a2a9d

Please sign in to comment.