Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ensure_unicode: encode/decode unicode of standard outputs (stdout, stderr)
Raises:
DeviceConnectionError: if any error occurs when connecting the device
AdbError: if any other adb error occurs
Returns:
command(s) standard output (stdout)
"""
proc = self.start_cmd(cmds, device)
stdout, stderr = proc.communicate()
if ensure_unicode:
stdout = stdout.decode(get_std_encoding(sys.stdout))
stderr = stderr.decode(get_std_encoding(sys.stderr))
if proc.returncode > 0:
# adb connection error
pattern = DeviceConnectionError.DEVICE_CONNECTION_ERROR
if isinstance(stderr, binary_type):
pattern = pattern.encode("utf-8")
if re.search(pattern, stderr):
raise DeviceConnectionError(stderr)
else:
raise AdbError(stdout, stderr)
return stdout
Returns:
a subprocess
"""
if device:
if not self.serialno:
raise RuntimeError("please set serialno first")
cmd_options = self.cmd_options + ['-s', self.serialno]
else:
cmd_options = self.cmd_options
cmds = cmd_options + split_cmd(cmds)
LOGGING.debug(" ".join(cmds))
if not PY3:
cmds = [c.encode(get_std_encoding(sys.stdin)) for c in cmds]
proc = subprocess.Popen(
cmds,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
return proc
"""
if self.server_proc:
self.server_proc.kill()
self.server_proc = None
self.localport, deviceport = self.adb.setup_forward("localabstract:minitouch_{}".format)
deviceport = deviceport[len("localabstract:"):]
p = self.adb.start_shell("/data/local/tmp/minitouch -n '%s' 2>&1" % deviceport)
nbsp = NonBlockingStreamReader(p.stdout, name="minitouch_server")
while True:
line = nbsp.readline(timeout=5.0)
if line is None:
raise RuntimeError("minitouch setup timeout")
line = line.decode(get_std_encoding(sys.stdout))
# 识别出setup成功的log,并匹配出max_x, max_y
m = re.match("Type \w touch device .+ \((\d+)x(\d+) with \d+ contacts\) detected on .+ \(.+\)", line)
if m:
self.max_x, self.max_y = int(m.group(1)), int(m.group(2))
break
else:
self.max_x = 32768
self.max_y = 32768
# nbsp.kill() # 保留,不杀了,后面还会继续读取并pirnt
if p.poll() is not None:
# server setup error, may be already setup by others
# subprocess exit immediately
raise RuntimeError("minitouch server quit immediately")
self.server_proc = p
# reg_cleanup(self.server_proc.kill)
def get_status(self):
    """
    Perform `adb get-state` and return the device status

    The command is launched through `self.start_cmd` and both output streams
    are decoded to text before being inspected.

    Raises:
        AdbError: if the command exits with a non-zero code and its stderr does
            not contain `not found`

    Returns:
        None if stderr contains `not found`, otherwise the stripped standard
        output from the `adb get-state` command

    """
    proc = self.start_cmd("get-state")
    stdout, stderr = proc.communicate()

    stdout = stdout.decode(get_std_encoding(sys.stdout))
    # Decode stderr with the encoding of the stderr stream (not stdout), as the
    # other command helpers in this file do; the two streams may differ.
    stderr = stderr.decode(get_std_encoding(sys.stderr))

    if proc.returncode == 0:
        return stdout.strip()
    if "not found" in stderr:
        # adb reported that the device is unknown: signal it with None
        return None
    raise AdbError(stdout, stderr)
"""
cmds = [self.proxy_process, str(local_port), str(remote_port)]
proc = subprocess.Popen(
cmds,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# something like port binding fail
time.sleep(0.5)
if proc.poll() is not None:
stdout, stderr = proc.communicate()
stdout = stdout.decode(get_std_encoding(sys.stdout))
stderr = stderr.decode(get_std_encoding(sys.stderr))
raise AirtestError((stdout, stderr))
self.subprocessHandle.append(proc)