Skip to content

Commit

Permalink
Added amqp.
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-br committed Apr 23, 2024
1 parent bcbcff4 commit 0150f37
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/clear_settle_engine_admin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule ClearSettleEngineAdmin.Application do
children = [
ClearSettleEngineAdminWeb.Telemetry,
ClearSettleEngineAdmin.Repo,
{ClearSettleEngineAdmin.RabbitMQClient, []},
NotificationListener,
{DNSCluster,
query: Application.get_env(:clear_settle_engine_admin, :dns_cluster_query) || :ignore},
Expand Down
78 changes: 78 additions & 0 deletions lib/clear_settle_engine_admin/rabbitmq_client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule ClearSettleEngineAdmin.RabbitMQClient do
use GenServer

@moduledoc """
A GenServer client module for interacting with RabbitMQ.
"""

# API for starting the GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end

# GenServer callbacks
def init(_args) do
queue = "queue"
response_exchange = "response_exchange"
response_queue = "response_queue"

host = System.get_env("RABBITMQ_HOST") || "localhost"
{:ok, connection} = AMQP.Connection.open("amqp://guest:guest@#{host}")
{:ok, channel} = AMQP.Channel.open(connection)
{:ok, consuming_channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "queue")
AMQP.Exchange.declare(channel, "start_transactions", :topic, durable: true)
AMQP.Queue.bind(channel, "queue", "start_transactions")

AMQP.Queue.declare(consuming_channel, response_queue)
AMQP.Exchange.declare(consuming_channel, response_exchange, :topic, durable: true)
AMQP.Queue.bind(consuming_channel, response_queue, response_exchange)

{:ok, _consumer_tag} =
AMQP.Basic.consume(
consuming_channel,
response_queue,
nil,
no_ack: true
)

{:ok, %{channel: channel, connection: connection, consuming_channel: consuming_channel}}
end

def handle_call({:publish, message}, _from, %{channel: channel} = state) do
IO.puts("publishing again")
AMQP.Basic.publish(channel, "start_transactions", "", message)
{:reply, :ok, state}
end

def handle_info(
{:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}},
state
) do
# You might want to run payload consumption in separate Tasks in production
IO.puts("Received in admin: #{payload}")

Phoenix.PubSub.broadcast(
ClearSettleEngineAdmin.PubSub,
"trades_and_balances_updates",
{"transactions_started", payload}
)

{:noreply, state}
end

def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, state) do
{:noreply, state}
end

# Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, state) do
{:stop, :normal, state}
end

# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, state) do
{:noreply, state}
end
end
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
<div id="balances">
<div class="flex justify-center mb-4">
<button
phx-click="schedule_trades"
class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded"
>
Schedule Trades
</button>
</div>
<h2 class="text-lg font-bold mb-4">Security Balances</h2>
<style>
.negative-balance {
background-color: #FF7F50; /* Coral */
color: white; /* Ensures text is still visible on red background */
}
/* Styles for the toast messages */
.info {
background-color: #3182ce; /* Blue */
}

.error {
background-color: #e53e3e; /* Red */
}
</style>
<table class="table-auto border-collapse border border-gray-400 w-full">
<thead>
Expand All @@ -16,7 +32,7 @@
</thead>
<tbody>
<%= for balance <- @balances do %>
<tr class="<%= if balance["balance"] < 0, do: "negative-balance" %> border border-gray-400">
<tr class={"#{if balance["balance"] < 0, do: "negative-balance"} border border-gray-400"}>
<td class="px-4 py-2"><%= balance["account_number"] %></td>
<td class="px-4 py-2"><%= balance["security_id"] %></td>
<td class="px-4 py-2"><%= balance["balance"] %></td>
Expand All @@ -29,6 +45,18 @@
<div id="trades">
<h2>Trades</h2>
<%= for trade <- @trades do %>
<p><%= trade["sell_side_account_number"] %> to <%= trade["buy_side_account_number"] %> - Security: <%= trade["security_id"] %> - Quantity: <%= trade["quantity"] %> - Timestamp: <%= trade["inserted_at"] %></p>
<p>
<%= trade["sell_side_account_number"] %> to <%= trade["buy_side_account_number"] %> - Security: <%= trade[
"security_id"
] %> - Quantity: <%= trade["quantity"] %> - Timestamp: <%= trade["inserted_at"] %>
</p>
<% end %>
</div>

<div id="toast-container" class="absolute top-0 left-0 right-0 p-4">
<%= if @toast != nil do %>
<div class={"px-4 py-2 rounded shadow-lg text-white #{@toast.type}"}>
<%= @toast.message %>
</div>
<% end %>
</div>
40 changes: 40 additions & 0 deletions lib/clear_settle_engine_admin_web/trades_and_balances_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule ClearSettleEngineAdminWeb.TradesAndBalancesLive do

{:ok,
assign(socket,
toast: nil,
trades: [],
balances: transformed_balances,
account_mapping: account_mapping,
Expand All @@ -51,6 +52,12 @@ defmodule ClearSettleEngineAdminWeb.TradesAndBalancesLive do
ClearSettleEngineAdminWeb.TradesAndBalancesView.render("index.html", assigns)
end

@impl true
def handle_event("schedule_trades", _params, socket) do
GenServer.call(ClearSettleEngineAdmin.RabbitMQClient, {:publish, "no_msg"})
{:noreply, socket}
end

def handle_info({"new_trade", trade_json}, socket) do
trade = JSON.decode!(trade_json)

Expand Down Expand Up @@ -93,6 +100,39 @@ defmodule ClearSettleEngineAdminWeb.TradesAndBalancesLive do
{:noreply, assign(socket, balances: updated_balances)}
end

@impl true
def handle_info({"transactions_started", "transactions_started"}, socket) do
socket =
assign(socket, :toast, %{
type: :info,
message: "Transactions have started successfully."
})

Process.send_after(self(), :clear_toast, 1500)

{:noreply, socket}
end

@impl true
def handle_info({"transactions_started", "already_running"}, socket) do
IO.puts("in here")

socket =
assign(socket, :toast, %{
type: :error,
message:
"Transactions are already running and cannot be started again until current batch has terminated."
})

Process.send_after(self(), :clear_toast, 1500)

{:noreply, socket}
end

def handle_info(:clear_toast, socket) do
{:noreply, assign(socket, toast: nil)}
end

def update_balances(balances, new_balance_data) do
existing_balance_index =
Enum.find_index(balances, fn balance ->
Expand Down

0 comments on commit 0150f37

Please sign in to comment.