Skip to content

Commit

Permalink
allow async concurrency tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
wr0ngway committed Oct 24, 2023
1 parent f78ca9f commit 82b01f8
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ old-index.yaml
client
.vscode/settings.json
.env
.vscode/launch.json
4 changes: 4 additions & 0 deletions helm/kubetruth/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ spec:
{{- if not .Values.appSettings.async }}
- --no-async
{{- end }}
{{- if .Values.appSettings.concurrency }}
- --concurrency
- "{{ .Values.appSettings.concurrency }}"
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
Expand Down
3 changes: 2 additions & 1 deletion helm/kubetruth/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ appSettings:
pollingInterval:
debug: false
async: true
concurrency:

# If secret.create == false, secret generation is left to the user
# outside of the chart, and a name should be specified
Expand Down Expand Up @@ -182,7 +183,7 @@ projectMappings:
{%- if name_parts.size > 1 %}
namespace: "{{ name_parts | first }}"
{%- endif %}
spec: {{ templates[template] | nindent: 2 }}
{%- endif %}
Expand Down
6 changes: 5 additions & 1 deletion lib/kubetruth/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class CLI < CLIBase
:flag, "Run using async I/O (or not)",
default: true

option ["-c", "--concurrency"], "CONCURRENCY", "Concurrency limit for async I/O", default: 3 do |a|
Integer(a)
end

# TODO: option to map template to configmap?

def execute
Expand All @@ -51,7 +55,7 @@ def execute
Kubetruth::CtApi.configure(api_key: api_key, api_url: api_url)
Kubetruth::KubeApi.configure(namespace: kube_namespace, token: kube_token, api_url: kube_url)

etl = ETL.new(dry_run: dry_run?, async: async?)
etl = ETL.new(dry_run: dry_run?, async: async?, async_concurrency: concurrency)

Signal.trap("HUP") do
puts "Handling HUP signal - waking up ETL poller" # logger cant be called from trap
Expand Down
23 changes: 15 additions & 8 deletions lib/kubetruth/etl.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'benchmark'
require 'yaml'
require 'async'
require 'async/semaphore'
require 'yaml/safe_load_stream'
using YAMLSafeLoadStream

Expand All @@ -14,9 +15,11 @@ module Kubetruth
class ETL
include GemLogger::LoggerSupport

def initialize(dry_run: false, async: true)
def initialize(dry_run: false, async: true, async_concurrency: 3)
@dry_run = dry_run
@async = async
@async_concurrency = async_concurrency
@async_semaphore = nil
@wrote_crds = false
end

Expand Down Expand Up @@ -88,14 +91,14 @@ def with_polling(interval, &block)

def async_task_tree(task)
msg = ""

if task.parent
# The root task seems to always be nil, so exclude it from name
unless task.parent.parent.nil? && task.parent.annotation.blank?
msg << async_task_tree(task.parent) << " -> "
end
end

msg << (task.annotation ? task.annotation : "unnamed")
msg
end
Expand All @@ -106,7 +109,7 @@ def wait

def async(*args, **kwargs)
if @async
Async(*args, **kwargs) do |task|
blk = ->(task) {
task_name = async_task_tree(task)
begin
logger.info "Starting async task: #{task_name}"
Expand All @@ -116,7 +119,8 @@ def async(*args, **kwargs)
logger.log_exception(e, "Failure in async task: #{task_name}")
task.stop
end
end
}
@async_semaphore.nil? ? Async(*args, **kwargs, &blk) : @async_semaphore.async(*args, **kwargs, &blk)
else
task_name = kwargs[:annotation] || "unnamed"
begin
Expand Down Expand Up @@ -179,7 +183,7 @@ def initialize(project)
def params_to_hash(param_list)
Hash[param_list.collect {|param| [param.key, param.value]}]
end

def params
@param_data ||= begin
# constructing the hash will cause any overrides to happen in the right
Expand Down Expand Up @@ -216,6 +220,7 @@ def params

def apply
async(annotation: "ETL Event Loop") do
@async_semaphore = Async::Semaphore.new(@async_concurrency) if @async
logger.warn("Performing dry-run") if @dry_run

load_config do |namespace, config|
Expand Down Expand Up @@ -280,7 +285,7 @@ def apply
parsed_ymls.each do |parsed_yml|
if parsed_yml.present?
async(annotation: "Apply Template: #{template_id}") do
kube_apply(parsed_yml)
kube_apply(parsed_yml)
end
else
logger.debug {"Skipping empty stream template"}
Expand All @@ -293,7 +298,9 @@ def apply
end
end
end
end.wait
ensure
@async_semaphore = nil
end.wait
end

def kube_apply(parsed_yml)
Expand Down
3 changes: 2 additions & 1 deletion spec/kubetruth/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Kubetruth
--kube-url ku
--dry-run
--no-async
--concurrency 5
--polling-interval 27
]

Expand All @@ -42,7 +43,7 @@ module Kubetruth
})

etl = double(ETL)
expect(ETL).to receive(:new).with(dry_run: true, async: false).and_return(etl)
expect(ETL).to receive(:new).with(dry_run: true, async: false, async_concurrency: 5).and_return(etl)
expect(etl).to receive(:apply)
expect(etl).to receive(:with_polling).with(27).and_yield
cli.run(args)
Expand Down
8 changes: 5 additions & 3 deletions spec/kubetruth/etl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ class ForceExit < Exception; end
let(:config) {
Kubetruth::Config.new([root_spec_crd])
}
let(:collection) { ProjectCollection.new(config) }
let(:collection) { ProjectCollection.new(config.root_spec) }

before(:each) do

Expand All @@ -447,7 +447,7 @@ class ForceExit < Exception; end

it "renders multiple templates" do
allow(etl).to receive(:load_config).and_yield(@ns, config)

expect(collection).to receive(:names).and_return(["proj1"])

expect(etl).to receive(:kube_apply).with(hash_including("kind" => "ConfigMap"))
Expand Down Expand Up @@ -629,7 +629,7 @@ class ForceExit < Exception; end
let(:config) {
Kubetruth::Config.new([root_spec_crd])
}
let(:collection) { ProjectCollection.new(config) }
let(:collection) { ProjectCollection.new(config.root_spec) }

before(:each) do

Expand Down Expand Up @@ -683,6 +683,7 @@ class ForceExit < Exception; end
it "sets log level when supplied by root pm in config" do
Kubetruth::Logging.root_log_level = "error" # undo debug logging set by test harness
config.root_spec.log_level = "error"
Logging.clear

allow(etl).to receive(:load_config).and_yield(@ns, config)
allow(config.root_spec.resource_templates["configmap"]).to receive(:render).and_return("")
Expand All @@ -698,6 +699,7 @@ class ForceExit < Exception; end

it "sets log level when supplied by override pm in config" do
Kubetruth::Logging.root_log_level = "error" # undo debug logging set by test harness
Logging.clear

override_crd = {scope: "override", project_selector: "proj1", log_level: "debug"}
config = Kubetruth::Config.new([root_spec_crd, override_crd])
Expand Down

0 comments on commit 82b01f8

Please sign in to comment.