Skip to content

Support x86 emulator and Add press_keys、go_recent #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 90 additions & 19 deletions hmdriver2/_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import socket
import json
import struct
import time
import os
import hashlib
Expand All @@ -20,6 +21,12 @@


class HmClient:
HEADER_BYTES = b'_uitestkit_rpc_message_head_'
TAILER_BYTES = b'_uitestkit_rpc_message_tail_'
HEADER_LENGTH = len(HEADER_BYTES)
TAILER_LENGTH = len(TAILER_BYTES)
SESSION_ID_LENGTH = 4 # 4 bytes for session ID

"""harmony uitest client"""
def __init__(self, serial: str):
self.hdc = HdcWrapper(serial)
Expand Down Expand Up @@ -58,27 +65,89 @@ def _send_msg(self, msg: typing.Dict):
"client": "127.0.0.1"
}
"""
msg = json.dumps(msg, ensure_ascii=False, separators=(',', ':'))
logger.debug(f"sendMsg: {msg}")
self.sock.sendall(msg.encode('utf-8') + b'\n')

def _recv_msg(self, buff_size: int = 4096, decode=False, print=True) -> typing.Union[bytearray, str]:
full_msg = bytearray()
msg_str = json.dumps(msg, ensure_ascii=False, separators=(',', ':'))
logger.debug(f"sendMsg: {msg_str}")

# 生成 session_id
msg_bytes = msg_str.encode('utf-8')
session_id = self._generate_session_id(msg_str)
header = (
self.HEADER_BYTES +
struct.pack('>I', session_id) +
struct.pack('>I', len(msg_bytes))
)

# 发送完整消息
self.sock.sendall(header + msg_bytes + self.TAILER_BYTES)

def _generate_session_id(self, message: str) -> int:
"""
生成 sessionId 的逻辑,将时间戳、消息字符串拼接后生成整数哈希值。
"""
# 拼接时间戳和消息字符串,添加随机熵
combined = (
str(int(time.time() * 1000)) + # 毫秒时间戳
message +
os.urandom(4).hex() # 16字节随机熵
)
return int(hashlib.sha256(combined.encode()).hexdigest()[:8], 16)

def _recv_msg(self, decode=False, print=True) -> typing.Union[bytearray, str]:
"""
接收消息,解析请求头和尾部,返回消息内容。
"""
try:
# FIXME
relay = self.sock.recv(buff_size)
if decode:
relay = relay.decode()
# 接收头部
header_len = self.HEADER_LENGTH + self.SESSION_ID_LENGTH + 4
header = self._recv_exact(header_len) # 头部 + session_id + length
if not header or header[:len(self.HEADER_BYTES)] != self.HEADER_BYTES:
logger.warning("Invalid header received")
return ""

# 解析消息长度(目前无需验证session_id,所以不做处理)
msg_length = struct.unpack('>I', header[self.HEADER_LENGTH + self.SESSION_ID_LENGTH:])[0] # 大端序

# 接收消息体
msg_bytes = self._recv_exact(msg_length)
if not msg_bytes:
logger.warning("Failed to receive message body")
return ""

# 接收尾部
tailer = self._recv_exact(self.TAILER_LENGTH)
if not tailer or tailer != self.TAILER_BYTES:
logger.warning("Invalid tailer received")
return ""

# 解码消息
if not decode:
logger.debug(f"recvMsg byte message (size: %d)", len(msg_bytes))
return bytearray(msg_bytes)

msg_str = msg_bytes.decode('utf-8')
if print:
logger.debug(f"recvMsg: {relay}")
full_msg = relay
logger.debug(f"recvMsg: {msg_str}")
return msg_str

except (socket.timeout, UnicodeDecodeError) as e:
logger.warning(e)
if decode:
full_msg = ""
except (socket.timeout, ValueError, json.JSONDecodeError) as e:
logger.warning(f"Error receiving message: {e}")
return ""

return full_msg
def _recv_exact(self, length: int) -> bytes:
"""
确保接收指定长度的数据。
"""
buf = bytearray(length)
view = memoryview(buf)
pos = 0

while pos < length:
chunk_size = self.sock.recv_into(view[pos:], length - pos)
if not chunk_size:
raise ConnectionError("Connection closed during receive")
pos += chunk_size

return buf

def invoke(self, api: str, this: str = "Driver#0", args: typing.List = []) -> HypiumResponse:
"""
Expand Down Expand Up @@ -192,8 +261,10 @@ def init(self):
time.sleep(0.5)

def _get_local_agent_path(self) -> str:
"""Return the local path of the agent file."""
target_agent = "uitest_agent_v1.1.0.so"
"""Return the local path of the agent file, based on the cpu_abi version (e.g., 'so/arm64-v8a/agent.so')."""
cpu_abi = self.hdc.cpu_abi()

target_agent = os.path.join("so", cpu_abi, "agent.so")
return os.path.join(os.path.dirname(os.path.realpath(__file__)), "assets", target_agent)

def _get_remote_md5sum(self, file_path: str) -> Optional[str]:
Expand Down
6 changes: 3 additions & 3 deletions hmdriver2/_screenrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def start(self, video_path: str):

self._send_msg("startCaptureScreen", [])

reply: str = self._recv_msg(1024, decode=True, print=False)
reply: str = self._recv_msg(decode=True, print=False)
if "true" in reply:
record_th = threading.Thread(target=self._record_worker)
writer_th = threading.Thread(target=self._video_writer)
Expand All @@ -74,7 +74,7 @@ def _record_worker(self):
buffer = bytearray()
while not self.stop_event.is_set():
try:
buffer += self._recv_msg(4096 * 1024, decode=False, print=False)
buffer += self._recv_msg(decode=False, print=False)
except Exception as e:
print(f"Error receiving data: {e}")
break
Expand Down Expand Up @@ -121,7 +121,7 @@ def stop(self) -> str:
t.join()

self._send_msg("stopCaptureScreen", [])
self._recv_msg(1024, decode=True, print=False)
self._recv_msg(decode=True, print=False)

self.release()

Expand Down
Binary file added hmdriver2/assets/so/arm64-v8a/agent.so
Binary file not shown.
Binary file added hmdriver2/assets/so/x86_64/agent.so
Binary file not shown.
Binary file removed hmdriver2/assets/uitest_agent_v1.0.7.so
Binary file not shown.
Binary file removed hmdriver2/assets/uitest_agent_v1.1.0.so
Binary file not shown.
13 changes: 13 additions & 0 deletions hmdriver2/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,23 @@ def go_back(self):
def go_home(self):
self.hdc.send_key(KeyCode.HOME)

@delay
def go_recent(self):
self.press_keys(KeyCode.META_LEFT, KeyCode.TAB)

@delay
def press_key(self, key_code: Union[KeyCode, int]):
self.hdc.send_key(key_code)

@delay
def press_keys(self, key_code1: Union[KeyCode, int], key_code2: Union[KeyCode, int]):
"""Press two keys in sequence, given their integer key codes."""
code1 = key_code1.value if isinstance(key_code1, KeyCode) else key_code1
code2 = key_code2.value if isinstance(key_code2, KeyCode) else key_code2

api = "Driver.triggerCombineKeys"
self._invoke(api, args=[code1, code2])

def screen_on(self):
self.hdc.wakeup()

Expand Down