Skip to content

Commit 470c056

Browse files
committed
Move encryption and api functions into the base class
1 parent 1ebac33 commit 470c056

File tree

3 files changed

+190
-154
lines changed

3 files changed

+190
-154
lines changed

switchbot/devices/device.py

+165-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from typing import Any, Callable, TypeVar, cast
1212
from uuid import UUID
1313

14+
import aiohttp
1415
from bleak.backends.device import BLEDevice
1516
from bleak.backends.service import BleakGATTCharacteristic, BleakGATTServiceCollection
1617
from bleak.exc import BleakDBusError
@@ -22,7 +23,15 @@
2223
establish_connection,
2324
)
2425

25-
from ..const import DEFAULT_RETRY_COUNT, DEFAULT_SCAN_TIMEOUT
26+
from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_CLIENT_ID
27+
from ..const import (
28+
DEFAULT_RETRY_COUNT,
29+
DEFAULT_SCAN_TIMEOUT,
30+
SwitchbotAccountConnectionError,
31+
SwitchbotApiError,
32+
SwitchbotAuthenticationError,
33+
SwitchbotModel,
34+
)
2635
from ..discovery import GetSwitchbotDevices
2736
from ..models import SwitchBotAdvertisement
2837

@@ -151,6 +160,35 @@ def __init__(
151160
self._last_full_update: float = -PASSIVE_POLL_INTERVAL
152161
self._timed_disconnect_task: asyncio.Task[None] | None = None
153162

163+
@classmethod
164+
async def api_request(
165+
cls,
166+
session: aiohttp.ClientSession,
167+
subdomain: str,
168+
path: str,
169+
data: dict = None,
170+
headers: dict = None,
171+
) -> dict:
172+
url = f"https://{subdomain}.{SWITCHBOT_APP_API_BASE_URL}/{path}"
173+
async with session.post(
174+
url,
175+
json=data,
176+
headers=headers,
177+
timeout=aiohttp.ClientTimeout(total=10),
178+
) as result:
179+
if result.status > 299:
180+
raise SwitchbotApiError(
181+
f"Unexpected status code returned by SwitchBot API: {result.status}"
182+
)
183+
184+
response = await result.json()
185+
if response["statusCode"] != 100:
186+
raise SwitchbotApiError(
187+
f"{response['message']}, status code: {response['statusCode']}"
188+
)
189+
190+
return response["body"]
191+
154192
def advertisement_changed(self, advertisement: SwitchBotAdvertisement) -> bool:
155193
"""Check if the advertisement has changed."""
156194
return bool(
@@ -665,6 +703,132 @@ def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> No
665703
self._set_advertisement_data(advertisement)
666704

667705

706+
class SwitchbotEncryptedDevice(SwitchbotDevice):
707+
"""A Switchbot device that uses encryption."""
708+
709+
def __init__(
710+
self,
711+
device: BLEDevice,
712+
key_id: str,
713+
encryption_key: str,
714+
model: SwitchbotModel,
715+
interface: int = 0,
716+
**kwargs: Any,
717+
) -> None:
718+
"""Switchbot base class constructor for encrypted devices."""
719+
if len(key_id) == 0:
720+
raise ValueError("key_id is missing")
721+
elif len(key_id) != 2:
722+
raise ValueError("key_id is invalid")
723+
if len(encryption_key) == 0:
724+
raise ValueError("encryption_key is missing")
725+
elif len(encryption_key) != 32:
726+
raise ValueError("encryption_key is invalid")
727+
self._key_id = key_id
728+
self._encryption_key = bytearray.fromhex(encryption_key)
729+
self._iv: bytes | None = None
730+
self._cipher: bytes | None = None
731+
self._model = model
732+
super().__init__(device, None, interface, **kwargs)
733+
734+
# Old non-async method preserved for backwards compatibility
735+
@classmethod
736+
def retrieve_encryption_key(
737+
cls: SwitchbotBaseDevice, device_mac: str, username: str, password: str
738+
):
739+
async def async_fn():
740+
async with aiohttp.ClientSession() as session:
741+
return await cls.async_retrieve_encryption_key(
742+
session, device_mac, username, password
743+
)
744+
745+
return asyncio.run(async_fn())
746+
747+
@classmethod
748+
async def async_retrieve_encryption_key(
749+
cls: SwitchbotBaseDevice,
750+
session: aiohttp.ClientSession,
751+
device_mac: str,
752+
username: str,
753+
password: str,
754+
) -> dict:
755+
"""Retrieve lock key from internal SwitchBot API."""
756+
device_mac = device_mac.replace(":", "").replace("-", "").upper()
757+
758+
try:
759+
auth_result = await cls.api_request(
760+
session,
761+
"account",
762+
"account/api/v1/user/login",
763+
{
764+
"clientId": SWITCHBOT_APP_CLIENT_ID,
765+
"username": username,
766+
"password": password,
767+
"grantType": "password",
768+
"verifyCode": "",
769+
},
770+
)
771+
auth_headers = {"authorization": auth_result["access_token"]}
772+
except Exception as err:
773+
raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err
774+
775+
try:
776+
userinfo = await cls.api_request(
777+
session, "account", "account/api/v1/user/userinfo", {}, auth_headers
778+
)
779+
if "botRegion" in userinfo and userinfo["botRegion"] != "":
780+
region = userinfo["botRegion"]
781+
else:
782+
region = "us"
783+
except Exception as err:
784+
raise SwitchbotAccountConnectionError(
785+
f"Failed to retrieve SwitchBot Account user details: {err}"
786+
) from err
787+
788+
try:
789+
device_info = await cls.api_request(
790+
session,
791+
f"wonderlabs.{region}",
792+
"wonder/keys/v1/communicate",
793+
{
794+
"device_mac": device_mac,
795+
"keyType": "user",
796+
},
797+
auth_headers,
798+
)
799+
800+
return {
801+
"key_id": device_info["communicationKey"]["keyId"],
802+
"encryption_key": device_info["communicationKey"]["key"],
803+
}
804+
except Exception as err:
805+
raise SwitchbotAccountConnectionError(
806+
f"Failed to retrieve encryption key from SwitchBot Account: {err}"
807+
) from err
808+
809+
@classmethod
810+
async def verify_encryption_key(
811+
cls,
812+
device: BLEDevice,
813+
key_id: str,
814+
encryption_key: str,
815+
model: SwitchbotModel,
816+
**kwargs: Any,
817+
) -> bool:
818+
try:
819+
device = cls(
820+
device, key_id=key_id, encryption_key=encryption_key, model=model
821+
)
822+
except ValueError:
823+
return False
824+
try:
825+
info = await device.get_basic_info()
826+
except SwitchbotOperationError:
827+
return False
828+
829+
return info is not None
830+
831+
668832
class SwitchbotDeviceOverrideStateDuringConnection(SwitchbotBaseDevice):
669833
"""Base Representation of a Switchbot Device.
670834

switchbot/devices/lock.py

+9-136
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,15 @@
22

33
from __future__ import annotations
44

5-
import asyncio
65
import logging
76
import time
87
from typing import Any
98

10-
import aiohttp
119
from bleak.backends.device import BLEDevice
1210
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
1311

14-
from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_CLIENT_ID
15-
from ..const import (
16-
LockStatus,
17-
SwitchbotAccountConnectionError,
18-
SwitchbotApiError,
19-
SwitchbotAuthenticationError,
20-
SwitchbotModel,
21-
)
22-
from .device import SwitchbotDevice, SwitchbotOperationError
12+
from ..const import LockStatus, SwitchbotModel
13+
from .device import SwitchbotEncryptedDevice
2314

2415
COMMAND_HEADER = "57"
2516
COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
@@ -54,7 +45,7 @@
5445
# The return value of the command is 6 when the command is successful but the battery is low.
5546

5647

57-
class SwitchbotLock(SwitchbotDevice):
48+
class SwitchbotLock(SwitchbotEncryptedDevice):
5849
"""Representation of a Switchbot Lock."""
5950

6051
def __init__(
@@ -66,141 +57,23 @@ def __init__(
6657
model: SwitchbotModel = SwitchbotModel.LOCK,
6758
**kwargs: Any,
6859
) -> None:
69-
if len(key_id) == 0:
70-
raise ValueError("key_id is missing")
71-
elif len(key_id) != 2:
72-
raise ValueError("key_id is invalid")
73-
if len(encryption_key) == 0:
74-
raise ValueError("encryption_key is missing")
75-
elif len(encryption_key) != 32:
76-
raise ValueError("encryption_key is invalid")
7760
if model not in (SwitchbotModel.LOCK, SwitchbotModel.LOCK_PRO):
7861
raise ValueError("initializing SwitchbotLock with a non-lock model")
79-
self._iv = None
80-
self._cipher = None
81-
self._key_id = key_id
82-
self._encryption_key = bytearray.fromhex(encryption_key)
8362
self._notifications_enabled: bool = False
84-
self._model: SwitchbotModel = model
85-
super().__init__(device, None, interface, **kwargs)
63+
super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
8664

87-
@staticmethod
65+
@classmethod
8866
async def verify_encryption_key(
67+
cls,
8968
device: BLEDevice,
9069
key_id: str,
9170
encryption_key: str,
9271
model: SwitchbotModel = SwitchbotModel.LOCK,
9372
**kwargs: Any,
9473
) -> bool:
95-
try:
96-
lock = SwitchbotLock(
97-
device, key_id=key_id, encryption_key=encryption_key, model=model
98-
)
99-
except ValueError:
100-
return False
101-
try:
102-
lock_info = await lock.get_basic_info()
103-
except SwitchbotOperationError:
104-
return False
105-
106-
return lock_info is not None
107-
108-
@staticmethod
109-
async def api_request(
110-
session: aiohttp.ClientSession,
111-
subdomain: str,
112-
path: str,
113-
data: dict = None,
114-
headers: dict = None,
115-
) -> dict:
116-
url = f"https://{subdomain}.{SWITCHBOT_APP_API_BASE_URL}/{path}"
117-
async with session.post(
118-
url,
119-
json=data,
120-
headers=headers,
121-
timeout=aiohttp.ClientTimeout(total=10),
122-
) as result:
123-
if result.status > 299:
124-
raise SwitchbotApiError(
125-
f"Unexpected status code returned by SwitchBot API: {result.status}"
126-
)
127-
128-
response = await result.json()
129-
if response["statusCode"] != 100:
130-
raise SwitchbotApiError(
131-
f"{response['message']}, status code: {response['statusCode']}"
132-
)
133-
134-
return response["body"]
135-
136-
# Old non-async method preserved for backwards compatibility
137-
@staticmethod
138-
def retrieve_encryption_key(device_mac: str, username: str, password: str):
139-
async def async_fn():
140-
async with aiohttp.ClientSession() as session:
141-
return await SwitchbotLock.async_retrieve_encryption_key(
142-
session, device_mac, username, password
143-
)
144-
145-
return asyncio.run(async_fn())
146-
147-
@staticmethod
148-
async def async_retrieve_encryption_key(
149-
session: aiohttp.ClientSession, device_mac: str, username: str, password: str
150-
) -> dict:
151-
"""Retrieve lock key from internal SwitchBot API."""
152-
device_mac = device_mac.replace(":", "").replace("-", "").upper()
153-
154-
try:
155-
auth_result = await SwitchbotLock.api_request(
156-
session,
157-
"account",
158-
"account/api/v1/user/login",
159-
{
160-
"clientId": SWITCHBOT_APP_CLIENT_ID,
161-
"username": username,
162-
"password": password,
163-
"grantType": "password",
164-
"verifyCode": "",
165-
},
166-
)
167-
auth_headers = {"authorization": auth_result["access_token"]}
168-
except Exception as err:
169-
raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err
170-
171-
try:
172-
userinfo = await SwitchbotLock.api_request(
173-
session, "account", "account/api/v1/user/userinfo", {}, auth_headers
174-
)
175-
if "botRegion" in userinfo and userinfo["botRegion"] != "":
176-
region = userinfo["botRegion"]
177-
else:
178-
region = "us"
179-
except Exception as err:
180-
raise SwitchbotAccountConnectionError(
181-
f"Failed to retrieve SwitchBot Account user details: {err}"
182-
) from err
183-
184-
try:
185-
device_info = await SwitchbotLock.api_request(
186-
session,
187-
f"wonderlabs.{region}",
188-
"wonder/keys/v1/communicate",
189-
{
190-
"device_mac": device_mac,
191-
"keyType": "user",
192-
},
193-
auth_headers,
194-
)
195-
196-
return {
197-
"key_id": device_info["communicationKey"]["keyId"],
198-
"encryption_key": device_info["communicationKey"]["key"],
199-
}
200-
except Exception as err:
201-
raise SwitchbotAccountConnectionError(
202-
f"Failed to retrieve encryption key from SwitchBot Account: {err}"
203-
) from err
74+
return super().verify_encryption_key(
75+
device, key_id, encryption_key, model, **kwargs
76+
)
20477

20578
async def lock(self) -> bool:
20679
"""Send lock command."""

0 commit comments

Comments
 (0)