diff --git a/cmd/kcp/kcp.go b/cmd/kcp/kcp.go index 56368edc6ad..3ac0d26947a 100644 --- a/cmd/kcp/kcp.go +++ b/cmd/kcp/kcp.go @@ -68,6 +68,7 @@ func main() { // manually extract root directory from flags first as it influence all other flags rootDir := ".kcp" + additionalMappingsFile := "" for i, f := range os.Args { if f == "--root-directory" { if i < len(os.Args)-1 { @@ -75,11 +76,18 @@ func main() { } // else let normal flag processing fail } else if strings.HasPrefix(f, "--root-directory=") { rootDir = strings.TrimPrefix(f, "--root-directory=") + } else if f == "--mapping-file" { + if i < len(os.Args)-1 { + additionalMappingsFile = os.Args[i+1] + } // else let normal flag processing fail + } else if strings.HasPrefix(f, "--mapping-file") { + additionalMappingsFile = strings.TrimPrefix(f, "--mapping-file=") } } serverOptions := options.NewOptions(rootDir) serverOptions.Server.GenericControlPlane.Logs.Verbosity = logsapiv1.VerbosityLevel(2) + serverOptions.Server.Extra.AdditionalMappingsFile = additionalMappingsFile startCmd := &cobra.Command{ Use: "start", diff --git a/cmd/kcp/options/generic.go b/cmd/kcp/options/generic.go index d7e08ac1688..7c08b845eb5 100644 --- a/cmd/kcp/options/generic.go +++ b/cmd/kcp/options/generic.go @@ -27,17 +27,20 @@ import ( type GenericOptions struct { RootDirectory string + MappingFile string } func NewGeneric(rootDir string) *GenericOptions { return &GenericOptions{ RootDirectory: rootDir, + MappingFile: "", } } func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs := fss.FlagSet("KCP") fs.StringVar(&o.RootDirectory, "root-directory", o.RootDirectory, "Root directory. Set to \"\" to disable file (e.g. certificates) generation in a root directory.") + fs.StringVar(&o.MappingFile, "mapping-file", o.MappingFile, "Path to additional mapping file to be used by mini-front-proxy.") } func (o *GenericOptions) Complete() (*GenericOptions, error) { @@ -55,6 +58,15 @@ func (o *GenericOptions) Complete() (*GenericOptions, error) { return nil, err } } + if o.MappingFile != "" { + if !filepath.IsAbs(o.MappingFile) { + pwd, err := os.Getwd() + if err != nil { + return nil, err + } + o.MappingFile = filepath.Join(pwd, o.MappingFile) + } + } return o, nil } diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index e41eb4410d4..de1c2162709 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -56,9 +56,9 @@ func shardHandler(index index.Index, proxy http.Handler) http.HandlerFunc { return } - shardURLString, found, errCode := index.LookupURL(clusterPath) - if errCode != 0 { - http.Error(w, "Not available.", errCode) + result, found := index.LookupURL(clusterPath) + if result.ErrorCode != 0 { + http.Error(w, "Not available.", result.ErrorCode) return } if !found { @@ -66,7 +66,7 @@ func shardHandler(index index.Index, proxy http.Handler) http.HandlerFunc { responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs) return } - shardURL, err := url.Parse(shardURLString) + shardURL, err := url.Parse(result.URL) if err != nil { responsewriters.InternalError(w, req, err) return diff --git a/pkg/proxy/index/index_controller.go b/pkg/proxy/index/index_controller.go index 0ed4ba35e26..de102ace23c 100644 --- a/pkg/proxy/index/index_controller.go +++ b/pkg/proxy/index/index_controller.go @@ -49,7 +49,7 @@ const ( ) type Index interface { - LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int) + LookupURL(path logicalcluster.Path) (index.Result, bool) } type ClusterClientGetter func(shard *corev1alpha1.Shard) (kcpclientset.ClusterInterface, error) @@ -291,10 +291,16 @@ func (c *Controller) stopShard(shardName string) { delete(c.shardLogicalClusterInformers, shardName) } -func (c *Controller) LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int) { +func (c *Controller) LookupURL(path logicalcluster.Path) (index.Result, bool) { r, found := c.state.LookupURL(path) if found && r.ErrorCode != 0 { - return r.URL, found, r.ErrorCode + return index.Result{ + URL: r.URL, + ErrorCode: r.ErrorCode, + }, found } - return r.URL, found, 0 + return index.Result{ + URL: r.URL, + ErrorCode: 0, + }, found } diff --git a/pkg/proxy/mapping.go b/pkg/proxy/mapping.go index 7ed3dee75f8..fbf9f990c10 100644 --- a/pkg/proxy/mapping.go +++ b/pkg/proxy/mapping.go @@ -23,10 +23,6 @@ import ( "net/http/httputil" "net/url" "os" - "path" - "strings" - - "github.com/kcp-dev/logicalcluster/v3" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" @@ -34,110 +30,27 @@ import ( "github.com/kcp-dev/kcp/pkg/proxy/index" proxyoptions "github.com/kcp-dev/kcp/pkg/proxy/options" + "github.com/kcp-dev/kcp/pkg/server/proxy" ) -// PathMapping describes how to route traffic from a path to a backend server. -// Each Path is registered with the DefaultServeMux with a handler that -// delegates to the specified backend. -type PathMapping struct { - Path string `json:"path"` - Backend string `json:"backend"` - BackendServerCA string `json:"backend_server_ca"` - ProxyClientCert string `json:"proxy_client_cert"` - ProxyClientKey string `json:"proxy_client_key"` - UserHeader string `json:"user_header,omitempty"` - GroupHeader string `json:"group_header,omitempty"` - ExtraHeaderPrefix string `json:"extra_header_prefix"` -} - -type HttpHandler struct { - index index.Index - mapping []httpHandlerMapping - defaultHandler http.Handler -} - -// httpHandlerMapping is used to route traffic to the correct backend server. -// Higher weight means that the mapping is more specific and should be matched first. -type httpHandlerMapping struct { - weight int - path string - handler http.Handler -} - -func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // mappings are used to route traffic to the correct backend server. - // It should not have `/clusters` as prefix because that is handled by the - // shardHandler or mounts. Logic is as follows: - // 1. We detect URL for the request and find the correct handler. URL can be - // shard based, virtual workspace or mount. First two are covered by r.URL, - // where mounts are covered by annotation on the workspace with the mount path. - // 2. If mountpoint is found, we rewrite the URL to resolve, else use one in - // request to match with mappings. - // 3. Iterate over mappings and find the one that matches the URL. If found, - // use the handler for that mapping, else use default handler - kcp. - // Mappings are done from most specific to least specific: - // Example: /clusters/cluster1/ will be matched before /clusters/ - for _, m := range h.mapping { - url, errorCode := h.resolveURL(r) - if errorCode != 0 { - http.Error(w, http.StatusText(errorCode), errorCode) - return - } - if strings.HasPrefix(url, m.path) { - m.handler.ServeHTTP(w, r) - return - } - } - - h.defaultHandler.ServeHTTP(w, r) -} - -func (h *HttpHandler) resolveURL(r *http.Request) (string, int) { - // if we don't match any of the paths, use the default behavior - request - var cs = strings.SplitN(strings.TrimLeft(r.URL.Path, "/"), "/", 3) - if len(cs) < 2 || cs[0] != "clusters" { - return r.URL.Path, 0 - } - - clusterPath := logicalcluster.NewPath(cs[1]) - if !clusterPath.IsValid() { - return r.URL.Path, 0 - } - - u, found, errCode := h.index.LookupURL(clusterPath) - if errCode != 0 { - return "", errCode - } - if found { - u, err := url.Parse(u) - if err == nil && u != nil { - u.Path = strings.TrimSuffix(u.Path, "/") - r.URL.Path = path.Join(u.Path, strings.Join(cs[2:], "/")) // override request prefix and keep kube api contextual suffix - return u.Path, 0 - } - } - - return r.URL.Path, 0 -} - func NewHandler(ctx context.Context, o *proxyoptions.Options, index index.Index) (http.Handler, error) { mappingData, err := os.ReadFile(o.MappingFile) if err != nil { return nil, fmt.Errorf("failed to read mapping file %q: %w", o.MappingFile, err) } - var mapping []PathMapping + var mapping []proxy.PathMapping if err = yaml.Unmarshal(mappingData, &mapping); err != nil { return nil, fmt.Errorf("failed to unmarshal mapping file %q: %w", o.MappingFile, err) } - handlers := HttpHandler{ - index: index, - mapping: []httpHandlerMapping{ + handlers := proxy.HttpHandler{ + Index: index, + Mappings: proxy.HttpHandlerMappings{ { - weight: 0, - path: "/metrics", - handler: legacyregistry.Handler(), + Weight: 0, + Path: "/metrics", + Handler: legacyregistry.Handler(), }, }, } @@ -185,31 +98,17 @@ func NewHandler(ctx context.Context, o *proxyoptions.Options, index index.Index) logger.V(2).WithValues("path", m.Path).Info("adding handler") if m.Path == "/" { - handlers.defaultHandler = handler + handlers.DefaultHandler = handler } else { - handlers.mapping = append(handlers.mapping, httpHandlerMapping{ - weight: len(m.Path), - path: m.Path, - handler: handler, + handlers.Mappings = append(handlers.Mappings, proxy.HttpHandlerMapping{ + Weight: len(m.Path), + Path: m.Path, + Handler: handler, }) } } - handlers.mapping = sortMappings(handlers.mapping) + handlers.Mappings.Sort() return &handlers, nil } - -func sortMappings(mappings []httpHandlerMapping) []httpHandlerMapping { - // sort mappings by weight - // higher weight means that the mapping is more specific and should be matched first - // Example: /clusters/cluster1/ will be matched before /clusters/ - for i := range mappings { - for j := range mappings { - if mappings[i].weight > mappings[j].weight { - mappings[i], mappings[j] = mappings[j], mappings[i] - } - } - } - return mappings -} diff --git a/pkg/server/config.go b/pkg/server/config.go index 91aa66ec8b9..33455f52124 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -460,7 +460,13 @@ func NewConfig(opts kcpserveroptions.CompletedOptions) (*Config, error) { apiHandler = mux apiHandler = filters.WithAuditInit(apiHandler) // Must run before any audit annotation is made - apiHandler = WithLocalProxy(apiHandler, opts.Extra.ShardName, opts.Extra.ShardBaseURL, c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) + apiHandler = WithLocalProxy(apiHandler, + opts.Extra.ShardName, + opts.Extra.ShardBaseURL, + opts.Extra.AdditionalMappingsFile, + c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), + c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + ) apiHandler = WithInClusterServiceAccountRequestRewrite(apiHandler) apiHandler = kcpfilters.WithAcceptHeader(apiHandler) apiHandler = WithUserAgent(apiHandler) diff --git a/pkg/server/localproxy.go b/pkg/server/localproxy.go index 5932dd3e5fd..035c6547790 100644 --- a/pkg/server/localproxy.go +++ b/pkg/server/localproxy.go @@ -22,19 +22,24 @@ import ( "net/http" "net/http/httputil" "net/url" + "os" "github.com/kcp-dev/logicalcluster/v3" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" + userinfo "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/tools/cache" + "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" + "sigs.k8s.io/yaml" "github.com/kcp-dev/kcp/pkg/index" indexrewriters "github.com/kcp-dev/kcp/pkg/index/rewriters" "github.com/kcp-dev/kcp/pkg/server/filters" + "github.com/kcp-dev/kcp/pkg/server/proxy" corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" @@ -46,7 +51,7 @@ import ( // mainly interesting for standalone mode, without a real front-proxy in-front. func WithLocalProxy( handler http.Handler, - shardName, shardBaseURL string, + shardName, shardBaseURL, additionalMappingsFile string, workspaceInformer tenancyv1alpha1informers.WorkspaceClusterInformer, logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, ) http.Handler { @@ -91,7 +96,7 @@ func WithLocalProxy( }, }) - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defaultHandlerFunc := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() logger := klog.FromContext(ctx) @@ -195,4 +200,120 @@ func WithLocalProxy( handler.ServeHTTP(w, req.WithContext(ctx)) }) + + // If additional mappings file is provided, read it and add the mappings to the handler + handlers, err := NewLocalProxyHandler(defaultHandlerFunc, indexState, additionalMappingsFile) + if err != nil { + klog.Fatalf("failed to create local proxy handler: %v", err) + } + + return handlers +} + +// NewLocalProxyHandler returns a handler with a local-only mini-front-proxy. +// This function is very similar to proxy/mapping.go.NewHandler. +// If we want to re-use that code, we basically would be merging proxy with server packages. +// Which is not desirable at the point of writing (2024.10-26), but might be in the future. +func NewLocalProxyHandler(defaultHandler http.Handler, index index.Index, additionalMappingsFile string) (http.Handler, error) { + mappingData, err := os.ReadFile(additionalMappingsFile) + if err != nil { + return nil, fmt.Errorf("failed to read mapping file %q: %w", additionalMappingsFile, err) + } + + var mapping []proxy.PathMapping + if err = yaml.Unmarshal(mappingData, &mapping); err != nil { + return nil, fmt.Errorf("failed to unmarshal mapping file %q: %w", additionalMappingsFile, err) + } + + handlers := proxy.HttpHandler{ + Index: index, + Mappings: proxy.HttpHandlerMappings{ + { + Weight: 0, + Path: "/metrics", + Handler: legacyregistry.Handler(), + }, + }, + DefaultHandler: defaultHandler, + } + + for _, m := range mapping { + u, err := url.Parse(m.Backend) + if err != nil { + return nil, fmt.Errorf("failed to create path mapping for path %q: failed to parse URL %q: %w", m.Path, m.Backend, err) + } + + // This is insecure, but just don't tell it to anyone :) + transport, err := newInsecureTransport() + if err != nil { + return nil, fmt.Errorf("failed to create path mapping for path %q: %w", m.Path, err) + } + + var handler http.Handler + p := httputil.NewSingleHostReverseProxy(u) + p.Transport = transport + handler = p + + userHeader := "X-Remote-User" + groupHeader := "X-Remote-Group" + extraHeaderPrefix := "X-Remote-Extra-" + if m.UserHeader != "" { + userHeader = m.UserHeader + } + if m.GroupHeader != "" { + groupHeader = m.GroupHeader + } + if m.ExtraHeaderPrefix != "" { + extraHeaderPrefix = m.ExtraHeaderPrefix + } + + handler = withProxyAuthHeaders(handler, userHeader, groupHeader, extraHeaderPrefix) + + handlers.Mappings = append(handlers.Mappings, proxy.HttpHandlerMapping{ + Weight: len(m.Path), + Path: m.Path, + Handler: handler, + }) + } + + handlers.Mappings.Sort() + + return &handlers, nil +} + +func newInsecureTransport() (*http.Transport, error) { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + } + return transport, nil +} + +// withProxyAuthHeaders does client cert termination by extracting the user and groups and +// passing them through access headers to the shard. +func withProxyAuthHeaders(delegate http.Handler, userHeader, groupHeader string, extraHeaderPrefix string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if u, ok := request.UserFrom(r.Context()); ok { + appendClientCertAuthHeaders(r.Header, u, userHeader, groupHeader, extraHeaderPrefix) + } + + delegate.ServeHTTP(w, r) + } +} + +func appendClientCertAuthHeaders(header http.Header, user userinfo.Info, userHeader, groupHeader, extraHeaderPrefix string) { + header.Set(userHeader, user.GetName()) + + for _, group := range user.GetGroups() { + header.Add(groupHeader, group) + } + + for k, values := range user.GetExtra() { + // Key must be encoded to enable e.g authentication.kubernetes.io/cluster-name + // This is decoded in the RequestHeader auth handler + encodedKey := url.PathEscape(k) + for _, v := range values { + header.Add(extraHeaderPrefix+encodedKey, v) + } + } } diff --git a/pkg/server/options/options.go b/pkg/server/options/options.go index 2a6a3fc8515..c74b181455d 100644 --- a/pkg/server/options/options.go +++ b/pkg/server/options/options.go @@ -67,6 +67,11 @@ type ExtraOptions struct { ExternalLogicalClusterAdminKubeconfig string ConversionCELTransformationTimeout time.Duration BatteriesIncluded []string + // DEVELOPMENT ONLY. AdditionalMappingsFile is the path to a file that contains additional mappings + // for the mini-front-proxy to use. The file should be in the format of the + // --mapping-file flag of the front-proxy. Do NOT expose this flag to users via main server options. + // It is overridden by the kcp start command. + AdditionalMappingsFile string } type completedOptions struct { diff --git a/pkg/server/proxy/handler.go b/pkg/server/proxy/handler.go new file mode 100644 index 00000000000..7cf72f6ec4b --- /dev/null +++ b/pkg/server/proxy/handler.go @@ -0,0 +1,111 @@ +package proxy + +import ( + "net/http" + "net/url" + "path" + "strings" + + "github.com/kcp-dev/logicalcluster/v3" + + "github.com/kcp-dev/kcp/pkg/proxy/index" +) + +// PathMapping describes how to route traffic from a path to a backend server. +// Each Path is registered with the DefaultServeMux with a handler that +// delegates to the specified backend. +type PathMapping struct { + Path string `json:"path"` + Backend string `json:"backend"` + BackendServerCA string `json:"backend_server_ca"` + ProxyClientCert string `json:"proxy_client_cert"` + ProxyClientKey string `json:"proxy_client_key"` + UserHeader string `json:"user_header,omitempty"` + GroupHeader string `json:"group_header,omitempty"` + ExtraHeaderPrefix string `json:"extra_header_prefix"` +} + +type HttpHandler struct { + Index index.Index + Mappings HttpHandlerMappings + DefaultHandler http.Handler +} + +// httpHandlerMapping is used to route traffic to the correct backend server. +// Higher weight means that the mapping is more specific and should be matched first. +type HttpHandlerMapping struct { + Weight int + Path string + Handler http.Handler +} + +type HttpHandlerMappings []HttpHandlerMapping + +// Sort mappings by weight +// higher weight means that the mapping is more specific and should be matched first +// Example: /clusters/cluster1/ will be matched before /clusters/ +func (h HttpHandlerMappings) Sort() { + for i := range h { + for j := range h { + if h[i].Weight > h[j].Weight { + h[i], h[j] = h[j], h[i] + } + } + } +} + +func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // mappings are used to route traffic to the correct backend server. + // It should not have `/clusters` as prefix because that is handled by the + // shardHandler or mounts. Logic is as follows: + // 1. We detect URL for the request and find the correct handler. URL can be + // shard based, virtual workspace or mount. First two are covered by r.URL, + // where mounts are covered by annotation on the workspace with the mount path. + // 2. If mountpoint is found, we rewrite the URL to resolve, else use one in + // request to match with mappings. + // 3. Iterate over mappings and find the one that matches the URL. If found, + // use the handler for that mapping, else use default handler - kcp. + // Mappings are done from most specific to least specific: + // Example: /clusters/cluster1/ will be matched before /clusters/ + for _, m := range h.Mappings { + url, errorCode := h.resolveURL(r) + if errorCode != 0 { + http.Error(w, http.StatusText(errorCode), errorCode) + return + } + if strings.HasPrefix(url, m.Path) { + m.Handler.ServeHTTP(w, r) + return + } + } + + h.DefaultHandler.ServeHTTP(w, r) +} + +func (h *HttpHandler) resolveURL(r *http.Request) (string, int) { + // if we don't match any of the paths, use the default behavior - request + var cs = strings.SplitN(strings.TrimLeft(r.URL.Path, "/"), "/", 3) + if len(cs) < 2 || cs[0] != "clusters" { + return r.URL.Path, 0 + } + + clusterPath := logicalcluster.NewPath(cs[1]) + if !clusterPath.IsValid() { + return r.URL.Path, 0 + } + + result, found := h.Index.LookupURL(clusterPath) + if result.ErrorCode != 0 { + return "", result.ErrorCode + } + if found { + u, err := url.Parse(result.URL) + if err == nil && u != nil { + u.Path = strings.TrimSuffix(u.Path, "/") + r.URL.Path = path.Join(u.Path, strings.Join(cs[2:], "/")) // override request prefix and keep kube api contextual suffix + return u.Path, 0 + } + } + + return r.URL.Path, 0 +} diff --git a/pkg/proxy/mapping_test.go b/pkg/server/proxy/mapping_test.go similarity index 65% rename from pkg/proxy/mapping_test.go rename to pkg/server/proxy/mapping_test.go index 51ce459ed7b..dea77ddb5c3 100644 --- a/pkg/proxy/mapping_test.go +++ b/pkg/server/proxy/mapping_test.go @@ -22,23 +22,23 @@ import ( ) func TestSortMappings(t *testing.T) { - mappings := []httpHandlerMapping{ - {weight: 3}, - {weight: 1}, - {weight: 2}, - {weight: 10}, + mappings := HttpHandlerMappings{ + {Weight: 3}, + {Weight: 1}, + {Weight: 2}, + {Weight: 10}, } - expected := []httpHandlerMapping{ - {weight: 10}, - {weight: 3}, - {weight: 2}, - {weight: 1}, + expected := HttpHandlerMappings{ + {Weight: 10}, + {Weight: 3}, + {Weight: 2}, + {Weight: 1}, } - sortedMappings := sortMappings(mappings) + mappings.Sort() - if !reflect.DeepEqual(sortedMappings, expected) { - t.Errorf("Expected %v, but got %v", expected, sortedMappings) + if !reflect.DeepEqual(mappings, expected) { + t.Errorf("Expected %v, but got %v", expected, mappings) } }