Skip to content

Commit

Permalink
Added amqp.
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-br committed Apr 23, 2024
1 parent c025822 commit 2515369
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 12 deletions.
1 change: 1 addition & 0 deletions lib/clear_settle_engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule ClearSettleEngine do
def start(_type, _args) do
children = [
ClearSettleEngine.Repo,
RabbitMQSubscriber,
{SuccessfulDayScheduler, []}
]

Expand Down
2 changes: 1 addition & 1 deletion lib/mix/tasks/successful_day.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Mix.Tasks.SuccessfulDay do

require Logger

@length_of_day_in_seconds 120
@length_of_day_in_seconds 10

@shortdoc "Submits trades every 5 seconds."
def run(_) do
Expand Down
115 changes: 115 additions & 0 deletions lib/rabbitmq_subscriber.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
defmodule RabbitMQSubscriber do
use GenServer

@moduledoc """
A GenServer client module for subscribing to RabbitMQ messages using consume.
"""

# Starts the GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

# Initializes the RabbitMQ connection, declares the queue, exchange, binds them and consumes messages.
def init(:ok) do
# Establish the connection
host = System.get_env("RABBITMQ_HOST") || "localhost"
{:ok, connection} = AMQP.Connection.open("amqp://guest:guest@#{host}")
{:ok, channel} = AMQP.Channel.open(connection)
{:ok, publisher_channel} = AMQP.Channel.open(connection)

# Declare the queue and the exchange
queue = "queue"
exchange = "start_transactions"
response_exchange = "response_exchange"
response_queue = "response_queue"

AMQP.Queue.declare(channel, queue)
AMQP.Exchange.declare(channel, exchange, :topic, durable: true)
AMQP.Queue.bind(channel, queue, exchange)

# New line: Declare the response exchange
AMQP.Queue.declare(publisher_channel, response_queue)
AMQP.Exchange.declare(publisher_channel, response_exchange, :topic, durable: true)
AMQP.Queue.bind(publisher_channel, response_queue, response_exchange)

# Start consuming messages
{:ok, _consumer_tag} =
AMQP.Basic.consume(
channel,
queue,
nil,
no_ack: true
)

AMQP.Basic.publish(publisher_channel, "response_exchange", "", "already running")

{:ok, %{channel: channel, connection: connection, publisher_channel: publisher_channel}}
end

def handle_info(
{:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}},
%{
publisher_channel: publisher_channel,
channel: channel
} = state
) do
# You might want to run payload consumption in separate Tasks in production
IO.puts("Received: #{payload}")
IO.puts(inspect(channel))
IO.puts(inspect(publisher_channel))

IO.puts("Check")

case GenServer.call(SuccessfulDayScheduler, :start_transactions) do
:ok ->
IO.puts("Transaction started successfully!!.")

AMQP.Basic.publish(
publisher_channel,
"response_exchange",
"",
"transactions_started"
)

{:error, :already_running} ->
IO.puts("Cannot start a new transaction: another transaction is currently in progress.")

AMQP.Basic.publish(publisher_channel, "response_exchange", "", "already_running")

_ ->
IO.puts("Unexpected response from GenServer.")
end

{:noreply, state}
end

# # Handle cancel which occurs when the consumer is cancelled
# def handle_info(:basic_cancel, channel) do
# IO.puts("Consumer has been cancelled")
# {:stop, :normal, channel}
# end

# Confirmation sent by the broker after registering this process as a consumer
def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end

# Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, chan) do
{:stop, :normal, chan}
end

# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end

# defp publish_response(channel, response) do
# # Specify the exchange and routing key for publishing the response
# response_exchange = "response_exchange"
# IO.puts("gets here")
# # Publish the response to the exchange with the specified routing key
# AMQP.Basic.publish(channel, response_exchange, "", response)
# end
end
66 changes: 55 additions & 11 deletions lib/successful_day_scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,37 @@ defmodule SuccessfulDayScheduler do
use GenServer

def start_link(_opts) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
# false indicates that no task is currently running
GenServer.start_link(__MODULE__, {:ok, false}, name: __MODULE__)
end

def init(:ok) do
IO.puts("initializing")
def init({:ok, task_running?}) do
IO.puts("Initializing")
# Initialize the state with the task_running flag
{:ok, task_running?}
end

def handle_call(:start_transactions, _from, false) do
IO.puts("Starting transactions")
spawn_task_and_set_running()
# Reply with :ok and update state to true
{:reply, :ok, true}
end

spawn_task_and_wait()
{:ok, nil}
def handle_call(:start_transactions, _from, true) do
IO.puts("A task is already running. Ignoring request.")
# Reply with an error tuple
{:reply, {:error, :already_running}, true}
end

def handle_info(:task_completed, state) do
IO.puts("task completed")
spawn_task_and_wait()
{:noreply, state}
def handle_info(:task_completed, _state) do
IO.puts("Task completed")
# Set task_running to false to allow new tasks
{:noreply, false}
end

def spawn_task_and_wait do
IO.puts("spawking task")
defp spawn_task_and_set_running do
IO.puts("Spawning task")
pid = self()

spawn_link(fn ->
Expand All @@ -28,3 +41,34 @@ defmodule SuccessfulDayScheduler do
end)
end
end

# defmodule SuccessfulDayScheduler do
# use GenServer

# def start_link(_opts) do
# GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
# end

# def init(:ok) do
# IO.puts("initializing")

# spawn_task_and_wait()
# {:ok, nil}
# end

# def handle_info(:task_completed, state) do
# IO.puts("task completed")
# spawn_task_and_wait()
# {:noreply, state}
# end

# def spawn_task_and_wait do
# IO.puts("spawking task")
# pid = self()

# spawn_link(fn ->
# Mix.Tasks.SuccessfulDay.run([])
# send(pid, :task_completed)
# end)
# end
# end

0 comments on commit 2515369

Please sign in to comment.