Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Sync log
Browse files Browse the repository at this point in the history
Store transactions into SyncLog and return them via separate endpoint so
clients can get entries to sync.
  • Loading branch information
gaynetdinov committed Feb 27, 2018
1 parent f044556 commit ef1bd12
Show file tree
Hide file tree
Showing 31 changed files with 527 additions and 317 deletions.
1 change: 1 addition & 0 deletions lib/ex_money.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule ExMoney do
worker(ExMoney.IdleWorker, [], restart: :transient),
worker(ExMoney.AccountsBalanceHistoryWorker, []),
worker(ExMoney.Saltedge.SyncWorker, []),
worker(ExMoney.SyncLogWorker, []),
worker(ExMoney.Scheduler, []),
worker(ExMoney.RuleProcessor, []),
worker(ExMoney.Saltedge.LoginRefreshWorker, [], restart: :transient),
Expand Down
24 changes: 8 additions & 16 deletions lib/ex_money/rule_processor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule ExMoney.RuleProcessor do
use GenServer

alias ExMoney.{Repo, Rule, Transaction, Category, Account}
alias ExMoney.{Repo, Rule, Transactions, Category, Account}

import Ecto.Query

Expand All @@ -10,9 +10,7 @@ defmodule ExMoney.RuleProcessor do
end

def handle_cast({:process, transaction_id}, _state) do
transaction = Transaction
|> where([tr], tr.id == ^transaction_id)
|> Repo.one
transaction = Transactions.get_transaction(transaction_id)

Rule
|> where([r], r.account_id == ^transaction.account_id)
Expand All @@ -31,13 +29,10 @@ defmodule ExMoney.RuleProcessor do
def handle_cast({:process_all, rule_id}, _state) do
rule = Repo.get(Rule, rule_id)

transactions = Repo.all(from tr in Transaction,
where: tr.account_id == ^rule.account_id,
where: tr.rule_applied == false,
where: fragment("description ~* ?", ^rule.pattern) or fragment("extra->>'payee' ~* ?", ^rule.pattern))
transactions = Transactions.search(rule.account_id, rule.pattern)

Enum.each(transactions, fn(tr) ->
GenServer.cast(:rule_processor, {:process, tr.id})
Enum.each(transactions, fn(transaction) ->
GenServer.cast(:rule_processor, {:process, transaction.id})
end)

{:noreply, %{}}
Expand All @@ -47,8 +42,7 @@ defmodule ExMoney.RuleProcessor do
{:ok, re} = Regex.compile(rule.pattern, "i")

if Regex.match?(re, transaction_description(transaction, transaction.extra)) do
Transaction.update_changeset(transaction, %{category_id: rule.target_id, rule_applied: true})
|> Repo.update!
Transactions.update_transaction!(transaction, %{category_id: rule.target_id, rule_applied: true})
end
end

Expand All @@ -62,7 +56,7 @@ defmodule ExMoney.RuleProcessor do
Decimal.compare(transaction.amount, Decimal.new(0)) == Decimal.new(-1) do

Repo.transaction(fn ->
Transaction.changeset_custom(%Transaction{},
Transactions.create_custom_transaction!(
%{
"amount" => transaction.amount,
"category_id" => withdraw_category.id,
Expand All @@ -72,10 +66,8 @@ defmodule ExMoney.RuleProcessor do
"type" => "expense"
}
)
|> Repo.insert!

Transaction.update_changeset(transaction, %{rule_applied: true, category_id: withdraw_category.id})
|> Repo.update!
Transactions.update_transaction!(transaction, %{rule_applied: true, category_id: withdraw_category.id})

# transaction.amount is negative, sub with something negative -> add
new_balance = Decimal.sub(account.balance, transaction.amount)
Expand Down
19 changes: 7 additions & 12 deletions lib/ex_money/saltedge/transactions_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule ExMoney.Saltedge.TransactionsWorker do

require Logger

alias ExMoney.{Repo, Transaction, Category, Account}
alias ExMoney.{Repo, Transactions, Category, Account}

def start_link(_opts \\ []) do
GenServer.start_link(__MODULE__, :ok, name: :transactions_worker)
Expand Down Expand Up @@ -112,31 +112,27 @@ defmodule ExMoney.Saltedge.TransactionsWorker do
end

defp store(transactions, account) do
Enum.reduce(transactions, 0, fn(se_tran, acc) ->
Enum.reduce transactions, 0, fn(se_tran, acc) ->
se_tran = Map.put(se_tran, "saltedge_transaction_id", se_tran["id"])
se_tran = Map.put(se_tran, "saltedge_account_id", se_tran["account_id"])
se_tran = Map.put(se_tran, "user_id", account.user_id)
se_tran = Map.drop(se_tran, ["id", "account_id"])
se_tran = Map.put(se_tran, "account_id", account.id)

existing_transaction = Transaction.
existing_transaction = Transactions.
by_saltedge_transaction_id(se_tran["saltedge_transaction_id"])
|> Repo.one

if !existing_transaction and !se_tran["duplicated"] do
se_tran = set_category_id(se_tran)

changeset = Transaction.changeset(%Transaction{}, se_tran)
{:ok, inserted_transaction} = Repo.transaction fn ->
Repo.insert!(changeset)
end
inserted_transaction = Transactions.create_transaction!(se_tran)
GenServer.cast(:rule_processor, {:process, inserted_transaction.id})

GenServer.cast(:sync_log_worker, {:store_transaction, inserted_transaction.id})
acc + 1
else
acc
end
end)
end
end

defp set_category_id(transaction) do
Expand All @@ -162,7 +158,6 @@ defmodule ExMoney.Saltedge.TransactionsWorker do

defp find_last_transaction(saltedge_account_id) do
# FIXME: set last_transaction_id in cache during import
Transaction.newest(saltedge_account_id)
|> Repo.one
Transactions.newest(saltedge_account_id)
end
end
25 changes: 25 additions & 0 deletions lib/ex_money/sync_log_api/sync_log.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule ExMoney.SyncLogApi.SyncLog do
use Ecto.Schema
import Ecto.Changeset

schema "sync_log" do
field :uid, :string
field :action, :string
field :entity, :string
field :payload, :map
field :synced_at, :naive_datetime

timestamps()
end

def changeset(model, params \\ %{}) do
model
|> cast(params, [:action, :entity, :payload, :synced_at])
|> validate_required([:action, :entity, :payload])
|> generate_uid()
end

defp generate_uid(changeset) do
Ecto.Changeset.put_change(changeset, :uid, Ecto.UUID.generate())
end
end
26 changes: 26 additions & 0 deletions lib/ex_money/sync_log_api/sync_log_api.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule ExMoney.SyncLogApi do
@moduledoc """
Api functions to sync things.
"""

import Ecto.Query, warn: false
alias ExMoney.Repo

alias ExMoney.SyncLogApi.SyncLog

def store(entity, action, payload) do
%SyncLog{}
|> SyncLog.changeset(%{action: action, entity: entity, payload: payload})
|> Repo.insert
end

def get(per_page) do
query =
from sl in SyncLog,
where: is_nil(sl.synced_at),
order_by: [asc: sl.inserted_at],
limit: ^per_page

Repo.all(query)
end
end
21 changes: 21 additions & 0 deletions lib/ex_money/sync_log_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule ExMoney.SyncLogWorker do
use GenServer
require Logger

alias ExMoney.{SyncLogApi, Transactions}

def start_link(_opts \\ []) do
GenServer.start_link(__MODULE__, :ok, name: :sync_log_worker)
end

def handle_cast({:store_transaction, transaction_id}, state) do
payload =
Transactions.get_transaction(transaction_id)
|> Map.from_struct()
|> Map.drop([:__meta__, :user_id, :account, :category, :saltedge_account, :user])

SyncLogApi.store("Transaction", "create", payload)

{:noreply, state}
end
end
77 changes: 77 additions & 0 deletions lib/ex_money/transactions/transaction.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule ExMoney.Transactions.Transaction do
use Ecto.Schema
import Ecto.Changeset

schema "transactions" do
field :saltedge_transaction_id, :integer
field :mode, :string
field :status, :string
field :made_on, :date
field :amount, :decimal
field :currency_code, :string
field :description, :string
field :duplicated, :boolean, default: false
field :rule_applied, :boolean, default: false
field :extra, :map
field :uid, :string

belongs_to :category, ExMoney.Category
belongs_to :user, ExMoney.User
belongs_to :account, ExMoney.Account
belongs_to :saltedge_account, ExMoney.Account,
foreign_key: :saltedge_account_id,
references: :saltedge_account_id

timestamps()
end

@required_fields ~w(
saltedge_transaction_id
mode
status
made_on
amount
currency_code
description
duplicated
saltedge_account_id
account_id
user_id
)a
@optional_fields ~w(category_id rule_applied extra)a

def changeset(model, params \\ %{}) do
model
|> cast(params, @required_fields ++ @optional_fields)
|> validate_required(@required_fields)
|> generate_uid()
end

def changeset_custom(model, params \\ %{}) do
model
|> cast(params, ~w(amount category_id account_id made_on user_id description extra)a)
|> validate_required(~w(amount category_id account_id made_on user_id)a)
|> negate_amount(params)
|> generate_uid()
end

def update_changeset(model, params \\ %{}) do
model
|> cast(params, ~w(category_id description rule_applied extra)a)
end

def negate_amount(changeset, params) when params == %{}, do: changeset
def negate_amount(changeset, %{"type" => "income"}), do: changeset

def negate_amount(changeset, %{"type" => "expense"}) do
case Ecto.Changeset.fetch_change(changeset, :amount) do
{:ok, amount} ->
Ecto.Changeset.put_change(changeset, :amount, Decimal.mult(amount, Decimal.new(-1)))
:error -> changeset
end
end

defp generate_uid(changeset) do
Ecto.Changeset.put_change(changeset, :uid, Ecto.UUID.generate())
end
end
Loading

0 comments on commit ef1bd12

Please sign in to comment.