Skip to content

Commit

Permalink
use Semaphore instead of Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
wr0ngway committed Jan 22, 2024
1 parent 176e897 commit 799f02a
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 14 deletions.
7 changes: 3 additions & 4 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ GEM
tzinfo (~> 2.0)
addressable (2.8.6)
public_suffix (>= 2.0.2, < 6.0)
async (2.8.0)
async (1.31.0)
console (~> 1.10)
fiber-annotation
io-event (~> 1.1)
nio4r (~> 2.3)
timers (~> 4.1)
base64 (0.2.0)
bigdecimal (3.1.6)
Expand Down Expand Up @@ -70,7 +69,6 @@ GEM
i18n (1.14.1)
concurrent-ruby (~> 1.0)
io-console (0.7.2)
io-event (1.4.2)
irb (1.11.1)
rdoc
reline (>= 0.4.2)
Expand Down Expand Up @@ -100,6 +98,7 @@ GEM
net-http (0.4.1)
uri
netrc (0.11.0)
nio4r (2.7.0)
pry (0.14.2)
coderay (~> 1.1)
method_source (~> 1.0)
Expand Down
22 changes: 14 additions & 8 deletions lib/kubetruth/ctapi.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
require "cloudtruth-client"
require_relative 'parameter'
require 'faraday-cookie_jar'
require 'async/semaphore'

module Kubetruth
class CtApi
include GemLogger::LoggerSupport

@@config = nil
@@ctapis = {}
@@ctapis_mutex = Async::Semaphore.new(1)

def self.configure(api_key:, api_url:)
if api_key.nil? || api_url.nil?
Expand Down Expand Up @@ -55,17 +57,21 @@ def auth_settings
# polling cycle to mitigate costs of fetching all projects/environments for
# ID lookup
def self.create(environment: "default", tag: nil)
@@ctapis[[environment, tag]] ||= CtApi.new(environment: environment, tag: tag)
@@ctapis_mutex.acquire do
@@ctapis[[environment, tag]] ||= CtApi.new(environment: environment, tag: tag)
end
end

def self.reset
@@ctapis = {}
@@ctapis_mutex.acquire do
@@ctapis = {}
end
end

def initialize(environment: "default", tag: nil)
@environments_mutex = Mutex.new
@projects_mutex = Mutex.new
@templates_mutex = Mutex.new
@environments_mutex = Async::Semaphore.new(1)
@projects_mutex = Async::Semaphore.new(1)
@templates_mutex = Async::Semaphore.new(1)

raise ArgumentError.new("CtApi has not been configured") if @@config.nil?

Expand All @@ -82,7 +88,7 @@ def initialize(environment: "default", tag: nil)
end

def environments
@environments_mutex.synchronize do
@environments_mutex.acquire do
@environments ||= begin
result = apis[:environments].environments_list
logger.debug{"Environments query result: #{result.inspect}"}
Expand All @@ -102,7 +108,7 @@ def environment_id(environment)
end

def projects
@projects_mutex.synchronize do
@projects_mutex.acquire do
@projects ||= begin
result = apis[:projects].projects_list
logger.debug{"Projects query result: #{result.inspect}"}
Expand Down Expand Up @@ -160,7 +166,7 @@ def parameters(project:)
end

def templates(project:)
@templates_mutex.synchronize do
@templates_mutex.acquire do
@templates ||= {}
@templates[project] ||= begin
proj_id = projects[project]
Expand Down
5 changes: 3 additions & 2 deletions lib/kubetruth/kubeapi.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'kubeclient'
require 'async/semaphore'

module Kubetruth
class KubeApi
Expand Down Expand Up @@ -47,7 +48,7 @@ def initialize(namespace: nil, token: nil, api_url: nil)

@api_url = api_url || 'https://kubernetes.default.svc'
@api_clients = {}
@namespace_mutex = Mutex.new
@namespace_mutex = Async::Semaphore.new(1)
end

def api_url(api)
Expand Down Expand Up @@ -80,7 +81,7 @@ def apiVersion_client(apiVersion)
end

def ensure_namespace(ns = namespace)
@namespace_mutex.synchronize do
@namespace_mutex.acquire do
begin
client.get_namespace(ns)
rescue Kubeclient::ResourceNotFoundError
Expand Down
26 changes: 26 additions & 0 deletions spec/kubetruth/etl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,32 @@ class ForceExit < Exception; end
expect(n3).to eq("parenttask -> unnamed -> childtask")
end

it "allows semaphore as mutex" do
val = nil
order = []
mutex = Async::Semaphore.new(1)
etl.async(annotation: "parenttask") do |t|
etl.async(annotation: "task1") do |t1|
order << 1
mutex.acquire do
order << 2
sleep(0.1)
order << 4
val ||= "task1"
end
end
etl.async(annotation: "task2") do |t1|
order << 3
mutex.acquire do
order << 5
val ||= "task2"
end
end
end
expect(val).to eq("task1")
expect(order).to eq([1, 2, 3, 4, 5])
end

end

end
Expand Down

0 comments on commit 799f02a

Please sign in to comment.