Skip to content

Commit

Permalink
enable development of vw locally
Browse files Browse the repository at this point in the history
  • Loading branch information
mjudeikis committed Nov 24, 2024
1 parent 9f10e30 commit 0a4b302
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 139 deletions.
8 changes: 8 additions & 0 deletions cmd/kcp/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,26 @@ func main() {

// manually extract root directory from flags first as it influence all other flags
rootDir := ".kcp"
additionalMappingsFile := ""
for i, f := range os.Args {
if f == "--root-directory" {
if i < len(os.Args)-1 {
rootDir = os.Args[i+1]
} // else let normal flag processing fail
} else if strings.HasPrefix(f, "--root-directory=") {
rootDir = strings.TrimPrefix(f, "--root-directory=")
} else if f == "--mapping-file" {
if i < len(os.Args)-1 {
additionalMappingsFile = os.Args[i+1]
} // else let normal flag processing fail
} else if strings.HasPrefix(f, "--mapping-file") {
additionalMappingsFile = strings.TrimPrefix(f, "--mapping-file=")
}
}

serverOptions := options.NewOptions(rootDir)
serverOptions.Server.GenericControlPlane.Logs.Verbosity = logsapiv1.VerbosityLevel(2)
serverOptions.Server.Extra.AdditionalMappingsFile = additionalMappingsFile

startCmd := &cobra.Command{
Use: "start",
Expand Down
12 changes: 12 additions & 0 deletions cmd/kcp/options/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ import (

type GenericOptions struct {
RootDirectory string
MappingFile string
}

func NewGeneric(rootDir string) *GenericOptions {
return &GenericOptions{
RootDirectory: rootDir,
MappingFile: "",
}
}

func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs := fss.FlagSet("KCP")
fs.StringVar(&o.RootDirectory, "root-directory", o.RootDirectory, "Root directory. Set to \"\" to disable file (e.g. certificates) generation in a root directory.")
fs.StringVar(&o.MappingFile, "mapping-file", o.MappingFile, "Path to additional mapping file to be used by mini-front-proxy.")
}

func (o *GenericOptions) Complete() (*GenericOptions, error) {
Expand All @@ -55,6 +58,15 @@ func (o *GenericOptions) Complete() (*GenericOptions, error) {
return nil, err
}
}
if o.MappingFile != "" {
if !filepath.IsAbs(o.MappingFile) {
pwd, err := os.Getwd()
if err != nil {
return nil, err
}
o.MappingFile = filepath.Join(pwd, o.MappingFile)
}
}

return o, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ func shardHandler(index index.Index, proxy http.Handler) http.HandlerFunc {
return
}

shardURLString, found, errCode := index.LookupURL(clusterPath)
if errCode != 0 {
http.Error(w, "Not available.", errCode)
result, found := index.LookupURL(clusterPath)
if result.ErrorCode != 0 {
http.Error(w, "Not available.", result.ErrorCode)
return
}
if !found {
logger.WithValues("clusterPath", clusterPath).V(4).Info("Unknown cluster path")
responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs)
return
}
shardURL, err := url.Parse(shardURLString)
shardURL, err := url.Parse(result.URL)
if err != nil {
responsewriters.InternalError(w, req, err)
return
Expand Down
14 changes: 10 additions & 4 deletions pkg/proxy/index/index_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
)

type Index interface {
LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int)
LookupURL(path logicalcluster.Path) (index.Result, bool)
}

type ClusterClientGetter func(shard *corev1alpha1.Shard) (kcpclientset.ClusterInterface, error)
Expand Down Expand Up @@ -291,10 +291,16 @@ func (c *Controller) stopShard(shardName string) {
delete(c.shardLogicalClusterInformers, shardName)
}

func (c *Controller) LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int) {
func (c *Controller) LookupURL(path logicalcluster.Path) (index.Result, bool) {
r, found := c.state.LookupURL(path)
if found && r.ErrorCode != 0 {
return r.URL, found, r.ErrorCode
return index.Result{
URL: r.URL,
ErrorCode: r.ErrorCode,
}, found
}
return r.URL, found, 0
return index.Result{
URL: r.URL,
ErrorCode: 0,
}, found
}
129 changes: 14 additions & 115 deletions pkg/proxy/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,121 +23,34 @@ import (
"net/http/httputil"
"net/url"
"os"
"path"
"strings"

"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

"github.com/kcp-dev/kcp/pkg/proxy/index"
proxyoptions "github.com/kcp-dev/kcp/pkg/proxy/options"
"github.com/kcp-dev/kcp/pkg/server/proxy"
)

// PathMapping describes how to route traffic from a path to a backend server.
// Each Path is registered with the DefaultServeMux with a handler that
// delegates to the specified backend.
type PathMapping struct {
Path string `json:"path"`
Backend string `json:"backend"`
BackendServerCA string `json:"backend_server_ca"`
ProxyClientCert string `json:"proxy_client_cert"`
ProxyClientKey string `json:"proxy_client_key"`
UserHeader string `json:"user_header,omitempty"`
GroupHeader string `json:"group_header,omitempty"`
ExtraHeaderPrefix string `json:"extra_header_prefix"`
}

type HttpHandler struct {
index index.Index
mapping []httpHandlerMapping
defaultHandler http.Handler
}

// httpHandlerMapping is used to route traffic to the correct backend server.
// Higher weight means that the mapping is more specific and should be matched first.
type httpHandlerMapping struct {
weight int
path string
handler http.Handler
}

func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// mappings are used to route traffic to the correct backend server.
// It should not have `/clusters` as prefix because that is handled by the
// shardHandler or mounts. Logic is as follows:
// 1. We detect URL for the request and find the correct handler. URL can be
// shard based, virtual workspace or mount. First two are covered by r.URL,
// where mounts are covered by annotation on the workspace with the mount path.
// 2. If mountpoint is found, we rewrite the URL to resolve, else use one in
// request to match with mappings.
// 3. Iterate over mappings and find the one that matches the URL. If found,
// use the handler for that mapping, else use default handler - kcp.
// Mappings are done from most specific to least specific:
// Example: /clusters/cluster1/ will be matched before /clusters/
for _, m := range h.mapping {
url, errorCode := h.resolveURL(r)
if errorCode != 0 {
http.Error(w, http.StatusText(errorCode), errorCode)
return
}
if strings.HasPrefix(url, m.path) {
m.handler.ServeHTTP(w, r)
return
}
}

h.defaultHandler.ServeHTTP(w, r)
}

func (h *HttpHandler) resolveURL(r *http.Request) (string, int) {
// if we don't match any of the paths, use the default behavior - request
var cs = strings.SplitN(strings.TrimLeft(r.URL.Path, "/"), "/", 3)
if len(cs) < 2 || cs[0] != "clusters" {
return r.URL.Path, 0
}

clusterPath := logicalcluster.NewPath(cs[1])
if !clusterPath.IsValid() {
return r.URL.Path, 0
}

u, found, errCode := h.index.LookupURL(clusterPath)
if errCode != 0 {
return "", errCode
}
if found {
u, err := url.Parse(u)
if err == nil && u != nil {
u.Path = strings.TrimSuffix(u.Path, "/")
r.URL.Path = path.Join(u.Path, strings.Join(cs[2:], "/")) // override request prefix and keep kube api contextual suffix
return u.Path, 0
}
}

return r.URL.Path, 0
}

func NewHandler(ctx context.Context, o *proxyoptions.Options, index index.Index) (http.Handler, error) {
mappingData, err := os.ReadFile(o.MappingFile)
if err != nil {
return nil, fmt.Errorf("failed to read mapping file %q: %w", o.MappingFile, err)
}

var mapping []PathMapping
var mapping []proxy.PathMapping
if err = yaml.Unmarshal(mappingData, &mapping); err != nil {
return nil, fmt.Errorf("failed to unmarshal mapping file %q: %w", o.MappingFile, err)
}

handlers := HttpHandler{
index: index,
mapping: []httpHandlerMapping{
handlers := proxy.HttpHandler{
Index: index,
Mappings: proxy.HttpHandlerMappings{
{
weight: 0,
path: "/metrics",
handler: legacyregistry.Handler(),
Weight: 0,
Path: "/metrics",
Handler: legacyregistry.Handler(),
},
},
}
Expand Down Expand Up @@ -185,31 +98,17 @@ func NewHandler(ctx context.Context, o *proxyoptions.Options, index index.Index)

logger.V(2).WithValues("path", m.Path).Info("adding handler")
if m.Path == "/" {
handlers.defaultHandler = handler
handlers.DefaultHandler = handler
} else {
handlers.mapping = append(handlers.mapping, httpHandlerMapping{
weight: len(m.Path),
path: m.Path,
handler: handler,
handlers.Mappings = append(handlers.Mappings, proxy.HttpHandlerMapping{
Weight: len(m.Path),
Path: m.Path,
Handler: handler,
})
}
}

handlers.mapping = sortMappings(handlers.mapping)
handlers.Mappings.Sort()

return &handlers, nil
}

func sortMappings(mappings []httpHandlerMapping) []httpHandlerMapping {
// sort mappings by weight
// higher weight means that the mapping is more specific and should be matched first
// Example: /clusters/cluster1/ will be matched before /clusters/
for i := range mappings {
for j := range mappings {
if mappings[i].weight > mappings[j].weight {
mappings[i], mappings[j] = mappings[j], mappings[i]
}
}
}
return mappings
}
8 changes: 7 additions & 1 deletion pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,13 @@ func NewConfig(opts kcpserveroptions.CompletedOptions) (*Config, error) {
apiHandler = mux

apiHandler = filters.WithAuditInit(apiHandler) // Must run before any audit annotation is made
apiHandler = WithLocalProxy(apiHandler, opts.Extra.ShardName, opts.Extra.ShardBaseURL, c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters())
apiHandler = WithLocalProxy(apiHandler,
opts.Extra.ShardName,
opts.Extra.ShardBaseURL,
opts.Extra.AdditionalMappingsFile,
c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(),
c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(),
)
apiHandler = WithInClusterServiceAccountRequestRewrite(apiHandler)
apiHandler = kcpfilters.WithAcceptHeader(apiHandler)
apiHandler = WithUserAgent(apiHandler)
Expand Down
Loading

0 comments on commit 0a4b302

Please sign in to comment.