-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkubectl-genresourceyaml.py
executable file
·130 lines (116 loc) · 5.52 KB
/
kubectl-genresourceyaml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
import os
import argparse
import urllib3
import requests
from kubernetes import config
from kubernetes.client import configuration
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
parser = argparse.ArgumentParser(description='A handy kubectl plugin that performs a walkthrough of a resource schema of a third-party API registered with the Kubernetes API server. This plugin currently does not work with the core APIs and will not generate outputs for resources like Pods and Deployments, etc. The result yields a sample YAML of the resource.')
parser.add_argument('--api', type=str, help='The APIVERSION in the following format: API/version. For reference, it is the APIVERSION field in the kubectl ap-resource command output.')
parser.add_argument('--kind', type=str, help='The KIND value of the object/resource whose schema needs to be generated in the YAML format. For reference, it is the KIND field in the kubectl api-resource command output')
args = parser.parse_args()
if not args.kind:
print("error: the following arguments are required: kind")
exit()
if not args.api:
print("error: the following arguments are required: api")
exit()
apikind = args.kind
apidetail = args.api.split('/')[0]
apiversion = args.api.split('/')[1]
final_output = dict()
def get_kubeapi_request(httpsession,path,header):
response = httpsession.get(path, headers=header, verify=False)
if response.ok:
response.encoding = 'utf-8'
return response.json()
else:
return 0
def traverse_spec_objects(x, n):
for key in x:
if key not in ["description", "type", "x-kubernetes-preserve-unknown-fields"]:
if not x[key].get("type"):
print(" " * n,key+": <>")
elif x[key]["type"] == "boolean":
if "default" in x[key]:
print(" " * n,key+": <boolean> # Default: "+str(x[key]["default"]))
else:
print(" " * n,key+": <boolean>")
elif x[key]["type"] == "string":
if "default" in x[key]:
print(" " * n, key+": <string> # Default: "+x[key]["default"])
else:
print(" " * n, key+": <string>")
elif x[key]["type"] == "integer":
if "default" in x[key]:
print(" " * n, key+": <integer> # Default: "+str(x[key]["default"]))
else:
print(" " * n, key+": <integer>")
elif x[key]["type"] == "number":
if "default" in x[key]:
print(" " * n, key+": <number> # Default: "+str(x[key]["default"]))
else:
print(" " * n, key+": <number>")
elif x[key]["type"] == "array":
# An array of objects
if x[key]["items"]["type"] == "object":
print(" " * n, key+":")
print(" " * n, "- ")
n +=2
if "properties" in x[key]["items"]:
traverse_spec_objects(x[key]["items"]["properties"], n)
else:
# Properties not found in object. Non standard api. Interate thru each key
traverse_spec_objects(x[key]["items"], n)
n -=2
else:
print(" " * n, key+": ")
if "default" in x[key]:
print(" " * n, "- "+ x[key]["items"]["type"] + "# Default: "+str(x[key]["default"]))
else:
print(" " * n, "- "+ x[key]["items"]["type"])
else:
# Type is object
print(" " * n, key+":")
n +=2
if "properties" in x[key]:
traverse_spec_objects(x[key]["properties"], n)
else:
# Properties not found in object. Non standard api. Interate thru each key
traverse_spec_objects(x[key], n)
n -=2
def main():
k8s_host = ""
k8s_token = ""
k8s_headers = ""
defjson = dict()
if not os.environ.get('INCLUSTER_CONFIG'):
config.load_kube_config()
else:
config.load_incluster_config()
k8s_host = configuration.Configuration()._default.host
k8s_token = configuration.Configuration()._default.api_key['authorization']
k8s_headers = {"Accept": "application/json, */*", "Authorization": k8s_token}
k8s_session = requests.session()
parts = apidetail.split(".")
reversed_parts = ".".join(parts[::-1])
apis = get_kubeapi_request(k8s_session,k8s_host+"/openapi/v3/apis/"+apidetail+"/"+apiversion, k8s_headers)
if apis:
defjson = apis["components"]["schemas"].get(reversed_parts+"."+apiversion+"."+apikind)
if not defjson:
print("error: Object " +apikind+" not found in "+apidetail+"/"+apiversion+". Exiting...")
else:
if "spec" in defjson["properties"]:
print("apiVersion: "+apiversion)
print("kind: "+apikind)
print("metadata: ")
print(" name: <string>")
print("spec: ")
if "properties" in defjson["properties"]["spec"]:
final_output["spec"] = defjson["properties"]["spec"]["properties"]
traverse_spec_objects(final_output["spec"], 1)
else:
print("error: The requested object does not have a spec section in the registered API. Exiting...")
if __name__ == "__main__":
main()