diff --git a/workspaces/controller/Makefile b/workspaces/controller/Makefile index 6032c8c9..85f49ed7 100644 --- a/workspaces/controller/Makefile +++ b/workspaces/controller/Makefile @@ -1,5 +1,5 @@ # Image URL to use all building/pushing image targets -IMG ?= controller:latest +IMG ?= ghcr.io/kubeflow/notebooks/workspace-controller # ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary. ENVTEST_K8S_VERSION = 1.31.0 @@ -97,6 +97,9 @@ build: manifests generate fmt vet ## Build manager binary. run: manifests generate fmt vet ## Run a controller from your host. go run ./cmd/main.go +kind-load: + kind load docker-image ${IMG} -n kind + # If you wish to build the manager image targeting other platforms you can use the --platform flag. # (i.e. docker build --platform linux/arm64). However, you must enable docker buildKit for it. # More info: https://docs.docker.com/develop/develop-images/build_enhancements/ diff --git a/workspaces/controller/api/v1beta1/workspace_types.go b/workspaces/controller/api/v1beta1/workspace_types.go index 03e8a66a..5d8a0a74 100644 --- a/workspaces/controller/api/v1beta1/workspace_types.go +++ b/workspaces/controller/api/v1beta1/workspace_types.go @@ -36,6 +36,12 @@ type WorkspaceSpec struct { // +kubebuilder:default=false Paused *bool `json:"paused,omitempty"` + // DisableCulling controls whether automatic culling is disabled for the workspace. 
+ // If true, the workspace will not be culled + // +kubebuilder:validation:Optional + // +kubebuilder:default=false + DisableCulling *bool `json:"disableCulling,omitempty"` + // if true, pending updates are NOT applied when the Workspace is paused // if false, pending updates are applied when the Workspace is paused // +kubebuilder:validation:Optional @@ -187,6 +193,9 @@ type WorkspaceActivity struct { // +kubebuilder:default=0 // +kubebuilder:example=1704067200 LastUpdate int64 `json:"lastUpdate"` + + // Information about the last activity probe + LastProbe ProbeStatus `json:"lastProbe"` } type WorkspacePodOptionsStatus struct { @@ -221,6 +230,30 @@ type WorkspacePodOptionRedirectStep struct { Target string `json:"target"` } +type ProbeStatus struct { + + // the time the probe was started (UNIX epoch in milliseconds) + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:example=1710435303000 + StartTimeMs int64 `json:"startTimeMs"` + + // the time the probe was completed (UNIX epoch in milliseconds) + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:example=1710435305000 + EndTimeMs int64 `json:"endTimeMs"` + + // the result of the probe + // ENUM: "Success" | "Failure" | "Timeout" | "" + // +kubebuilder:default="" + Result ProbeResult `json:"result"` + + // a human-readable message about the probe result + // WARNING: this field is NOT FOR MACHINE USE, subject to change without notice + // +kubebuilder:default="" + // +kubebuilder:example="Jupyter probe succeeded" + Message string `json:"message"` +} + // +kubebuilder:validation:Enum:={"Running","Terminating","Paused","Pending","Error","Unknown"} type WorkspaceState string @@ -233,6 +266,16 @@ const ( WorkspaceStateUnknown WorkspaceState = "Unknown" ) +// +kubebuilder:validation:Enum={"Success","Failure","Timeout",""} +type ProbeResult string + +const ( + ProbeResultSuccess ProbeResult = "Success" + ProbeResultFailure ProbeResult = "Failure" + ProbeResultTimeout ProbeResult = "Timeout" + 
ProbeResultUnknown ProbeResult = "Unknown" +) + /* =============================================================================== Workspace =============================================================================== */ @@ -242,6 +285,7 @@ const ( // +kubebuilder:object:root=true // +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.state",description="The current state of the Workspace" // +kubebuilder:subresource:status +// +kubebuilder:resource:shortName=ws // Workspace is the Schema for the Workspaces API type Workspace struct { diff --git a/workspaces/controller/api/v1beta1/workspacekind_types.go b/workspaces/controller/api/v1beta1/workspacekind_types.go index 2d846237..c90904bb 100644 --- a/workspaces/controller/api/v1beta1/workspacekind_types.go +++ b/workspaces/controller/api/v1beta1/workspacekind_types.go @@ -184,6 +184,18 @@ type WorkspaceKindCullingConfig struct { // +kubebuilder:default=86400 MaxInactiveSeconds *int32 `json:"maxInactiveSeconds,omitempty"` + // the maximum number of seconds between probes + // +kubebuilder:validation:Optional + // +kubebuilder:validation:Minimum:=60 + // +kubebuilder:default=300 + MaxProbeIntervalSeconds *int32 `json:"maxProbeIntervalSeconds,omitempty"` + + // the minimum number of seconds between probes to avoid spamming in case of failure + // +kubebuilder:validation:Optional + // +kubebuilder:validation:Minimum:=10 + // +kubebuilder:default=20 + MinProbeIntervalSeconds *int32 `json:"minProbeIntervalSeconds,omitempty"` + // the probe used to determine if the Workspace is active ActivityProbe ActivityProbe `json:"activityProbe"` } @@ -205,10 +217,25 @@ type ActivityProbe struct { } type ActivityProbeExec struct { - // the command to run - // +kubebuilder:validation:MinItems:=1 - // +kubebuilder:example={"bash", "-c", "exit 0"} - Command []string `json:"command"` + // the script should write a JSON file at this path. 
+ // any existing file in this path will be REMOVED before the script is run + // +kubebuilder:example="/tmp/activity_probe.json" + OutputPath string `json:"outputPath"` + + // the number of seconds to wait for the script to complete + // +kubebuilder:validation:Minimum:=1 + // +kubebuilder:validation:Maximum:=300 + // +kubebuilder:default=10 + TimeoutSeconds int32 `json:"timeoutSeconds"` + + // the script to run to determine if the Workspace is active + // - the script must exit with a 0 status code unless there is an error + // - workspaces with failing activity probes will NOT be culled + // - the script must have a shebang (e.g. `#!/usr/bin/env bash` or `#!/usr/bin/env python`) + // - the script should be idempotent and without side effects, it may be run multiple times + // - typically, it will be more efficient to write a probe which checks for a specific + // activity indicator agreed with your users, rather than checking the entire filesystem + Script string `json:"script"` } // +kubebuilder:validation:XValidation:message="'lastActivity' must be true",rule="has(self.lastActivity) && self.lastActivity" @@ -216,6 +243,9 @@ type ActivityProbeJupyter struct { // if the Jupyter-specific probe is enabled // +kubebuilder:example=true LastActivity bool `json:"lastActivity"` + + // The ID of the port used for probing Jupyter via HTTP requests. 
+ PortId string `json:"portId"` } type WorkspaceKindProbes struct { @@ -547,7 +577,7 @@ type OptionMetric struct { // +kubebuilder:printcolumn:name="Deprecated",type="boolean",JSONPath=".spec.spawner.deprecated",description="If this WorkspaceKind is deprecated" // +kubebuilder:printcolumn:name="Hidden",type="boolean",JSONPath=".spec.spawner.hidden",description="If this WorkspaceKind is hidden from the spawner UI" // +kubebuilder:subresource:status -// +kubebuilder:resource:scope=Cluster +// +kubebuilder:resource:scope=Cluster,shortName=wsk // WorkspaceKind is the Schema for the WorkspaceKinds API type WorkspaceKind struct { diff --git a/workspaces/controller/api/v1beta1/zz_generated.deepcopy.go b/workspaces/controller/api/v1beta1/zz_generated.deepcopy.go index 1beab4fd..3b4fbb22 100644 --- a/workspaces/controller/api/v1beta1/zz_generated.deepcopy.go +++ b/workspaces/controller/api/v1beta1/zz_generated.deepcopy.go @@ -31,7 +31,7 @@ func (in *ActivityProbe) DeepCopyInto(out *ActivityProbe) { if in.Exec != nil { in, out := &in.Exec, &out.Exec *out = new(ActivityProbeExec) - (*in).DeepCopyInto(*out) + **out = **in } if in.Jupyter != nil { in, out := &in.Jupyter, &out.Jupyter @@ -53,11 +53,6 @@ func (in *ActivityProbe) DeepCopy() *ActivityProbe { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ActivityProbeExec) DeepCopyInto(out *ActivityProbeExec) { *out = *in - if in.Command != nil { - in, out := &in.Command, &out.Command - *out = make([]string, len(*in)) - copy(*out, *in) - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ActivityProbeExec. @@ -453,6 +448,21 @@ func (in *PodVolumeMount) DeepCopy() *PodVolumeMount { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ProbeStatus) DeepCopyInto(out *ProbeStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProbeStatus. +func (in *ProbeStatus) DeepCopy() *ProbeStatus { + if in == nil { + return nil + } + out := new(ProbeStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RedirectMessage) DeepCopyInto(out *RedirectMessage) { *out = *in @@ -498,6 +508,7 @@ func (in *Workspace) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WorkspaceActivity) DeepCopyInto(out *WorkspaceActivity) { *out = *in + out.LastProbe = in.LastProbe } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkspaceActivity. @@ -565,6 +576,16 @@ func (in *WorkspaceKindCullingConfig) DeepCopyInto(out *WorkspaceKindCullingConf *out = new(int32) **out = **in } + if in.MaxProbeIntervalSeconds != nil { + in, out := &in.MaxProbeIntervalSeconds, &out.MaxProbeIntervalSeconds + *out = new(int32) + **out = **in + } + if in.MinProbeIntervalSeconds != nil { + in, out := &in.MinProbeIntervalSeconds, &out.MinProbeIntervalSeconds + *out = new(int32) + **out = **in + } in.ActivityProbe.DeepCopyInto(&out.ActivityProbe) } @@ -1060,6 +1081,11 @@ func (in *WorkspaceSpec) DeepCopyInto(out *WorkspaceSpec) { *out = new(bool) **out = **in } + if in.DisableCulling != nil { + in, out := &in.DisableCulling, &out.DisableCulling + *out = new(bool) + **out = **in + } if in.DeferUpdates != nil { in, out := &in.DeferUpdates, &out.DeferUpdates *out = new(bool) diff --git a/workspaces/controller/cmd/main.go b/workspaces/controller/cmd/main.go index ae09c2ed..9a14355e 100644 --- a/workspaces/controller/cmd/main.go +++ b/workspaces/controller/cmd/main.go @@ -21,6 +21,8 @@ import ( "flag" "os" + 
"k8s.io/client-go/kubernetes" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -147,6 +149,20 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "WorkspaceKind") os.Exit(1) } + clientset, err := kubernetes.NewForConfig(mgr.GetConfig()) + if err != nil { + setupLog.Error(err, "unable to create clientset") + os.Exit(1) + } + if err = (&controllerInternal.CullingReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Config: mgr.GetConfig(), + ClientSet: clientset, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Culler") + os.Exit(1) + } // +kubebuilder:scaffold:builder if os.Getenv("ENABLE_WEBHOOKS") != "false" { diff --git a/workspaces/controller/config/crd/bases/kubeflow.org_workspacekinds.yaml b/workspaces/controller/config/crd/bases/kubeflow.org_workspacekinds.yaml index 9fea4463..4a83ade3 100644 --- a/workspaces/controller/config/crd/bases/kubeflow.org_workspacekinds.yaml +++ b/workspaces/controller/config/crd/bases/kubeflow.org_workspacekinds.yaml @@ -11,6 +11,8 @@ spec: kind: WorkspaceKind listKind: WorkspaceKindList plural: workspacekinds + shortNames: + - wsk singular: workspacekind scope: Cluster versions: @@ -261,18 +263,34 @@ spec: - if the Workspace had activity in the last 60 seconds this command should return status 0, otherwise it should return status 1 properties: - command: - description: the command to run - example: - - bash - - -c - - exit 0 - items: - type: string - minItems: 1 - type: array + outputPath: + description: |- + the script should write a JSON file at this path. 
+ any existing file in this path will be REMOVED before the script is run + example: /tmp/activity_probe.json + type: string + script: + description: |- + the script to run to determine if the Workspace is active + - the script must exit with a 0 status code unless there is an error + - workspaces with failing activity probes will NOT be culled + - the script must have a shebang (e.g. `#!/usr/bin/env bash` or `#!/usr/bin/env python`) + - the script should be idempotent and without side effects, it may be run multiple times + - typically, it will be more efficient to write a probe which checks for a specific + activity indicator agreed with your users, rather than checking the entire filesystem + type: string + timeoutSeconds: + default: 10 + description: the number of seconds to wait for the + script to complete + format: int32 + maximum: 300 + minimum: 1 + type: integer required: - - command + - outputPath + - script + - timeoutSeconds type: object jupyter: description: |- @@ -285,8 +303,13 @@ spec: description: if the Jupyter-specific probe is enabled example: true type: boolean + portId: + description: The ID of the port used for probing Jupyter + via HTTP requests. 
+ type: string required: - lastActivity + - portId type: object x-kubernetes-validations: - message: '''lastActivity'' must be true' @@ -307,6 +330,19 @@ spec: format: int32 minimum: 60 type: integer + maxProbeIntervalSeconds: + default: 300 + description: the maximum number of seconds between probes + format: int32 + minimum: 60 + type: integer + minProbeIntervalSeconds: + default: 20 + description: the minimum number of seconds between probes + to avoid spamming in case of failure + format: int32 + minimum: 10 + type: integer required: - activityProbe type: object diff --git a/workspaces/controller/config/crd/bases/kubeflow.org_workspaces.yaml b/workspaces/controller/config/crd/bases/kubeflow.org_workspaces.yaml index c66f269d..8f869fa3 100644 --- a/workspaces/controller/config/crd/bases/kubeflow.org_workspaces.yaml +++ b/workspaces/controller/config/crd/bases/kubeflow.org_workspaces.yaml @@ -11,6 +11,8 @@ spec: kind: Workspace listKind: WorkspaceList plural: workspaces + shortNames: + - ws singular: workspace scope: Namespaced versions: @@ -50,6 +52,12 @@ spec: if true, pending updates are NOT applied when the Workspace is paused if false, pending updates are applied when the Workspace is paused type: boolean + disableCulling: + default: false + description: |- + DisableCulling controls whether automatic culling is disabled for the workspace. 
+ If true, the workspace will not be culled + type: boolean kind: description: the WorkspaceKind to use example: jupyterlab @@ -179,6 +187,47 @@ spec: example: 1704067200 format: int64 type: integer + lastProbe: + description: Information about the last activity probe + properties: + endTimeMs: + description: the time the probe was completed (UNIX epoch + in milliseconds) + example: 1710435305000 + format: int64 + minimum: 0 + type: integer + message: + default: "" + description: |- + a human-readable message about the probe result + WARNING: this field is NOT FOR MACHINE USE, subject to change without notice + example: Jupyter probe succeeded + type: string + result: + default: "" + description: |- + the result of the probe + ENUM: "Success" | "Failure" | "Timeout" | "" + enum: + - Success + - Failure + - Timeout + - "" + type: string + startTimeMs: + description: the time the probe was started (UNIX epoch in + milliseconds) + example: 1710435303000 + format: int64 + minimum: 0 + type: integer + required: + - endTimeMs + - message + - result + - startTimeMs + type: object lastUpdate: default: 0 description: the last time we checked for activity on the Workspace @@ -188,6 +237,7 @@ spec: type: integer required: - lastActivity + - lastProbe - lastUpdate type: object pauseTime: diff --git a/workspaces/controller/config/manager/manager.yaml b/workspaces/controller/config/manager/manager.yaml index 1e6d6609..15a21e4f 100644 --- a/workspaces/controller/config/manager/manager.yaml +++ b/workspaces/controller/config/manager/manager.yaml @@ -67,6 +67,9 @@ spec: image: controller:latest imagePullPolicy: IfNotPresent name: manager + env: + - name: HTTP_TIMEOUT_SECONDS + value: "5" securityContext: allowPrivilegeEscalation: false capabilities: diff --git a/workspaces/controller/config/rbac/role.yaml b/workspaces/controller/config/rbac/role.yaml index cedd310e..fc1e8552 100644 --- a/workspaces/controller/config/rbac/role.yaml +++ b/workspaces/controller/config/rbac/role.yaml 
@@ -14,6 +14,12 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - pods/exec + verbs: + - create - apiGroups: - "" resources: diff --git a/workspaces/controller/config/samples/jupyterlab_v1beta1_workspace.yaml b/workspaces/controller/config/samples/jupyterlab_v1beta1_workspace.yaml index 1c8e076d..f75d3fb7 100644 --- a/workspaces/controller/config/samples/jupyterlab_v1beta1_workspace.yaml +++ b/workspaces/controller/config/samples/jupyterlab_v1beta1_workspace.yaml @@ -10,6 +10,8 @@ spec: ## if false, pending updates are applied when the Workspace is paused deferUpdates: false + disableCulling: false + ## the WorkspaceKind to use kind: "jupyterlab" diff --git a/workspaces/controller/config/samples/jupyterlab_v1beta1_workspacekind.yaml b/workspaces/controller/config/samples/jupyterlab_v1beta1_workspacekind.yaml index 71ed533b..6cf3763c 100644 --- a/workspaces/controller/config/samples/jupyterlab_v1beta1_workspacekind.yaml +++ b/workspaces/controller/config/samples/jupyterlab_v1beta1_workspacekind.yaml @@ -81,7 +81,15 @@ spec: ## the maximum number of seconds a Workspace can be inactive ## - maxInactiveSeconds: 86400 + maxInactiveSeconds: 100 + + ## the maximum number of seconds between probes + ## + maxProbeIntervalSeconds: 60 + + ## the minimum number of seconds between probes + ## + minProbeIntervalSeconds: 10 ## the probe used to determine if the Workspace is active ## @@ -91,20 +99,38 @@ spec: ## - if the Workspace had activity in the last 60 seconds this command ## should return status 0, otherwise it should return status 1 ## - #exec: - # command: - # - "bash" - # - "-c" - # - "exit 0" - +# exec: +# outputPath: "/tmp/activity_probe.json" +# timeoutSeconds: 60 +# script: |- +# #!/usr/bin/env bash +# +# set -euo pipefail +# +# # Define the output path +# output_path="/tmp/activity_probe.json" +# +# # Find the most recent modification time in the $HOME directory +# last_activity_epoch=$(find "$HOME" -type f -printf '%T@\n' 2>/dev/null | awk 'max 
< $1 { max = $1 } END { print max }') +# +# # Write the last activity time to the output path +# if [ -n "$last_activity_epoch" ]; then +# # Convert epoch time to ISO 8601 format +# last_activity=$(date -d "@$last_activity_epoch" -Iseconds) +# echo "{\"last_activity\": \"$last_activity\"}" > "$output_path" +# else +# # Handle the case where no files are found +# echo "{\"last_activity\": null}" > "$output_path" +# fi ## OPTION 2: a Jupyter-specific probe ## - will poll the `/api/status` endpoint of the Jupyter API, and use the `last_activity` field ## https://github.com/jupyter-server/jupyter_server/blob/v2.13.0/jupyter_server/services/api/handlers.py#L62-L67 ## - note, users need to be careful that their other probes don't trigger a "last_activity" update ## e.g. they should only check the health of Jupyter using the `/api/status` endpoint - ## + # jupyter: lastActivity: true + portId: jupyterlab ## standard probes to determine Container health (MUTABLE) ## - spec for Probe: diff --git a/workspaces/controller/go.mod b/workspaces/controller/go.mod index 8e1b5b5b..8f2b6c3f 100644 --- a/workspaces/controller/go.mod +++ b/workspaces/controller/go.mod @@ -34,13 +34,16 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/imdario/mergo v0.3.6 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/moby/spdystream v0.4.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang 
v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/workspaces/controller/go.sum b/workspaces/controller/go.sum index 8496f957..6d9b94f8 100644 --- a/workspaces/controller/go.sum +++ b/workspaces/controller/go.sum @@ -1,3 +1,5 @@ +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -48,6 +50,8 @@ github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2 github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -65,6 +69,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/moby/spdystream v0.4.0 h1:Vy79D6mHeJJjiPdFEL2yku1kl0chZpJfZcPpb16BRl8= 
+github.com/moby/spdystream v0.4.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -72,6 +78,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= diff --git a/workspaces/controller/internal/controller/culling_controller.go b/workspaces/controller/internal/controller/culling_controller.go new file mode 100644 index 00000000..f30303d2 --- /dev/null +++ b/workspaces/controller/internal/controller/culling_controller.go @@ -0,0 +1,554 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "time" + + "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + kubefloworgv1beta1 "github.com/kubeflow/notebooks/workspaces/controller/api/v1beta1" + "github.com/kubeflow/notebooks/workspaces/controller/internal/helper" +) + +const ( + defaultClusterDomain = "cluster.local" + inactivityToleranceBufferSeconds = 5 + defaultHTTPTimeout = 5 * time.Second +) + +// CullingReconciler reconciles a Workspace object +type CullingReconciler struct { + client.Client + Scheme *runtime.Scheme + ClientSet *kubernetes.Clientset + Config *rest.Config +} + +type ActivityProbe struct { + HasActivity *bool `json:"has_activity,omitempty"` + LastActivity *string `json:"last_activity,omitempty"` +} + +// +kubebuilder:rbac:groups="core",resources=pods/exec,verbs=create + +func (r *CullingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { //nolint:gocyclo + log := log.FromContext(ctx) + log.V(2).Info("reconciling Workspace 
for culling") + // fetch the Workspace + workspace := &kubefloworgv1beta1.Workspace{} + if err := r.Get(ctx, req.NamespacedName, workspace); err != nil { + if client.IgnoreNotFound(err) == nil { + // Request object not found, could have been deleted after reconcile request. + // Owned objects are automatically garbage collected. + // For additional cleanup logic use finalizers. + // Return and don't requeue. + return ctrl.Result{}, nil + } + log.Error(err, "unable to fetch Workspace") + return ctrl.Result{}, err + } + if !workspace.GetDeletionTimestamp().IsZero() { + log.V(2).Info("Workspace is being deleted, skipping culling") + return ctrl.Result{}, nil + } + if workspace.Spec.DisableCulling != nil && *workspace.Spec.DisableCulling { + log.V(2).Info("Culling is disabled for this workspace", "DisableCulling", *workspace.Spec.DisableCulling) + return ctrl.Result{}, nil + } + + if *workspace.Spec.Paused { + log.V(2).Info("Workspace is paused, skipping culling") + return ctrl.Result{}, nil + } + + if workspace.Status.State != kubefloworgv1beta1.WorkspaceStateRunning { + log.V(2).Info("Workspace is not running, skipping culling") + return ctrl.Result{}, nil + } + + workspaceKindName := workspace.Spec.Kind + log = log.WithValues("workspaceKind", workspaceKindName) + workspaceKind := &kubefloworgv1beta1.WorkspaceKind{} + if err := r.Get(ctx, client.ObjectKey{Name: workspaceKindName}, workspaceKind); err != nil { + if apierrors.IsNotFound(err) { + log.V(0).Info("Workspace references unknown WorkspaceKind") + return ctrl.Result{}, err + } + log.Error(err, "unable to fetch WorkspaceKind for Workspace") + return ctrl.Result{}, err + } + + if !*workspaceKind.Spec.PodTemplate.Culling.Enabled { + log.Info("culling is disabled for this workspace kind") + return ctrl.Result{}, nil + } + + // Fetch the last activity, update and probe times from the Workspace status + lastActivityTime := time.Unix(workspace.Status.Activity.LastActivity, 0) + lastUpdateTime := 
time.Unix(workspace.Status.Activity.LastUpdate, 0) + lastProbeTime := time.Unix(workspace.Status.Activity.LastProbe.EndTimeMs/1000, 0) + + // Fetch the culling configuration from the WorkspaceKind spec + maxInactiveSeconds := *workspaceKind.Spec.PodTemplate.Culling.MaxInactiveSeconds + maxProbeIntervalSeconds := *workspaceKind.Spec.PodTemplate.Culling.MaxProbeIntervalSeconds + minProbeIntervalSeconds := *workspaceKind.Spec.PodTemplate.Culling.MinProbeIntervalSeconds + + // Calculate time since the last activity, the last update and the last probe + timeSinceLastActivity := time.Since(lastActivityTime).Seconds() + timeSinceLastUpdate := time.Since(lastUpdateTime).Seconds() + timeSinceLastProbe := time.Since(lastProbeTime).Seconds() + + // Calculate the requeue time for the next probe + minRequeueAfter := time.Duration(minProbeIntervalSeconds) * time.Second + requeueAfter := max(time.Duration(float64(maxProbeIntervalSeconds)-timeSinceLastProbe)*time.Second, minRequeueAfter) + log.Info("requesting requeue", "requeueAfter", requeueAfter, "minRequeueAfter", minRequeueAfter) + // if the workspace has been probed recently, requeue for the next probe + if timeSinceLastProbe < float64(minProbeIntervalSeconds) { + log.V(2).Info("Workspace has been probed recently, requeueing for the next probe.", + "MinProbeIntervalSeconds", minProbeIntervalSeconds, + "TimeSinceLastProbe", timeSinceLastProbe) + return ctrl.Result{RequeueAfter: requeueAfter}, nil + } + + // If the workspace has been active recently, requeue for the next probe + if timeSinceLastActivity < float64(maxInactiveSeconds) { + log.V(2).Info("Workspace activity is within the allowed period, requeueing for the next probe.", + "MaxInactiveSeconds", maxInactiveSeconds, + "TimeSinceLastActivity", timeSinceLastActivity, "requeueAfter", requeueAfter) + return ctrl.Result{RequeueAfter: requeueAfter}, nil + } + // If the workspace was updated recently, requeue for the next probe + if timeSinceLastUpdate < 
float64(maxProbeIntervalSeconds) { + log.V(2).Info("Workspace has been updated recently, requeueing for the next probe.", + "MinProbeIntervalSeconds", maxProbeIntervalSeconds, + "TimeSinceLastUpdate", timeSinceLastUpdate) + return ctrl.Result{RequeueAfter: requeueAfter}, nil + } + + // Check if JupyterLab API probing is enabled + if workspaceKind.Spec.PodTemplate.Culling.ActivityProbe.Jupyter != nil { + probeStartTime := time.Now() + serviceName, err := r.getServiceName(ctx, workspace) + if err != nil { + log.Error(err, "Error fetching service name for workspace") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &minRequeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to fetch service name for workspace", + }, nil, nil) + } + port, err := r.getWorkspacePort(workspace, workspaceKind) + if err != nil { + log.Error(err, "Error fetching port for workspace") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &minRequeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to fetch port for workspace", + }, nil, nil) + } + jupyterAPIEndpoint := fmt.Sprintf("http://%s.%s.svc.%s:%d/workspace/%s/%s/jupyterlab/api/status", serviceName, workspace.Namespace, defaultClusterDomain, port, workspace.Namespace, workspace.Name) + + lastActivity, err, probeMessage, probeResult := fetchLastActivityFromJupyterAPI(jupyterAPIEndpoint) + if err != nil { + log.Error(err, "Error fetching last activity from JupyterLab API") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &minRequeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: probeResult, + Message: probeMessage, + }, nil, nil) + } + + // If the 
workspace has been inactive for too long, initiate culling + if time.Since(*lastActivity).Seconds() > float64(maxInactiveSeconds+inactivityToleranceBufferSeconds) { + log.V(2).Info("Culling the workspace due to inactivity", "TimeSinceLastActivity", time.Since(*lastActivity).Seconds()) + workspace.Spec.Paused = ptr.To(true) + err := r.Update(ctx, workspace) + if err != nil { + log.Error(err, "Error updating workspace during culling") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to pause workspace", + }, nil, nil) + } + } + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: probeResult, + Message: probeMessage, + }, ptr.To(probeStartTime.Unix()), ptr.To(lastActivity.Unix())) + } + + // Check if Bash probing is enabled + if workspaceKind.Spec.PodTemplate.Culling.ActivityProbe.Exec != nil { + probeStartTime := time.Now() + podName, err := r.getPodName(ctx, workspace) + if err != nil { + log.Error(err, "Error fetching pod name for workspace") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &minRequeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to fetch pod name for workspace", + }, nil, nil) + } + stdout, stderr, err := r.execCommand(ctx, podName, workspace.Namespace, workspaceKind.Spec.PodTemplate.Culling.ActivityProbe.Exec) + if err != nil { + log.Error(err, "Error executing command probe", "stderr", stderr) + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + 
EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to execute command probe", + }, nil, nil) + + } + + // handle the probe result + activityProbe, err := parseActivityProbeJson(stdout) + if err != nil { + log.Error(err, "Error parsing activity probe JSON") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to parse activity probe JSON", + }, nil, nil) + } + lastActivity := time.Now().Unix() + if activityProbe.HasActivity != nil && !*activityProbe.HasActivity { + log.V(2).Info("Culling the workspace due to inactivity") + // TODO: figure out how to set the last activity time + lastActivity = time.Now().Unix() + workspace.Spec.Paused = ptr.To(true) + err := r.Update(ctx, workspace) + if err != nil { + log.Error(err, "Error updating workspace during culling") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to update workspace during culling", + }, nil, nil) + } + + } + if activityProbe.HasActivity == nil && activityProbe.LastActivity != nil { + lastActivityTime, err = time.Parse(time.RFC3339, *activityProbe.LastActivity) + if err != nil { + log.Error(err, "Error parsing last activity time") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to parse last activity time", + }, nil, nil) + } + lastActivity = lastActivityTime.Unix() + if time.Since(lastActivityTime).Seconds() > 
float64(maxInactiveSeconds+inactivityToleranceBufferSeconds) { + log.V(2).Info("Culling the workspace due to inactivity", "TimeSinceLastActivity", time.Since(lastActivityTime).Seconds()) + workspace.Spec.Paused = ptr.To(true) + err := r.Update(ctx, workspace) + if err != nil { + log.Error(err, "Error updating workspace during culling") + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultFailure, + Message: "Failed to update workspace during culling", + }, nil, nil) + } + } + } + return r.updateWorkspaceActivityStatus(ctx, log, workspace, &requeueAfter, &kubefloworgv1beta1.ProbeStatus{ + StartTimeMs: probeStartTime.UnixMilli(), + EndTimeMs: time.Now().UnixMilli(), + Result: kubefloworgv1beta1.ProbeResultSuccess, + Message: "Bash probe succeeded", + }, ptr.To(probeStartTime.Unix()), ptr.To(lastActivity)) + } + + log.Info("culling controller finished") + return ctrl.Result{RequeueAfter: requeueAfter}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *CullingReconciler) SetupWithManager(mgr ctrl.Manager) error { + + return ctrl.NewControllerManagedBy(mgr). + For(&kubefloworgv1beta1.Workspace{}).Named("culling_controller"). + Complete(r) +} + +// updateWorkspaceActivityStatus attempts to immediately update the Workspace activity status with the provided status. 
+func (r *CullingReconciler) updateWorkspaceActivityStatus(ctx context.Context, log logr.Logger, workspace *kubefloworgv1beta1.Workspace, requeueAfter *time.Duration, probeStatus *kubefloworgv1beta1.ProbeStatus, lastUpdate, lastActivity *int64) (ctrl.Result, error) { + if workspace == nil { + return ctrl.Result{}, fmt.Errorf("provided Workspace was nil") + } + if lastUpdate != nil { + workspace.Status.Activity.LastUpdate = *lastUpdate + } + if lastActivity != nil { + workspace.Status.Activity.LastActivity = *lastActivity + } + if probeStatus != nil { + workspace.Status.Activity.LastProbe = *probeStatus + } + if err := r.Status().Update(ctx, workspace); err != nil { + if apierrors.IsConflict(err) { + log.V(2).Info("update conflict while updating Workspace status, will requeue") + return ctrl.Result{Requeue: true}, nil + } + log.Error(err, "unable to update Workspace status") + return ctrl.Result{}, err + } + if requeueAfter != nil { + return ctrl.Result{RequeueAfter: *requeueAfter}, nil + } + + return ctrl.Result{}, nil +} + +func (r *CullingReconciler) getServiceName(ctx context.Context, workspace *kubefloworgv1beta1.Workspace) (string, error) { + ownedServices := &corev1.ServiceList{} + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(helper.IndexWorkspaceOwnerField, workspace.Name), + Namespace: workspace.Namespace, + } + + // List services owned by the workspace + if err := r.List(ctx, ownedServices, listOpts); err != nil { + return "", err + } + + // Check the number of owned services + if len(ownedServices.Items) > 1 { + serviceList := make([]string, len(ownedServices.Items)) + for i, svc := range ownedServices.Items { + serviceList[i] = svc.Name + } + serviceListString := strings.Join(serviceList, ", ") + return "", fmt.Errorf("workspace owns multiple Services: %s", serviceListString) + + } else if len(ownedServices.Items) == 0 { + return "", errors.New("workspace does not own any Service") + } + + // Return the single found 
service name + return ownedServices.Items[0].Name, nil +} + +func (r *CullingReconciler) getWorkspacePort(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefloworgv1beta1.WorkspaceKind) (int32, error) { + for _, imageConfigValue := range workspaceKind.Spec.PodTemplate.Options.ImageConfig.Values { + if imageConfigValue.Id == workspace.Spec.PodTemplate.Options.ImageConfig { + for _, port := range imageConfigValue.Spec.Ports { + if port.Id == workspaceKind.Spec.PodTemplate.Culling.ActivityProbe.Jupyter.PortId { + return port.Port, nil + } + } + } + } + return 0, errors.New("port not found") +} + +func (r *CullingReconciler) getPodName(ctx context.Context, workspace *kubefloworgv1beta1.Workspace) (string, error) { + var statefulSetName string + ownedStatefulSets := &appsv1.StatefulSetList{} + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(helper.IndexWorkspaceOwnerField, workspace.Name), + Namespace: workspace.Namespace, + } + if err := r.List(ctx, ownedStatefulSets, listOpts); err != nil { + return "", err + } + + // reconcile StatefulSet + if len(ownedStatefulSets.Items) > 1 { + statefulSetList := make([]string, len(ownedStatefulSets.Items)) + for i, sts := range ownedStatefulSets.Items { + statefulSetList[i] = sts.Name + } + statefulSetListString := strings.Join(statefulSetList, ", ") + return "", fmt.Errorf("workspace owns multiple StatefulSets: %s", statefulSetListString) + } else if len(ownedStatefulSets.Items) == 0 { + return "", errors.New("workspace does not own any StatefulSet") + } + + statefulSetName = ownedStatefulSets.Items[0].Name + podName := fmt.Sprintf("%s-0", statefulSetName) + return podName, nil +} + +func (r *CullingReconciler) execCommand(ctx context.Context, podName, podNamespace string, exec *kubefloworgv1beta1.ActivityProbeExec) (string, string, error) { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(exec.TimeoutSeconds)*time.Second) + defer cancel() + + command := fmt.Sprintf(` + rm -f 
%s + %s + cat %s + `, exec.OutputPath, exec.Script, exec.OutputPath) + + req := r.ClientSet.CoreV1().RESTClient(). + Post(). + Resource("pods"). + Name(podName). + Namespace(podNamespace). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: "main", + Command: []string{"bash", "-c", command}, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + executor, err := createExecutor(req.URL(), r.Config) + if err != nil { + return "", "", fmt.Errorf("error creating executor: %w", err) + } + + var stdout, stderr bytes.Buffer + err = executor.StreamWithContext(timeoutCtx, remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + + return stdout.String(), stderr.String(), err +} + +// fetchLastActivityFromJupyterAPI queries the JupyterLab API for the last activity time. +func fetchLastActivityFromJupyterAPI(apiEndpoint string) (*time.Time, error, string, kubefloworgv1beta1.ProbeResult) { + httpTimeoutSeconds := defaultHTTPTimeout + if timeout, err := strconv.Atoi(os.Getenv("HTTP_TIMEOUT_SECONDS")); err == nil && timeout > 0 { + httpTimeoutSeconds = time.Duration(timeout) * time.Second + } + httpClient := &http.Client{Timeout: httpTimeoutSeconds} + resp, err := httpClient.Get(apiEndpoint) + var netErr net.Error + if err != nil { + if errors.As(err, &netErr) && netErr.Timeout() { + return nil, fmt.Errorf("JupyterLab API request timed out: %w", err), + "JupyterLab API request timeout", kubefloworgv1beta1.ProbeResultTimeout + } else { + return nil, fmt.Errorf("JupyterLab API request failed: %w", err), + "Jupyter probe failed", kubefloworgv1beta1.ProbeResultFailure + } + } + // Check if the API returned a 200-OK status + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("JupyterLab API returned non-200 status: %d", resp.StatusCode), + fmt.Sprintf("Jupyter probe failed: HTTP %d", resp.StatusCode), kubefloworgv1beta1.ProbeResultFailure + } + + // Decode the API 
response to extract the last activity time + var status struct { + LastActivity string `json:"last_activity"` + } + + defer resp.Body.Close() //nolint:errcheck + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return nil, fmt.Errorf("failed to parse JupyterLab API response: %w", err), + "Jupyter probe failed: invalid response body", kubefloworgv1beta1.ProbeResultFailure + } + + // Parse the last activity time from the response + lastActivity, err := time.Parse(time.RFC3339, status.LastActivity) + if err != nil { + return nil, fmt.Errorf("failed to parse last activity time: %w", err), + "Jupyter probe failed: invalid last activity time", kubefloworgv1beta1.ProbeResultFailure + } + + return &lastActivity, nil, "Jupyter probe succeeded", kubefloworgv1beta1.ProbeResultSuccess +} + +// createExecutor creates a new Executor for the given URL and REST config. +func createExecutor(requestUrl *url.URL, config *rest.Config) (remotecommand.Executor, error) { + exec, err := remotecommand.NewSPDYExecutor(config, "POST", requestUrl) + if err != nil { + return nil, err + } + // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). + websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", requestUrl.String()) + if err != nil { + return nil, err + } + exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, func(err error) bool { + return httpstream.IsUpgradeFailure(err) || isHTTPSProxyError(err) + }) + if err != nil { + return nil, err + } + + return exec, nil +} + +// isHTTPSProxyError checks if the given error is due to an unknown scheme in the proxy. +func isHTTPSProxyError(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "proxy: unknown scheme: https") +} + +// parseActivityProbeJson parses the JSON string into an ActivityProbe struct and ensures +// that at least has_activity or last_activity fields are present. 
+func parseActivityProbeJson(jsonString string) (*ActivityProbe, error) { + activityProbe := &ActivityProbe{} + if err := json.Unmarshal([]byte(jsonString), activityProbe); err != nil { + return nil, err + } + if activityProbe.HasActivity == nil && activityProbe.LastActivity == nil { + return nil, errors.New("has_activity and last_activity fields are missing in the activity probe JSON") + } + return activityProbe, nil + +} diff --git a/workspaces/controller/internal/controller/culling_controller_test.go b/workspaces/controller/internal/controller/culling_controller_test.go new file mode 100644 index 00000000..2f16a99e --- /dev/null +++ b/workspaces/controller/internal/controller/culling_controller_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller diff --git a/workspaces/controller/internal/controller/workspacekind_controller_test.go b/workspaces/controller/internal/controller/workspacekind_controller_test.go index 39330761..ffa8ad16 100644 --- a/workspaces/controller/internal/controller/workspacekind_controller_test.go +++ b/workspaces/controller/internal/controller/workspacekind_controller_test.go @@ -129,7 +129,9 @@ var _ = Describe("WorkspaceKind Controller", func() { newWorkspaceKind = workspaceKind.DeepCopy() newWorkspaceKind.Spec.PodTemplate.Culling.ActivityProbe = kubefloworgv1beta1.ActivityProbe{ Exec: &kubefloworgv1beta1.ActivityProbeExec{ - Command: []string{"bash", "-c", "exit 0"}, + OutputPath: "/path/to/output", + TimeoutSeconds: 9, + Script: "echo 'hello, world!'", }, Jupyter: &kubefloworgv1beta1.ActivityProbeJupyter{ LastActivity: true,