diff --git a/README.md b/README.md index dc8aca585..2fa9d68d3 100644 --- a/README.md +++ b/README.md @@ -476,6 +476,14 @@ By default, Dapr will use the `POST` verb. If your app uses Dapr for gRPC, you s dapr invoke --app-id nodeapp --method mymethod --verb GET ``` +Invoke your app in Kubernetes mode: + +If your app running in a Kubernetes cluster, use the `invoke` command with `--kubernetes` flag or the `-k` shorthand. + +``` +$ dapr invoke --kubernetes --app-id nodeapp --method mymethod +``` + ### List To list all Dapr instances running on your machine: diff --git a/cmd/invoke.go b/cmd/invoke.go index c8b009f59..968d83e33 100644 --- a/cmd/invoke.go +++ b/cmd/invoke.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" + "github.com/dapr/cli/pkg/kubernetes" "github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/standalone" ) @@ -38,15 +39,18 @@ var ( var InvokeCmd = &cobra.Command{ Use: "invoke", - Short: "Invoke a method on a given Dapr application. Supported platforms: Self-hosted", + Short: "Invoke a method on a given Dapr application. Supported platforms: Kubernetes and self-hosted", Example: ` -# Invoke a sample method on target app with POST Verb -dapr invoke --app-id target --method sample --data '{"key":"value"} +# Invoke a sample method on target app with POST Verb in self-hosted mode +dapr invoke --app-id target --method sample --data '{"key":"value"}' -# Invoke a sample method on target app with GET Verb +# Invoke a sample method on target app with in Kubernetes +dapr invoke -k --app-id target --method sample --data '{"key":"value"}' + +# Invoke a sample method on target app with GET Verb in self-hosted mode dapr invoke --app-id target --method sample --verb GET -# Invoke a sample method on target app with GET Verb using Unix domain socket +# Invoke a sample method on target app with GET Verb using Unix domain socket in self-hosted mode dapr invoke --unix-domain-socket --app-id target --method sample --verb GET `, Run: func(cmd *cobra.Command, args []string) { @@ -66,7 +70,6 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET } else if invokeData != "" { bytePayload = []byte(invokeData) } - client := standalone.NewClient() // TODO(@daixiang0): add Windows support. if invokeSocket != "" { @@ -78,7 +81,14 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET } } - response, err := client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket) + var response string + if kubernetesMode { + response, err = kubernetes.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb) + } else { + client := standalone.NewClient() + response, err = client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket) + } + if err != nil { err = fmt.Errorf("error invoking app %s: %w", invokeAppID, err) print.FailureStatusEvent(os.Stderr, err.Error()) @@ -93,6 +103,7 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET } func init() { + InvokeCmd.Flags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Invoke a method on a Dapr application in a Kubernetes cluster") InvokeCmd.Flags().StringVarP(&invokeAppID, "app-id", "a", "", "The application id to invoke") InvokeCmd.Flags().StringVarP(&invokeAppMethod, "method", "m", "", "The method to invoke") InvokeCmd.Flags().StringVarP(&invokeData, "data", "d", "", "The JSON serialized data string (optional)") diff --git a/pkg/kubernetes/invoke.go b/pkg/kubernetes/invoke.go new file mode 100644 index 000000000..2438a856d --- /dev/null +++ b/pkg/kubernetes/invoke.go @@ -0,0 +1,178 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "fmt" + "net/url" + "strings" + + core_v1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/net" + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type AppInfo struct { + AppID string `csv:"APP ID" json:"appId" yaml:"appId"` + HTTPPort string `csv:"HTTP PORT" json:"httpPort" yaml:"httpPort"` + GRPCPort string `csv:"GRPC PORT" json:"grpcPort" yaml:"grpcPort"` + AppPort string `csv:"APP PORT" json:"appPort" yaml:"appPort"` + PodName string `csv:"POD NAME" json:"podName" yaml:"podName"` + Namespace string `csv:"NAMESPACE" json:"namespace" yaml:"namespace"` +} + +type ( + DaprPod core_v1.Pod + DaprAppList []*AppInfo +) + +// Invoke is a command to invoke a remote or local dapr instance. +func Invoke(appID, method string, data []byte, verb string) (string, error) { + client, err := Client() + if err != nil { + return "", err + } + + app, err := GetAppInfo(client, appID) + if err != nil { + return "", err + } + + return invoke(client.CoreV1().RESTClient(), app, method, data, verb) +} + +func invoke(client rest.Interface, app *AppInfo, method string, data []byte, verb string) (string, error) { + res, err := app.Request(client.Verb(verb), method, data, verb) + if err != nil { + return "", fmt.Errorf("error get request: %w", err) + } + + result := res.Do(context.TODO()) + rawbody, err := result.Raw() + if err != nil { + return "", fmt.Errorf("error get raw: %w", err) + } + + if len(rawbody) > 0 { + return string(rawbody), nil + } + + return "", nil +} + +func GetAppInfo(client k8s.Interface, appID string) (*AppInfo, error) { + list, err := ListAppInfos(client, appID) + if err != nil { + return nil, err + } + if len(list) == 0 { + return nil, fmt.Errorf("%s not found", appID) + } + app := list[0] + return app, nil +} + +// List outputs plugins. +func ListAppInfos(client k8s.Interface, appIDs ...string) (DaprAppList, error) { + opts := v1.ListOptions{} + podList, err := client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), opts) + if err != nil { + return nil, fmt.Errorf("err get pods list:%w", err) + } + + fn := func(*AppInfo) bool { + return true + } + if len(appIDs) > 0 { + fn = func(a *AppInfo) bool { + for _, id := range appIDs { + if id != "" && a.AppID == id { + return true + } + } + return false + } + } + + l := make(DaprAppList, 0) + for _, p := range podList.Items { + p := DaprPod(p) + for _, c := range p.Spec.Containers { + if c.Name == "daprd" { + app := getAppInfoFromPod(&p) + if fn(app) { + l = append(l, app) + } + } + } + } + + return l, nil +} + +func getAppInfoFromPod(p *DaprPod) *AppInfo { + var appInfo *AppInfo + for _, c := range p.Spec.Containers { + if c.Name == "daprd" { + appInfo = &AppInfo{ + PodName: p.Name, + Namespace: p.Namespace, + } + for i, arg := range c.Args { + if arg == "--app-port" { + port := c.Args[i+1] + appInfo.AppPort = port + } else if arg == "--dapr-http-port" { + port := c.Args[i+1] + appInfo.HTTPPort = port + } else if arg == "--dapr-grpc-port" { + port := c.Args[i+1] + appInfo.GRPCPort = port + } else if arg == "--app-id" { + id := c.Args[i+1] + appInfo.AppID = id + } + } + } + } + + return appInfo +} + +func (a *AppInfo) Request(r *rest.Request, method string, data []byte, verb string) (*rest.Request, error) { + r = r.Namespace(a.Namespace). + Resource("pods"). + SubResource("proxy"). + SetHeader("Content-Type", "application/json"). + Name(net.JoinSchemeNamePort("", a.PodName, a.AppPort)) + if data != nil { + r = r.Body(data) + } + + u, err := url.Parse(method) + if err != nil { + return nil, fmt.Errorf("error parse method %s: %w", method, err) + } + + r = r.Suffix(u.Path) + + for k, vs := range u.Query() { + r = r.Param(k, strings.Join(vs, ",")) + } + + return r, nil +} diff --git a/pkg/kubernetes/invoke_test.go b/pkg/kubernetes/invoke_test.go new file mode 100644 index 000000000..a39d9bd6d --- /dev/null +++ b/pkg/kubernetes/invoke_test.go @@ -0,0 +1,242 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utiltesting "k8s.io/client-go/util/testing" +) + +func newDaprAppPod(name string, namespace string, appName string, creationTime time.Time, appPort string, httpPort string, grpcPort string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: map[string]string{}, + Labels: map[string]string{ + "app": appName, + }, + CreationTimestamp: metav1.Time{ + Time: creationTime, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {}, + { + Name: "daprd", + Args: []string{ + "--mode", + "kubernetes", + "--dapr-http-port", + httpPort, + "--dapr-grpc-port", + grpcPort, + "--dapr-internal-grpc-port", + "50002", + "--dapr-listen-addresses", + "[::1],127.0.0.1", + "--dapr-public-port", + "3501", + "--app-port", + appPort, + "--app-id", + appName, + "--control-plane-address", + "dapr-api.keel-system.svc.cluster.local:80", + "--app-protocol", + "http", + "--placement-host-address", + "dapr-placement-server.keel-system.svc.cluster.local:50005", + "--config", + "testAppID-Config", + "--log-level", + "info", + "--app-max-concurrency", + "-1", + "--sentry-address", + "dapr-sentry.keel-system.svc.cluster.local:80", + "--enable-metrics=true", + "--metrics-port", + "9090", + "--dapr-http-max-request-size", + "-1", + "--enable-mtls", + }, + }, + }, + }, + } +} + +func Test_getAppInfo(t *testing.T) { + client := fake.NewSimpleClientset(newDaprAppPod( + "testAppPod", "testAppNameSpace", + "testAppID", time.Now(), + "8080", "80801", "80802")) + + testCases := []struct { + name string + errorExpected bool + errString string + appID string + want *AppInfo + }{ + { + name: "get test Pod", + appID: "testAppID", + errorExpected: false, + errString: "", + want: &AppInfo{ + AppID: "testAppID", HTTPPort: "80801", GRPCPort: "80802", AppPort: "8080", PodName: "testAppPod", Namespace: "testAppNameSpace", + }, + }, + { + name: "get error Pod", + appID: "errorAppID", + errorExpected: true, + errString: "errorAppID not found", + want: nil, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + appInfo, err := GetAppInfo(client, tc.appID) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + assert.Equal(t, tc.want, appInfo, "expected appInfo to match") + } + }) + } +} + +func Test_invoke(t *testing.T) { + app := &AppInfo{ + AppID: "testAppID", AppPort: "8080", HTTPPort: "3500", GRPCPort: "50001", PodName: "testAppPod", Namespace: "testAppNameSpace", + } + + testCases := []struct { + name string + errorExpected bool + errString string + appID string + method string + verb string + data []byte + URLExpected string + }{ + { + name: "get request", + errorExpected: false, + errString: "", + method: "hello", + verb: "GET", + data: nil, + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello", + }, + { + name: "get request", + errorExpected: false, + errString: "", + method: "hello?abc=123&cdr=345#abb=aaa", + verb: "GET", + data: nil, + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello?abc=123&cdr=345#abb=aaa", + }, + { + name: "post request", + errorExpected: false, + errString: "", + method: "hello?abc=123&cdr=345#abb=aaa", + verb: "POST", + data: []byte("hello"), + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello?abc=123&cdr=345#abb=aaa", + }, + { + name: "post request", + errorExpected: false, + errString: "errorAppID not found", + method: "hello", + verb: "POST", + data: []byte("hello"), + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testServer, fakeHandler := testServerEnv(t, 200) + defer testServer.Close() + client, err := restClient(testServer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, err = invoke(client, app, tc.method, tc.data, tc.verb) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + data := string(tc.data) + fakeHandler.ValidateRequest(t, tc.URLExpected, tc.verb, &data) + } + }) + } +} + +func testServerEnv(t *testing.T, statusCode int) (*httptest.Server, *utiltesting.FakeHandler) { + t.Helper() + fakeHandler := utiltesting.FakeHandler{ + StatusCode: statusCode, + ResponseBody: "", + T: t, + } + testServer := httptest.NewServer(&fakeHandler) + return testServer, &fakeHandler +} + +func restClient(testServer *httptest.Server) (*rest.RESTClient, error) { + c, err := rest.RESTClientFor(&rest.Config{ + Host: testServer.URL, + ContentConfig: rest.ContentConfig{ + GroupVersion: &v1.SchemeGroupVersion, + NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + }, + APIPath: "api", + Username: "user", + Password: "pass", + }) + return c, err +}