Skip to content
This repository has been archived by the owner on Sep 16, 2020. It is now read-only.

Commit

Permalink
use erlang's queue data-structure for mantaining pending jobs, instea…
Browse files Browse the repository at this point in the history
…d of a list
  • Loading branch information
Nimish committed May 26, 2016
1 parent 608aae6 commit 49f658e
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions lib/toniq/job_concurrency_limiter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,16 @@ defmodule Toniq.JobConcurrencyLimiter do
end

defp run_next_pending_job(state, job), do: run_next_pending_job(state, job, pending_jobs(state, job))
defp run_next_pending_job(state, _job, []), do: state
defp run_next_pending_job(state, job, pending_jobs) do
[ first_pending_job | pending_jobs ] = pending_jobs

state = run_now(state, first_pending_job)

update_worker_state(state, job,
%{ worker_state(state, job) | pending_jobs: pending_jobs }
)
case :queue.out(pending_jobs) do
{:empty, _pending_jobs} ->
state
{{:value, first_pending_job}, pending_jobs} ->
state = run_now(state, first_pending_job)
update_worker_state(state, job,
%{ worker_state(state, job) | pending_jobs: pending_jobs }
)
end
end

defp run_now(state, {job, caller}) do
Expand All @@ -83,7 +84,7 @@ defmodule Toniq.JobConcurrencyLimiter do
defp run_later(state, {job, caller}) do
worker_state = worker_state(state, job)
update_worker_state(state, job,
%{ worker_state | pending_jobs: worker_state.pending_jobs ++ [ {job, caller} ] }
%{ worker_state | pending_jobs: :queue.in({job, caller}, worker_state.pending_jobs) }
)
end

Expand All @@ -109,5 +110,5 @@ defmodule Toniq.JobConcurrencyLimiter do
defp update_worker_state(state, job, worker_state), do: Map.put(state, job.worker, worker_state)
defp running_count(state, job), do: worker_state(state, job).running_count
defp pending_jobs(state, job), do: worker_state(state, job).pending_jobs
defp worker_state(state, job), do: Map.get(state, job.worker, %{ pending_jobs: [], running_count: 0 })
defp worker_state(state, job), do: Map.get(state, job.worker, %{ pending_jobs: :queue.new(), running_count: 0 })
end

0 comments on commit 49f658e

Please sign in to comment.