diff --git a/lib/toniq/job_concurrency_limiter.ex b/lib/toniq/job_concurrency_limiter.ex index d05b44e..0e76b5a 100644 --- a/lib/toniq/job_concurrency_limiter.ex +++ b/lib/toniq/job_concurrency_limiter.ex @@ -64,15 +64,16 @@ defmodule Toniq.JobConcurrencyLimiter do end defp run_next_pending_job(state, job), do: run_next_pending_job(state, job, pending_jobs(state, job)) - defp run_next_pending_job(state, _job, []), do: state defp run_next_pending_job(state, job, pending_jobs) do - [ first_pending_job | pending_jobs ] = pending_jobs - - state = run_now(state, first_pending_job) - - update_worker_state(state, job, - %{ worker_state(state, job) | pending_jobs: pending_jobs } - ) + case :queue.out(pending_jobs) do + {:empty, _pending_jobs} -> + state + {{:value, first_pending_job}, pending_jobs} -> + state = run_now(state, first_pending_job) + update_worker_state(state, job, + %{ worker_state(state, job) | pending_jobs: pending_jobs } + ) + end end defp run_now(state, {job, caller}) do @@ -83,7 +84,7 @@ defmodule Toniq.JobConcurrencyLimiter do defp run_later(state, {job, caller}) do worker_state = worker_state(state, job) update_worker_state(state, job, - %{ worker_state | pending_jobs: worker_state.pending_jobs ++ [ {job, caller} ] } + %{ worker_state | pending_jobs: :queue.in({job, caller}, worker_state.pending_jobs) } ) end @@ -109,5 +110,5 @@ defmodule Toniq.JobConcurrencyLimiter do defp update_worker_state(state, job, worker_state), do: Map.put(state, job.worker, worker_state) defp running_count(state, job), do: worker_state(state, job).running_count defp pending_jobs(state, job), do: worker_state(state, job).pending_jobs - defp worker_state(state, job), do: Map.get(state, job.worker, %{ pending_jobs: [], running_count: 0 }) + defp worker_state(state, job), do: Map.get(state, job.worker, %{ pending_jobs: :queue.new(), running_count: 0 }) end