diff --git a/README.md b/README.md
index 8584584..bd5e072 100644
--- a/README.md
+++ b/README.md
@@ -114,6 +114,10 @@ the standard ways, e.g. `kubectl edit projectmapping kubetruth-root`. The
 its `project_selector` pattern against the CloudTruth project names already
 selected by the root `project_selector`.
 
+Note that Kubetruth watches for changes to ProjectMappings, so touching any of
+them wakes it up from a polling sleep. This makes it quick and easy to test out
+configuration changes without needing a short polling interval.
+
 ### Example Config
 
 The `projectmapping` resource has a shortname of `pm` for convenience when using kubectl.
@@ -138,6 +142,8 @@ spec:
       skip: true
 EOF
 ```
+Note that project imports are non-recursive, so if A imports B and B imports C,
+then A will only get B's parameters.
 
 To override the naming of kubernetes Resources on a per-Project basis:
 ```
diff --git a/helm/kubetruth/crds/projectmapping.yaml b/helm/kubetruth/crds/projectmapping.yaml
index f75619f..65b1c33 100644
--- a/helm/kubetruth/crds/projectmapping.yaml
+++ b/helm/kubetruth/crds/projectmapping.yaml
@@ -51,7 +51,7 @@ spec:
                 type: array
                 items:
                   type: string
-                description: Include the parameters from other projects into the selected ones
+                description: Include the parameters from other projects into the selected ones. This is non-recursive, so if A imports B and B imports C, then A will only get B's parameters.
               required:
                 - scope
       additionalPrinterColumns:
diff --git a/lib/kubetruth/cli.rb b/lib/kubetruth/cli.rb
index fd98d0e..2de8423 100755
--- a/lib/kubetruth/cli.rb
+++ b/lib/kubetruth/cli.rb
@@ -89,23 +89,10 @@ def execute
       api_url: kube_url
     }
 
-    etl = ETL.new(ct_context: ct_context, kube_context: kube_context)
-
-    while true
-
-      begin
-        etl.apply(dry_run: dry_run?)
-      rescue => e
-        logger.log_exception(e, "Failure while applying config transforms")
-      end
-
-      logger.debug("Poller sleeping for #{polling_interval}")
-      if dry_run?
-        break
-      else
-        sleep polling_interval
-      end
+    etl = ETL.new(ct_context: ct_context, kube_context: kube_context, dry_run: dry_run?)
+    etl.with_polling(polling_interval) do
+      etl.apply
     end
   end
diff --git a/lib/kubetruth/etl.rb b/lib/kubetruth/etl.rb
index 035ff24..4b71dae 100644
--- a/lib/kubetruth/etl.rb
+++ b/lib/kubetruth/etl.rb
@@ -11,9 +11,10 @@ class ETL
     # From kubernetes error message
     DNS_VALIDATION_RE = /^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$/
 
-    def initialize(ct_context:, kube_context:)
+    def initialize(ct_context:, kube_context:, dry_run: false)
       @ct_context = ct_context
       @kube_context = kube_context
+      @dry_run = dry_run
       @kubeapis = {}
     end
 
@@ -30,12 +31,60 @@ def kubeapi(namespace)
       @kubeapis[namespace] ||= KubeApi.new(**@kube_context.merge(namespace: namespace))
     end
 
+    def interruptible_sleep(interval)
+      @sleeper = Thread.current
+      Kernel.sleep interval
+    end
+
+    def interrupt_sleep
+      Thread.new { @sleeper&.run }
+    end
+
+    def with_polling(interval, &block)
+      while true
+
+        begin
+          watcher = kubeapi(@kube_context[:namespace]).watch_project_mappings
+
+          begin
+            thr = Thread.new do
+              logger.debug "Created watcher for CRD"
+              watcher.each do |notice|
+                logger.debug {"Interrupting polling sleep, CRD watcher woke up for: #{notice}"}
+                interrupt_sleep
+                break
+              end
+              logger.debug "CRD watcher exiting"
+            end
+
+            begin
+              block.call
+            rescue => e
+              logger.log_exception(e, "Failure while applying config transforms")
+            end
+
+            logger.debug("Poller sleeping for #{interval}")
+            interruptible_sleep(interval)
+          ensure
+            watcher.finish
+            thr.join
+          end
+
+        rescue => e
+          logger.log_exception(e, "Failure in watch/polling logic")
+        end
+
+      end
+    end
+
     def load_config
       mappings = kubeapi(@kube_context[:namespace]).get_project_mappings
       Kubetruth::Config.new(mappings)
     end
 
-    def apply(dry_run: false)
+    def apply
+      logger.warn("Performing dry-run") if @dry_run
+
       config = load_config
 
       projects = ctapi.project_names
@@ -99,9 +148,7 @@
         config_param_hash = params_to_hash(config_params)
         secret_param_hash = params_to_hash(secret_params)
 
-        if dry_run
-          logger.info("Performing dry-run")
-
+        if @dry_run
           logger.info("Config maps that would be created are:")
           logger.info(config_params.pretty_print_inspect)
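The new `interruptible_sleep`/`interrupt_sleep` pair above leans on plain Ruby thread semantics: `Kernel.sleep` returns early when another thread calls `Thread#run` on the sleeping thread. A minimal standalone sketch of just that mechanism (illustrative names only, no Kubernetes or Kubetruth code involved):

```
sleeper = nil

poller = Thread.new do
  sleeper = Thread.current
  started = Time.now
  sleep 300                     # nominal polling interval
  puts "woke after #{(Time.now - started).round(2)}s"
end

sleep 0.1                       # let the poller reach its sleep
Thread.new { sleeper&.run }     # analogous to interrupt_sleep firing on a watch event
poller.join                     # prints a duration far below 300 seconds
```

Run as a script, this prints a wake-up time of roughly 0.1 seconds rather than 300, which is the behavior the ETL poller relies on.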
diff --git a/lib/kubetruth/kubeapi.rb b/lib/kubetruth/kubeapi.rb
index 54bffde..6882ec2 100644
--- a/lib/kubetruth/kubeapi.rb
+++ b/lib/kubetruth/kubeapi.rb
@@ -137,5 +137,12 @@ def delete_secret(name)
     def get_project_mappings
       crdclient.get_project_mappings(namespace: namespace).collect(&:spec).collect(&:to_h)
     end
+
+    def watch_project_mappings(&block)
+      existing = crdclient.get_project_mappings(namespace: namespace)
+      collection_version = existing.resourceVersion
+      crdclient.watch_project_mappings(namespace: namespace, resource_version: collection_version, &block)
+    end
+
   end
 end
diff --git a/spec/kubetruth/cli_spec.rb b/spec/kubetruth/cli_spec.rb
index 7309dce..14b0560 100644
--- a/spec/kubetruth/cli_spec.rb
+++ b/spec/kubetruth/cli_spec.rb
@@ -94,6 +94,7 @@ def all_usage(clazz, path=[])
         --kube-token kt
         --kube-url ku
         --dry-run
+        --polling-interval 27
      ]
      etl = double(ETL)
      expect(ETL).to receive(:new).with(ct_context: {
@@ -105,19 +106,12 @@
                                          namespace: "kn",
                                          token: "kt",
                                          api_url: "ku"
-                                        }).and_return(etl)
-      expect(etl).to receive(:apply).with(dry_run: true)
+                                        },
+                                        dry_run: true).and_return(etl)
+      expect(etl).to receive(:with_polling).with(27)
       cli.run(args)
     end
 
-    it "polls at interval" do
-      etl = double(ETL)
-      expect(ETL).to receive(:new).and_return(etl)
-      expect(etl).to receive(:apply)
-      expect(cli).to receive(:sleep).with(27).and_raise(SystemExit)
-      expect { cli.run(%w[--api-key abc123 --polling-interval 27]) }.to raise_error(SystemExit)
-    end
-
   end
 end
diff --git a/spec/kubetruth/etl_spec.rb b/spec/kubetruth/etl_spec.rb
index be7bff7..43e297d 100644
--- a/spec/kubetruth/etl_spec.rb
+++ b/spec/kubetruth/etl_spec.rb
@@ -88,6 +88,103 @@ def kubeapi(ns)
   end
 
+  describe "#interruptible_sleep" do
+
+    it "runs for interval without interruption" do
+      etl = described_class.new(init_args)
+      t = Time.now.to_f
+      etl.interruptible_sleep(0.2)
+      expect(Time.now.to_f - t).to be >= 0.2
+    end
+
+    it "can be interrupted" do
+      etl = described_class.new(init_args)
+      Thread.new do
+        sleep 0.1
+        etl.interrupt_sleep
+      end
+      t = Time.now.to_f
+      etl.interruptible_sleep(0.5)
+      expect(Time.now.to_f - t).to be < 0.2
+    end
+
+  end
+
+  describe "#with_polling" do
+
+    class ForceExit < Exception; end
+
+    it "runs with an interval" do
+      etl = described_class.new(init_args)
+
+      watcher = double()
+      expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher).twice
+      expect(watcher).to receive(:each).twice
+      expect(watcher).to receive(:finish).twice
+      expect(etl).to receive(:apply).twice
+
+      count = 0
+      expect(etl).to receive(:interruptible_sleep).
+        with(0.2).twice { |m, *args| count += 1; raise ForceExit if count > 1 }
+
+      begin
+        etl.with_polling(0.2) do
+          etl.apply
+        end
+      rescue ForceExit
+      end
+      expect(count).to eq(2)
+
+    end
+
+    it "isolates run loop from block failures" do
+      etl = described_class.new(init_args)
+
+      watcher = double()
+      expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher).twice
+      expect(watcher).to receive(:each).twice
+      expect(watcher).to receive(:finish).twice
+      expect(etl).to receive(:apply).and_raise("fail").twice
+
+      count = 0
+      expect(etl).to receive(:interruptible_sleep).
+        with(0.2).twice { |m, *args| count += 1; raise ForceExit if count > 1 }
+
+      begin
+        etl.with_polling(0.2) do
+          etl.apply
+        end
+      rescue ForceExit
+      end
+      expect(count).to eq(2)
+
+    end
+
+    it "interrupts sleep on watch event" do
+      etl = described_class.new(init_args)
+
+      watcher = double()
+      notice = double("notice", type: "UPDATED", object: double("kube_resource"))
+      expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher)
+      expect(watcher).to receive(:each).and_yield(notice)
+      expect(watcher).to receive(:finish)
+      expect(etl).to receive(:apply)
+      expect(etl).to receive(:interrupt_sleep)
+
+      expect(etl).to receive(:interruptible_sleep).
+        with(0.2) { |m, *args| sleep(0.2); raise ForceExit }
+
+      begin
+        etl.with_polling(0.2) do
+          etl.apply
+        end
+      rescue ForceExit
+      end
+
+    end
+
+  end
+
   describe "#load_config" do
 
     it "loads config" do
@@ -371,12 +468,13 @@ def kubeapi(ns)
         Parameter.new(key: "param1", value: "value1", secret: false),
         Parameter.new(key: "param2", value: "value2", secret: true)
       ]
+      etl = described_class.new(init_args.merge(dry_run: true))
       etl.load_config.root_spec.skip_secrets = true
       expect(etl.ctapi).to receive(:project_names).and_return(["default"])
       expect(etl).to receive(:get_params).and_return(params)
       expect(etl).to_not receive(:apply_config_map)
       expect(etl).to_not receive(:apply_secret)
-      etl.apply(dry_run: true)
+      etl.apply
       expect(Logging.contents).to match("Performing dry-run")
     end
diff --git a/spec/kubetruth/kubeapi_spec.rb b/spec/kubetruth/kubeapi_spec.rb
index 2baa46b..b4113d3 100644
--- a/spec/kubetruth/kubeapi_spec.rb
+++ b/spec/kubetruth/kubeapi_spec.rb
@@ -282,6 +282,55 @@ def apiserver; "https://127.0.0.1"; end
       expect(crds.first.keys.sort).to eq(Kubetruth::Config::ProjectSpec.new.to_h.keys.sort)
     end
 
+    it "can watch project mappings" do
+      skip("only works when vcr/webmock disabled")
+
+      test_mapping_name = "test-mapping-watch"
+      mapping_data = <<~EOF
+        apiVersion: kubetruth.cloudtruth.com/v1
+        kind: ProjectMapping
+        metadata:
+          name: #{test_mapping_name}
+        spec:
+          scope: override
+          project_selector: "^notme$"
+      EOF
+
+      # p kubeapi.crdclient.get_project_mappings(namespace: namespace).resourceVersion
+      # p kubeapi.crdclient.get_project_mappings(namespace: namespace).collect {|r| r.metadata.name }
+      # p kubeapi.get_project_mappings
+
+      watcher = kubeapi.watch_project_mappings
+      begin
+        Thread.new do
+          watcher.each do |notice|
+            # p notice.type
+            # p notice.object.metadata.name
+            # p notice.object
+            expect(notice.object.metadata.name).to eq(test_mapping_name)
+            break
+          end
+        end
+
+        sleep(1)
+
+        # need an admin token for this to work or temporarily add to
+        # projectmappings permissions on installed role
+        resource = Kubeclient::Resource.new
+        resource.metadata = {}
+        resource.metadata.name = test_mapping_name
+        resource.metadata.namespace = namespace
+        resource.spec = {scope: "override", project_selector: "^notme$"}
+        kubeapi.crdclient.create_project_mapping(resource)
+
+        # sysrun(%Q[minikube kubectl -- --namespace #{namespace} patch pm kubetruth-test-app-root --type json --patch '[{"op": "replace", "path": "/spec/included_projects", "value": ["Base"]}]'])
+        # sysrun(%Q[minikube kubectl -- --namespace #{namespace} apply -f -], stdin_data: mapping_data)
+        sleep(1)
+      ensure
+        watcher.finish
+      end
+    end
+
   end
 end
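For readers unfamiliar with the kubeclient side of `watch_project_mappings`: kubeclient generates `get_*`/`watch_*` methods for each discovered resource kind, and passing the collection's `resourceVersion` makes the watch stream report only changes made after that snapshot. A rough standalone sketch of the same calls; the kubeconfig handling, endpoint wiring, and `kubetruth` namespace here are assumptions for illustration, not taken from the patch:

```
require 'kubeclient'

# Assumed setup: kubeconfig-based auth and a "kubetruth" namespace.
config  = Kubeclient::Config.read(ENV.fetch('KUBECONFIG', "#{Dir.home}/.kube/config"))
context = config.context
crdclient = Kubeclient::Client.new(
  "#{context.api_endpoint}/apis/kubetruth.cloudtruth.com", 'v1',
  ssl_options: context.ssl_options, auth_options: context.auth_options
)

# Same pattern as KubeApi#watch_project_mappings: start the watch at the
# current collection's resourceVersion so only subsequent changes stream in.
existing = crdclient.get_project_mappings(namespace: 'kubetruth')
watcher  = crdclient.watch_project_mappings(
  namespace: 'kubetruth', resource_version: existing.resourceVersion
)

begin
  watcher.each do |notice|
    puts "#{notice.type} #{notice.object.metadata.name}"
    break   # stop after the first event
  end
ensure
  watcher.finish
end
```

Editing any ProjectMapping while this runs (for example with `kubectl edit pm <name>`) should print a MODIFIED event, which is the same signal the ETL poller uses to cut its sleep short.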