Skip to content

Commit 22149ac

Browse files
committed
added crossplane functions support
1 parent de4e917 commit 22149ac

File tree

20 files changed

+438
-41
lines changed

20 files changed

+438
-41
lines changed

cmd/overlock/configuration/load.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ func (c *loadCmd) Run(ctx context.Context, config *rest.Config, dc *dynamic.Dyna
5353
pkgs = append(pkgs, pkg)
5454
}
5555
if c.Upgrade {
56-
cfg.Name = cfg.UpgradeVersion(ctx, dc, cfg.Name, pkgs)
56+
cfg.Name, err = cfg.UpgradeVersion(ctx, dc, cfg.Name, pkgs)
57+
if err != nil {
58+
return err
59+
}
5760
}
5861

5962
logger.Debugf("Loading image to: %s", cfg.Name)

cmd/overlock/function/apply.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package function
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/web-seven/overlock/internal/function"
8+
"go.uber.org/zap"
9+
10+
"k8s.io/client-go/dynamic"
11+
"k8s.io/client-go/rest"
12+
)
13+
14+
type applyCmd struct {
15+
Link string `arg:"" required:"" help:"Link URL (or multiple comma separated) to Crossplane function to be applied to Environment."`
16+
Wait bool `optional:"" short:"w" help:"Wait until function is installed."`
17+
Timeout string `optional:"" short:"t" help:"Timeout is used to set how much to wait until function is installed (valid time units are ns, us, ms, s, m, h)"`
18+
}
19+
20+
func (c *applyCmd) Run(ctx context.Context, dc *dynamic.DynamicClient, config *rest.Config, logger *zap.SugaredLogger) error {
21+
function.ApplyFunction(ctx, c.Link, config, logger)
22+
if !c.Wait {
23+
return nil
24+
}
25+
26+
var timeoutChan <-chan time.Time
27+
if c.Timeout != "" {
28+
timeout, err := time.ParseDuration(c.Timeout)
29+
if err != nil {
30+
return err
31+
}
32+
timeoutChan = time.After(timeout)
33+
}
34+
return function.HealthCheck(ctx, dc, c.Link, c.Wait, timeoutChan, logger)
35+
}

cmd/overlock/function/delete.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package function
2+
3+
import (
4+
"context"
5+
6+
"github.com/web-seven/overlock/internal/function"
7+
"go.uber.org/zap"
8+
"k8s.io/client-go/dynamic"
9+
)
10+
11+
type deleteCmd struct {
12+
FunctionURL string `arg:"" required:"" help:"Specifies the URL (or multimple comma separated) of function to be deleted from Environment."`
13+
}
14+
15+
func (c *deleteCmd) Run(ctx context.Context, dynamic *dynamic.DynamicClient, logger *zap.SugaredLogger) error {
16+
return function.DeleteFunction(ctx, c.FunctionURL, dynamic, logger)
17+
}

cmd/overlock/function/function.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package function
2+
3+
type Cmd struct {
4+
Apply applyCmd `cmd:"" help:"Apply Crossplane Function."`
5+
List listCmd `cmd:"" help:"Apply Crossplane Function."`
6+
Load loadCmd `cmd:"" help:"Load Crossplane Function from archive."`
7+
Delete deleteCmd `cmd:"" help:"Delete Crossplane Function."`
8+
}

cmd/overlock/function/list.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package function
2+
3+
import (
4+
"context"
5+
6+
"github.com/pterm/pterm"
7+
"github.com/web-seven/overlock/internal/function"
8+
"go.uber.org/zap"
9+
10+
"k8s.io/client-go/dynamic"
11+
)
12+
13+
type listCmd struct {
14+
}
15+
16+
func (listCmd) Run(ctx context.Context, dynamicClient *dynamic.DynamicClient, logger *zap.SugaredLogger) error {
17+
functions := function.GetFunctions(ctx, dynamicClient)
18+
table := pterm.TableData{{"NAME", "PACKAGE"}}
19+
for _, conf := range functions {
20+
table = append(table, []string{conf.Name, conf.Spec.Package})
21+
}
22+
pterm.DefaultTable.WithHasHeader().WithData(table).Render()
23+
return nil
24+
}

cmd/overlock/function/load.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package function
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"os"
7+
8+
"github.com/web-seven/overlock/internal/function"
9+
"github.com/web-seven/overlock/internal/kube"
10+
"github.com/web-seven/overlock/internal/loader"
11+
"github.com/web-seven/overlock/internal/packages"
12+
"github.com/web-seven/overlock/internal/registry"
13+
"go.uber.org/zap"
14+
"k8s.io/client-go/dynamic"
15+
"k8s.io/client-go/rest"
16+
)
17+
18+
type loadCmd struct {
19+
Name string `arg:"" help:"Name of function."`
20+
Path string `help:"Path to function package archive."`
21+
Stdin bool `help:"Load function package from STDIN."`
22+
Apply bool `help:"Apply function after load."`
23+
Upgrade bool `help:"Upgrade existing function."`
24+
}
25+
26+
func (c *loadCmd) Run(ctx context.Context, config *rest.Config, dc *dynamic.DynamicClient, logger *zap.SugaredLogger) error {
27+
28+
client, err := kube.Client(config)
29+
if err != nil {
30+
return err
31+
}
32+
33+
isLocal, err := registry.IsLocalRegistry(ctx, client)
34+
if !isLocal || err != nil {
35+
reg := registry.NewLocal()
36+
reg.SetDefault(true)
37+
err := reg.Create(ctx, config, logger)
38+
if err != nil {
39+
return err
40+
}
41+
}
42+
43+
fnc := function.Function{}
44+
fnc.Name = c.Name
45+
46+
fncs := function.GetFunctions(ctx, dc)
47+
var pkgs []packages.Package
48+
for _, c := range fncs {
49+
pkg := packages.Package{
50+
Name: c.Name,
51+
Url: c.Spec.Package,
52+
}
53+
pkgs = append(pkgs, pkg)
54+
}
55+
if c.Upgrade {
56+
fnc.Name, err = fnc.UpgradeVersion(ctx, dc, fnc.Name, pkgs)
57+
if err != nil {
58+
return err
59+
}
60+
}
61+
62+
logger.Debugf("Loading image to: %s", fnc.Name)
63+
if c.Path != "" {
64+
logger.Debugf("Loading from path: %s", c.Path)
65+
fnc.Image, err = loader.LoadPathArchive(c.Path)
66+
if err != nil {
67+
return err
68+
}
69+
} else if c.Stdin {
70+
logger.Debug("Loading from STDIN")
71+
reader := bufio.NewReader(os.Stdin)
72+
err = fnc.LoadStdinArchive(reader)
73+
if err != nil {
74+
return err
75+
}
76+
} else {
77+
logger.Warn("Archive path or STDIN required for load function.")
78+
return nil
79+
}
80+
81+
logger.Debug("Pushing to local registry")
82+
err = registry.PushLocalRegistry(ctx, fnc.Name, fnc.Image, config, logger)
83+
if err != nil {
84+
return err
85+
}
86+
logger.Infof("Image archive %s loaded to local registry.", fnc.Name)
87+
88+
if c.Apply {
89+
return function.ApplyFunction(ctx, fnc.Name, config, logger)
90+
}
91+
return nil
92+
}

cmd/overlock/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/go-logr/logr"
1111
"github.com/web-seven/overlock/cmd/overlock/configuration"
1212
"github.com/web-seven/overlock/cmd/overlock/environment"
13+
"github.com/web-seven/overlock/cmd/overlock/function"
1314
"github.com/web-seven/overlock/cmd/overlock/generate"
1415
"github.com/web-seven/overlock/cmd/overlock/provider"
1516
"github.com/web-seven/overlock/cmd/overlock/version"
@@ -86,7 +87,8 @@ type cli struct {
8687
Resource resource.Cmd `cmd:"" name:"resource" aliases:"res" help:"Overlock Resource commands"`
8788
Registry registry.Cmd `cmd:"" name:"registry" aliases:"reg" help:"Packages registy commands"`
8889
InstallCompletions kongplete.InstallCompletions `cmd:"" help:"Install shell completions"`
89-
Provider provider.Cmd `cmd:"" name:"provider" help:"Overlock Provider commands"`
90+
Provider provider.Cmd `cmd:"" name:"provider" aliases:"prv" help:"Overlock Provider commands"`
91+
Function function.Cmd `cmd:"" name:"function" aliases:"fnc" help:"Overlock Function commands"`
9092
Search registry.SearchCmd `cmd:"" help:"Search for packages"`
9193
Generate generate.Cmd `cmd:"" help:"Generate example by XRD YAML file"`
9294
}

cmd/overlock/registry/create.go

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type createCmd struct {
2626

2727
func (c *createCmd) Run(ctx context.Context, client *kubernetes.Clientset, config *rest.Config, logger *zap.SugaredLogger) error {
2828
reg := registry.New(c.RegistryServer, c.Username, c.Password, c.Email)
29+
if c.Local {
30+
reg = registry.NewLocal()
31+
}
2932
reg.SetDefault(c.Default)
3033
reg.SetLocal(c.Local)
3134
reg.WithContext(c.Context)

examples/environment-repository.yaml

-16
This file was deleted.

internal/function/apply.go

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package function
2+
3+
import (
4+
"context"
5+
"strings"
6+
"time"
7+
8+
"github.com/pkg/errors"
9+
"go.uber.org/zap"
10+
11+
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/client-go/dynamic"
13+
"k8s.io/client-go/rest"
14+
15+
"github.com/web-seven/overlock/internal/engine"
16+
17+
"github.com/crossplane/crossplane-runtime/pkg/resource"
18+
crossv1 "github.com/crossplane/crossplane/apis/pkg/v1beta1"
19+
"sigs.k8s.io/controller-runtime/pkg/client"
20+
)
21+
22+
// RunFunctionHealthCheck performs a health check on functions defined by the links string.
23+
func HealthCheck(ctx context.Context, dc dynamic.Interface, links string, wait bool, timeoutChan <-chan time.Time, logger *zap.SugaredLogger) error {
24+
25+
linkSet := make(map[string]struct{})
26+
for _, link := range strings.Split(links, ",") {
27+
linkSet[link] = struct{}{}
28+
}
29+
cfgs := GetFunctions(ctx, dc)
30+
31+
for {
32+
select {
33+
case <-timeoutChan:
34+
logger.Error("Timeout reached.")
35+
return nil
36+
default:
37+
allHealthy := true
38+
for _, cfg := range cfgs {
39+
if _, linkMatched := linkSet[cfg.Spec.Package]; linkMatched {
40+
if !CheckHealthStatus(cfg.Status.Conditions) {
41+
allHealthy = false
42+
break
43+
}
44+
}
45+
}
46+
if allHealthy {
47+
logger.Info("Function(s) are healthy.")
48+
return nil
49+
}
50+
time.Sleep(5 * time.Second)
51+
}
52+
}
53+
}
54+
55+
func ApplyFunction(ctx context.Context, links string, config *rest.Config, logger *zap.SugaredLogger) error {
56+
scheme := runtime.NewScheme()
57+
crossv1.AddToScheme(scheme)
58+
if kube, err := client.New(config, client.Options{Scheme: scheme}); err == nil {
59+
for _, link := range strings.Split(links, ",") {
60+
cfg := &crossv1.Function{}
61+
engine.BuildPack(cfg, link, map[string]string{})
62+
pa := resource.NewAPIPatchingApplicator(kube)
63+
64+
if err := pa.Apply(ctx, cfg); err != nil {
65+
return errors.Wrap(err, "Error apply function(s).")
66+
}
67+
}
68+
} else {
69+
return err
70+
}
71+
72+
logger.Info("Function(s) applied successfully.")
73+
return nil
74+
}

internal/function/delete.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package function
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
crossv1 "github.com/crossplane/crossplane/apis/pkg/v1beta1"
8+
"github.com/web-seven/overlock/internal/engine"
9+
"go.uber.org/zap"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/client-go/dynamic"
12+
)
13+
14+
func DeleteFunction(ctx context.Context, urls string, dynamicClient *dynamic.DynamicClient, logger *zap.SugaredLogger) error {
15+
16+
for _, url := range strings.Split(urls, ",") {
17+
cfg := crossv1.Function{}
18+
engine.BuildPack(&cfg, url, map[string]string{})
19+
20+
err := dynamicClient.Resource(ResourceId()).Namespace("").Delete(ctx, cfg.GetName(), metav1.DeleteOptions{})
21+
if err != nil {
22+
return err
23+
}
24+
}
25+
26+
logger.Info("Function(s) removed successfully.")
27+
return nil
28+
}

internal/function/function.go

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package function
2+
3+
import (
4+
"context"
5+
6+
condition "github.com/crossplane/crossplane-runtime/apis/common/v1"
7+
8+
regv1 "github.com/google/go-containerregistry/pkg/v1"
9+
"github.com/web-seven/overlock/internal/kube"
10+
"github.com/web-seven/overlock/internal/packages"
11+
"go.uber.org/zap"
12+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13+
"k8s.io/apimachinery/pkg/runtime/schema"
14+
"k8s.io/client-go/dynamic"
15+
)
16+
17+
const (
18+
apiGroup = "pkg.crossplane.io"
19+
apiVersion = "v1"
20+
apiPlural = "functions"
21+
)
22+
23+
type Function struct {
24+
Name string
25+
Image regv1.Image
26+
packages.Package
27+
}
28+
29+
func CheckHealthStatus(status []condition.Condition) bool {
30+
healthStatus := false
31+
for _, condition := range status {
32+
if condition.Type == "Healthy" && condition.Status == "True" {
33+
healthStatus = true
34+
}
35+
}
36+
return healthStatus
37+
}
38+
39+
func GetFunction(ctx context.Context, logger *zap.SugaredLogger, sourceDynamicClient dynamic.Interface, paramsFunction kube.ResourceParams) ([]unstructured.Unstructured, error) {
40+
41+
functions, err := kube.GetKubeResources(paramsFunction)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
return functions, nil
47+
}
48+
49+
func ResourceId() schema.GroupVersionResource {
50+
return schema.GroupVersionResource{
51+
Group: apiGroup,
52+
Version: apiVersion,
53+
Resource: apiPlural,
54+
}
55+
}

0 commit comments

Comments
 (0)