Skip to content

Commit

Permalink
feat: support sso (#88)
Browse files Browse the repository at this point in the history
* feat: sso

* feat: sso

* feat: sso

* Update __init__.py

* fix: configs

* Update login.py

* fix: use non-interactive shell on win platform

* fix: support cp65001 codec in python2

* fix: sso for python3

* style: output

* fix: sso

* Update texts.py

* Update login.py
  • Loading branch information
sesky4 authored Nov 14, 2024
1 parent 09ea012 commit 3615c8d
Show file tree
Hide file tree
Showing 12 changed files with 739 additions and 5 deletions.
2 changes: 1 addition & 1 deletion tccli/argparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _check_value(self, action, value):
def parse_known_args(self, args=None, namespace=None):
parsed, remaining = super(BaseArgParser, self).parse_known_args(args, namespace)
terminal_encoding = getattr(sys.stdin, 'encoding', 'utf-8')
if terminal_encoding is None:
if terminal_encoding is None or terminal_encoding == 'cp65001':
terminal_encoding = 'utf-8'
for arg, value in vars(parsed).items():
if isinstance(value, six.binary_type):
Expand Down
4 changes: 2 additions & 2 deletions tccli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import tccli.options_define as Options_define
from collections import OrderedDict

from tccli import oauth
from tccli import credentials
from tccli.utils import Utils
from tccli.argument import CLIArgument, CustomArgument, ListArgument, BooleanArgument
from tccli.exceptions import UnknownArgumentError
Expand Down Expand Up @@ -291,7 +291,7 @@ def __call__(self, args, parsed_globals):
action_parameters = self.cli_unfold_argument.build_action_parameters(parsed_args)
else:
action_parameters = self._build_action_parameters(parsed_args, self.argument_map)
oauth.maybe_refresh_credential(parsed_globals.profile if parsed_globals.profile else "default")
credentials.maybe_refresh_credential(parsed_globals.profile if parsed_globals.profile else "default")
return self._action_caller(action_parameters, vars(parsed_globals))

def create_help_command(self):
Expand Down
25 changes: 25 additions & 0 deletions tccli/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import json
import os

from tccli import oauth, sso


def maybe_refresh_credential(profile):
try:
with open(cred_path_of_profile(profile), "r") as cred_file:
cred = json.load(cred_file)
except IOError:
# file not found, don't check
return

if cred.get("type", "") == "oauth":
oauth.maybe_refresh_credential(profile)
return

if cred.get("type", "") == "sso":
sso.maybe_refresh_credential(profile)
return


def cred_path_of_profile(profile):
return os.path.join(os.path.expanduser("~"), ".tccli", profile + ".credential")
8 changes: 6 additions & 2 deletions tccli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
_locale._getdefaultlocale = (lambda *args: ['zh_CN', 'utf8'])

import io
import os
import sys
import six
import signal
from tccli.log import init
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException

try:
# cp65001 is utf-8 on windows platform
# python2 doesn't support cp65001 codec
if getattr(sys.stdin, 'encoding', 'utf-8') == "cp65001":
import codecs
codecs.register(lambda name: codecs.lookup('utf-8') if name == 'cp65001' else None)

reload(sys) # Python 2.7
sys.setdefaultencoding('utf8')
except NameError:
Expand Down
61 changes: 61 additions & 0 deletions tccli/plugins/sso/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# encoding: utf-8
from tccli.plugins.sso.login import login_command_entrypoint
from tccli.plugins.sso.logout import logout_command_entrypoint
from tccli.plugins.sso.configure import configure_command_entrypoint

service_name = "sso"
service_version = "2024-10-14"

_spec = {
"metadata": {
"serviceShortName": service_name,
"apiVersion": service_version,
"description": "sso related commands",
},
"actions": {
"configure": {
"name": "配置",
"document": "configure login url",
"input": "configureRequest",
"output": "configureResponse",
"action_caller": configure_command_entrypoint,
},
"login": {
"name": "登录",
"document": "login through sso",
"input": "loginRequest",
"output": "loginResponse",
"action_caller": login_command_entrypoint,
},
"logout": {
"name": "登出",
"document": "remove local credential file",
"input": "logoutRequest",
"output": "logoutResponse",
"action_caller": logout_command_entrypoint,
},
},
"objects": {
"loginRequest": {"members": []},
"loginResponse": {"members": []},
"logoutRequest": {"members": []},
"logoutResponse": {"members": []},
"configureRequest": {"members": [
{
"name": "url",
"member": "string",
"type": "string",
"required": True,
"document": "url for sso authentication",
},
]},
"configureResponse": {"members": []},
},
"version": "1.0",
}


def register_service(specs):
specs[service_name] = {
service_version: _spec,
}
4 changes: 4 additions & 0 deletions tccli/plugins/sso/configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# encoding: utf-8
CLI_URL = "https://cli.cloud.tencent.com/sso"
SITE = "cn"
DEFAULT_LANG = "zh-CN"
45 changes: 45 additions & 0 deletions tccli/plugins/sso/configure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# coding: utf-8
import json
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse

from tccli import sso
from tccli.plugins.sso import texts, configs


def configure_command_entrypoint(args, parsed_globals):
language = parsed_globals.get("language")
if not language:
language = configs.DEFAULT_LANG
texts.set_lang(language)

profile = parsed_globals.get("profile", "default")
if not profile:
profile = "default"

cred_data = {}
try:
with open(sso.cred_path_of_profile(profile), "r") as cred_file:
cred_data = json.load(cred_file)
except IOError:
pass

if "sso" not in cred_data:
cred_data["sso"] = {}

auth_url = args.get("url")
if not auth_url.startswith("http://") and not auth_url.startswith("https://"):
auth_url = "https://" + auth_url

parsed_url = urlparse(auth_url)
if not (parsed_url.scheme in ("http", "https") and parsed_url.netloc):
print(texts.get("invalid_auth_url") % auth_url)
return

cred_data["sso"]["authUrl"] = auth_url
with open(sso.cred_path_of_profile(profile), "w") as cred_file:
json.dump(cred_data, cred_file, indent=4)

print(texts.get("configure_succeed") % auth_url)
144 changes: 144 additions & 0 deletions tccli/plugins/sso/login.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# coding: utf-8
import json
import os.path
import random
import string
import sys
import time
import uuid
import webbrowser

from six.moves.urllib.parse import urlencode

from tccli import sso
from tccli.plugins.sso import texts, terminal, configs
from tccli.plugins.sso.texts import get as _


def print_message(msg):
print(msg)
sys.stdout.flush()


def login_command_entrypoint(args, parsed_globals):
language = parsed_globals.get("language")
if not language:
language = configs.DEFAULT_LANG
texts.set_lang(language)

profile = parsed_globals.get("profile", "default")
if not profile:
profile = "default"

login(profile, language)


def login(profile, language):
cred_path = sso.cred_path_of_profile(profile)
auth_url = ""
if os.path.exists(cred_path):
with open(cred_path, "r") as cred_file:
cred_data = json.load(cred_file)
auth_url = cred_data.get("sso", {}).get("authUrl", "")

if not auth_url:
profile_opt = ""
if profile != "default":
profile_opt = "--profile %s " % profile
print_message(_("auth_url_not_configured") % profile_opt)
return

characters = string.ascii_letters + string.digits
state = ''.join(random.choice(characters) for x in range(32))

token = _get_token(auth_url, state, language)

if token["State"] != state:
raise ValueError("invalid state %s" % token["state"])

print_message("")

login_token = token["Token"]
site = token["Site"]
accounts = sso.list_accounts_for_access_assignment(login_token, site)
if not accounts:
print_message(_("no_account"))
return

idx = terminal.select_from_items(
_("account_select_prompt"), ["%s:%s" % (x["Name"], x["Uin"]) for x in accounts], 10)
account = accounts[idx]
print_message("uin: %s" % account["Uin"])
print_message("username: %s" % account["Name"])

roles = sso.list_role_configurations_for_account(account["Uin"], login_token, site)
if not roles:
print_message(_("no_role"))
return

idx = terminal.select_from_items(
_("role_select_prompt"), [x["RoleConfigurationName"] for x in roles], 10)
role = roles[idx]
print_message("role: %s" % role["RoleConfigurationName"])

saml_resp = sso.gen_saml_response(
login_token, "RoleSAML", account["Uin"], "", role["RoleConfigurationId"], site)

token_info = sso.verify_login_skey(login_token, site)

role_arn = "qcs::cam::uin/%s:roleName/TencentCloudSSO-%s" % (account["Uin"], role["RoleConfigurationName"])
principal_arn = "qcs::cam::uin/%s:saml-provider/TencentReservedSSO-%s" % (account["Uin"], token_info["ZoneId"])
cred = sso.assume_role_with_saml(
saml_resp["SAMLResponse"], principal_arn, role_arn, "ses-%s" % uuid.uuid4(), 7200, site)

sso_info = {
"token": login_token,
"uin": account["Uin"],
"roleConfigurationId": role["RoleConfigurationId"],
"roleConfigurationName": role["RoleConfigurationName"],
"zoneId": token_info["ZoneId"],
"site": site,
"authUrl": auth_url,
"expiresAt": int(time.time()) + 3600 * 12,
}
sso.save_credential(cred, sso_info, profile)

print_message(_("login_success") % sso.cred_path_of_profile(profile))


def _get_token(auth_url, state, language):
cli_params = {
"lang": language,
"site": configs.SITE,
"state": state,
}
cli_query = urlencode(cli_params)
cli_url = configs.CLI_URL + "?" + cli_query
url_params = {
"loginType": "tccli",
"callback": cli_url,
"state": state,
}
url_query = urlencode(url_params)
auth_url = auth_url + "?" + url_query

print_message(_("try_login_with_url"))
print_message("")
print_message(auth_url)

webbrowser.open(auth_url)

while True:
time.sleep(1)

login_state = sso.check_login_state(state)
if "Error" in login_state:
raise ValueError(login_state["Error"])

if login_state["Status"] == "NotFound":
continue

if login_state["Status"] == "Finished":
return login_state["Token"]

raise ValueError("invalid resp: %s" % login_state)
21 changes: 21 additions & 0 deletions tccli/plugins/sso/logout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# coding: utf-8
import os

from tccli import sso
from tccli.plugins.sso import texts, configs


def logout_command_entrypoint(args, parsed_globals):
language = parsed_globals.get("language")
if not language:
language = configs.DEFAULT_LANG
texts.set_lang(language)

profile = parsed_globals.get("profile", "default")
if not profile:
profile = "default"

cred_path = sso.cred_path_of_profile(profile)
if os.path.exists(cred_path):
os.remove(cred_path)
print(texts.get("logout_success") % cred_path)
Loading

0 comments on commit 3615c8d

Please sign in to comment.