Skip to content

Commit

Permalink
fix: restart all event loops after a failure or restart in messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangulle committed Feb 7, 2025
1 parent c82dda9 commit a2ae03d
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/plauna/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@
(when (some? category-name)
(t/log! :info (str "Moving email: " (-> event :payload :header :subject) " categorized as: " (-> event :payload :metadata :category)))
(try (move-messages-by-id (-> event :options :store) message-id (-> event :options :original-folder) category-name)
(catch Exception e (t/log! :error (.getMessage e))))))))
(recur (async/<! local-chan)))))
(catch Exception e (t/log! :error (.getMessage e)))))))
(recur (async/<! local-chan))))))

(defn create-folders
([store folder-names]
Expand Down
13 changes: 10 additions & 3 deletions src/plauna/core/events.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@
key)))

(defn keep-track [active-register event-register]
(go-loop [merged (async/merge (vals active-register))]
(when-let [event-key (<! merged)]
(recur (async/merge (vals (conj active-register {event-key (return-key-on-complete event-key (get event-register event-key))})))))))
(let [event-chan (chan 10)
event-mix (async/mix event-chan)]
(doseq [val (vals active-register)] (async/admix event-mix val))
(go-loop [mix event-mix
register active-register]
(when-let [event-key (<! event-chan)]
(async/unmix mix (event-key register))
(let [new-chan (return-key-on-complete event-key (get event-register event-key))]
(async/admix mix new-chan)
(recur mix (conj register {event-key new-chan})))))))

(defn start-event-loops
"Start event loops which restart by themselves if they somehow complete.
Expand Down
2 changes: 2 additions & 0 deletions src/plauna/entry.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
(t/set-min-level! :slf4j "org.eclipse.jetty.server.*" :error)
(tstreams/streams->telemere!)

(comment (t/set-min-level! :debug))

(set! *warn-on-reflection* true)

(defmulti parse-cli-arg (fn [arg] (first (s/split arg #"="))))
Expand Down
1 change: 0 additions & 1 deletion src/plauna/markup.clj
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,4 @@
(defn watcher [client folders] (render-file "watcher.html" {:id (-> client first :id) :host (:host (first client)) :user (:user (first client)) :folders folders}))

(defn preferences-page [data] (let [log-levels {:log-level-options [{:key :error :name "Error"} {:key :info :name "Info"} {:key :debug :name "Debug"}]}]
(println (conj data log-levels))
(render-file "admin-preferences.html" (conj data log-levels))))
5 changes: 2 additions & 3 deletions src/plauna/messaging.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@
(recur))
bucket-channel))

(comment (channel-limiter :test)
(async/<!! @main-chan)
(restart-main-chan))
(comment
(restart-main-chan))
2 changes: 1 addition & 1 deletion test/plauna/core/events_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
(Thread/sleep 100)
(swap! test-chan (fn [old] (async/close! old) (async/chan)))
(Thread/sleep 100)
(test/is (= 2 @test-atom))))
(test/is (= 3 @test-atom))))

(test/deftest event-register-works
(let [test-atom (atom 0)
Expand Down
1 change: 0 additions & 1 deletion test/plauna/preferences.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
(:require [plauna.preferences :as sut]
[clojure.test :as t]))


(defn return-fn-for-preferences [returns] (swap! sut/fetch-fn (fn [_] (fn [_] returns))))

(t/deftest fetch-returns-default
Expand Down

0 comments on commit a2ae03d

Please sign in to comment.