Skip to content

Commit

Permalink
fix: event loops restart when they fail
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangulle committed Feb 5, 2025
1 parent 5d443a5 commit ffacd49
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 10 deletions.
24 changes: 23 additions & 1 deletion src/plauna/core/events.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
(ns plauna.core.events)
(ns plauna.core.events
(:require
[clojure.core.async :refer [chan go <! go-loop] :as async]
[taoensso.telemere :as t]))

(defn create-event
([type payload options]
Expand All @@ -9,3 +12,22 @@
{:type type
:options (conj (:options triggering-event) options)
:payload payload}))

(defn return-key-on-complete [key fn]
(t/log! :info ["Starting restart loop for" key])
(go (let [return-val (<! (fn))]
(t/log! :debug ["Event loop for" key "returned" return-val])
key)))

(defn keep-track [active-register event-register]
(go-loop [merged (async/merge (vals active-register))]
(when-let [event-key (<! merged)]
(recur (async/merge (vals (conj active-register {event-key (return-key-on-complete event-key (get event-register event-key))})))))))

(defn start-event-loops
"Start event loops which restart by themselves if they somehow complete.
Takes an event register in the form {:event-key event-fn} where event-fn should always return a channel."
[event-register]
(let [active-register (reduce (fn [register entry] (conj register {(first entry) (return-key-on-complete (first entry) (second entry))})) {} event-register)]
(keep-track active-register event-register)))
18 changes: 9 additions & 9 deletions src/plauna/entry.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns plauna.entry
(:require
[clojure.string :as s]
[clojure.core.async :refer [chan]]
[clojure.core.async :refer [chan] :as async]
[plauna.files :as files]
[plauna.server :as server]
[plauna.client :as client]
Expand All @@ -10,7 +10,8 @@
[plauna.analysis :as analysis]
[taoensso.telemere :as t]
[taoensso.telemere.streams :as tstreams]
[plauna.parser :as parser])
[plauna.parser :as parser]
[plauna.core.events :as events])
(:gen-class))

(tstreams/streams->telemere!)
Expand All @@ -22,6 +23,11 @@
(defmulti parse-cli-arg (fn [arg] (first (s/split arg #"="))))
(defmethod parse-cli-arg "--config-file" [arg-string] {:config-file (second (s/split arg-string #"="))})

(def event-register {:enrichment-event-loop (fn [] (analysis/enrichment-event-loop @messaging/main-publisher @messaging/main-chan))
:client-event-loop (fn [] (client/client-event-loop @messaging/main-publisher))
:database-event-loop (fn [] (database/database-event-loop @messaging/main-publisher))
:parser-event-loop (fn [] (parser/parser-event-loop @messaging/main-publisher @messaging/main-chan))})

(defn start-imap-client
[config]
(let [listen-channel (chan 10)]
Expand All @@ -30,12 +36,6 @@
(client/create-folder-monitor client-config listen-channel))
(t/log! :debug "Listening to new emails from listen-channel")))

(defn start-event-loops [main-publisher main-channel]
(parser/parser-event-loop main-publisher main-channel)
(database/database-event-loop main-publisher)
(client/client-event-loop main-publisher)
(analysis/enrichment-event-loop main-publisher main-channel))

(defn -main
[& args]
(let [parsed-config (reduce (fn [acc val] (conj acc (parse-cli-arg val))) {} args)]
Expand All @@ -44,5 +44,5 @@
(doseq [address (:addresses (:email (files/config)))] (database/add-to-my-addresses address))
(database/create-db)
(start-imap-client (files/config))
(start-event-loops @messaging/main-publisher @messaging/main-chan)
(events/start-event-loops event-register)
(server/start-server (files/config))))
37 changes: 37 additions & 0 deletions test/plauna/core/events_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
(ns plauna.core.events-test
(:require [plauna.core.events :as events]
[clojure.core.async :as async]
[clojure.test :as test]))

(test/deftest return-key-on-complete-works
(let [test-chan (async/chan)
test-fn (fn [] (async/go (async/<! test-chan)))
test-case (events/return-key-on-complete :test-key test-fn)]
(async/close! test-chan)
(test/is (= :test-key (async/<!! test-case)))))

(test/deftest keep-track-works
(let [test-atom (atom 0)
test-chan (atom (async/chan))
test-fn (fn [] (async/go (swap! test-atom inc) (async/<! @test-chan)))
test-register {:test-case test-fn}]
(events/keep-track {:test-case (events/return-key-on-complete :test-case test-fn)} test-register)
(swap! test-chan (fn [old] (async/close! old) (async/chan)))
(Thread/sleep 100)
(swap! test-chan (fn [old] (async/close! old) (async/chan)))
(Thread/sleep 100)
(test/is (= 2 @test-atom))))

(test/deftest event-register-works
(let [test-atom (atom 0)
test-chan (atom (async/chan))
test-fn (fn [] (async/go (swap! test-atom inc) (async/<! @test-chan)))
test-register {:test-case test-fn}]
(events/start-event-loops test-register)
(swap! test-chan (fn [old] (async/close! old) (async/chan)))
(Thread/sleep 100)
(swap! test-chan (fn [old] (async/close! old) (async/chan)))
(Thread/sleep 100)
(swap! test-chan (fn [old] (async/close! old) (async/chan)))
(Thread/sleep 100)
(test/is (= 3 @test-atom))))

0 comments on commit ffacd49

Please sign in to comment.