Skip to content

Commit

Permalink
feat!: add otp_app option to consumer and producer compile-time optio…
Browse files Browse the repository at this point in the history
…ns to improve configurating the modules
  • Loading branch information
yordis committed Dec 27, 2023
1 parent 460e678 commit dafb878
Show file tree
Hide file tree
Showing 16 changed files with 57 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ You'll first setup a module for your consumer logic like so:
```elixir
defmodule MyConsumer do
use Kafee.Consumer,
otp_app: :my_app,
adapter: Application.compile_env(:my_app, :kafee_consumer_adapter, nil),
consumer_group_id: "my-app",
topic: "my-topic"
Expand Down Expand Up @@ -91,6 +92,7 @@ So you want to send messages to Kafka eh? Well, first you will need to create a
```elixir
defmodule MyProducer do
use Kafee.Producer,
otp_app: :my_app,
adapter: Application.compile_env(:my_app, :kafee_producer_adapter, nil),
encoder: Kafee.JasonEncoderDecoder,
topic: "my-topic",
Expand Down
21 changes: 20 additions & 1 deletion lib/kafee/consumer.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
defmodule Kafee.Consumer do
@options_schema NimbleOptions.new!(
otp_app: [
doc: """
The name of the OTP application to read configuration from.
If the option is set, the configuration will be read from the application environment under the
given name. For example, if the OTP application is `:my_app`, the configuration will be read from
`Application.get_env(:my_app, Myapp.KafeeConsumer)`.
""",
required: true,
type: :atom
],
adapter: [
default: nil,
doc: """
Expand Down Expand Up @@ -93,6 +104,7 @@ defmodule Kafee.Consumer do
defmodule MyConsumer do
use Kafee.Consumer,
otp_app: :my_app,
adapter: Kafee.Consumer.BroadwayAdapter,
host: "localhost",
port: 9092
Expand All @@ -116,6 +128,7 @@ defmodule Kafee.Consumer do
defmodule MyConsumer do
use Kafee.Consumer,
otp_app: :my_app,
adapter: Application.compile_env(:my_app, :kafee_consumer_adapter, nil),
host: "localhost",
port: 9092,
Expand Down Expand Up @@ -197,7 +210,13 @@ defmodule Kafee.Consumer do
@doc false
@spec child_spec(Kafee.Consumer.options()) :: Supervisor.child_spec()
def child_spec(args) do
full_opts = Keyword.merge(unquote(Macro.escape(opts)), args)
opts = unquote(Macro.escape(opts))
env_opts = Application.get_env(opts[:otp_app], unquote(__MODULE__), [])

full_opts =
opts
|> Keyword.merge(env_opts)
|> Keyword.merge(args)

%{
id: __MODULE__,
Expand Down
22 changes: 21 additions & 1 deletion lib/kafee/producer.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
defmodule Kafee.Producer do
@options_schema NimbleOptions.new!(
otp_app: [
doc: """
The name of the OTP application to read configuration from.
If the option is set, the configuration will be read from the application environment under the
given name. For example, if the OTP application is `:my_app`, the configuration will be read from
`Application.get_env(:my_app, Myapp.KafeeProducer)`.
""",
required: true,
type: :atom
],
adapter: [
default: nil,
doc: """
Expand Down Expand Up @@ -107,6 +118,7 @@ defmodule Kafee.Producer do
defmodule MyProducer do
use Kafee.Producer,
otp_app: :my_app,
topic: "my-topic",
partition_fun: :random
end
Expand All @@ -125,6 +137,7 @@ defmodule Kafee.Producer do
defmodule MyProducer do
use Kafee.Producer,
otp_app: :my_app,
topic: "my-topic",
partition_fun: :random
Expand All @@ -151,6 +164,7 @@ defmodule Kafee.Producer do
defmodule MyProducer do
use Kafee.Producer,
otp_app: :my_app,
encoder: Kafee.JasonEncoderDecoder,
topic: "order-created",
partition_fun: :random
Expand Down Expand Up @@ -213,7 +227,13 @@ defmodule Kafee.Producer do
@doc false
@spec child_spec(Kafee.Producer.options()) :: Supervisor.child_spec()
def child_spec(args) do
full_opts = Keyword.merge(unquote(Macro.escape(opts)), args)
opts = unquote(Macro.escape(opts))
env_opts = Application.get_env(opts[:otp_app], unquote(__MODULE__), [])

full_opts =
opts
|> Keyword.merge(env_opts)
|> Keyword.merge(args)

%{
id: __MODULE__,
Expand Down
1 change: 1 addition & 0 deletions lib/kafee/producer/test_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Kafee.Producer.TestAdapter do
defmodule MyProducer do
use Kafee.Producer,
otp_app: :my_app,
adapter: Kafee.Producer.TestAdapter,
encoder: Kafee.JasonEncoderDecoder
Expand Down
1 change: 1 addition & 0 deletions test/kafee/consumer/broadway_adapter_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Kafee.Consumer.BroadwayAdapterIntegrationTest do

defmodule MyConsumer do
use Kafee.Consumer,
otp_app: :kafee,
adapter: {Kafee.Consumer.BroadwayAdapter, []}

def handle_message(%Kafee.Consumer.Message{} = message) do
Expand Down
1 change: 1 addition & 0 deletions test/kafee/consumer/brod_adapter_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Kafee.Consumer.BrodAdapterIntegrationTest do

defmodule MyConsumer do
use Kafee.Consumer,
otp_app: :kafee,
adapter: Kafee.Consumer.BrodAdapter

def handle_message(%Kafee.Consumer.Message{} = message) do
Expand Down
2 changes: 2 additions & 0 deletions test/kafee/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Kafee.ConsumerTest do
test "starts the adapter process tree", %{topic: topic} do
assert {:ok, pid} =
Kafee.Consumer.start_link(MyConsumer,
otp_app: :kafee,
adapter: Kafee.Consumer.BroadwayAdapter,
host: Kafee.KafkaApi.host(),
port: Kafee.KafkaApi.port(),
Expand All @@ -25,6 +26,7 @@ defmodule Kafee.ConsumerTest do
test "starts nothing if no adapter is set", %{topic: topic} do
assert :ignore =
Kafee.Consumer.start_link(MyConsumer,
otp_app: :kafee,
adapter: nil,
host: Kafee.KafkaApi.host(),
port: Kafee.KafkaApi.port(),
Expand Down
1 change: 1 addition & 0 deletions test/kafee/producer/async_adapter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Kafee.Producer.AsyncAdapterTest do

defmodule MyProducer do
use Kafee.Producer,
otp_app: :kafee,
adapter: Kafee.Producer.AsyncAdapter,
encoder: Kafee.JasonEncoderDecoder,
partition_fun: :random
Expand Down
1 change: 1 addition & 0 deletions test/kafee/producer/sync_adapter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Kafee.Producer.SyncAdapterTest do

defmodule MyProducer do
use Kafee.Producer,
otp_app: :kafee,
adapter: Kafee.Producer.SyncAdapter,
encoder: Kafee.JasonEncoderDecoder,
partition_fun: :random
Expand Down
1 change: 1 addition & 0 deletions test/kafee/producer/test_adapter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Kafee.Producer.TestAdapterTest do

defmodule MyProducer do
use Kafee.Producer,
otp_app: :kafee,
adapter: Kafee.Producer.TestAdapter,
encoder: Kafee.JasonEncoderDecoder,
partition_fun: :random
Expand Down
1 change: 1 addition & 0 deletions test/kafee/producer_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Kafee.ProducerIntegrationTest do

defmodule MyProducer do
use Kafee.Producer,
otp_app: :kafee,
adapter: Kafee.Producer.AsyncAdapter,
partition_fun: :random
end
Expand Down
1 change: 1 addition & 0 deletions test/kafee/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Kafee.ProducerTest do
test "allows setting config via using macro" do
defmodule MyTestProducer do
use Kafee.Producer,
otp_app: :kafee,
adapter: Kafee.Producer.TestAdapter,
topic: "my super-amazing-test-topic",
partition_fun: :random
Expand Down
1 change: 1 addition & 0 deletions test/kafee/test_async_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Kafee.TestAsyncTest do

defmodule TestProducer do
use Kafee.Producer,
otp_app: :kafee,
adapter: Kafee.Producer.TestAdapter,
topic: "kafee-test-async-test",
partition_fun: :random
Expand Down
1 change: 1 addition & 0 deletions test/kafee/test_sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Kafee.TestSyncTest do

defmodule TestProducer do
use Kafee.Producer,
otp_app: :kafee,
adapter: Kafee.Producer.TestAdapter,
topic: "kafee-test-sync-test",
partition_fun: :random
Expand Down
1 change: 1 addition & 0 deletions test/support/example/my_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule MyConsumer do
"""

use Kafee.Consumer,
otp_app: :kafee,
adapter: nil,
host: "localhost",
consumer_group_id: "test",
Expand Down
1 change: 1 addition & 0 deletions test/support/example/my_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule MyProducer do
"""

use Kafee.Producer,
otp_app: :kafee,
adapter: nil,
topic: "test-topic",
partition_fun: :random
Expand Down

0 comments on commit dafb878

Please sign in to comment.