diff --git a/.gitignore b/.gitignore index 53903ff..c87b314 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ uiautomator.egg-info *.egg .tox .coverage +/atc_uiautomator.egg-info/ diff --git a/MANIFEST.in b/MANIFEST.in index d2a4466..1d10f00 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,4 @@ include uiautomator/*.py include setup.py include uiautomator/libs/*.jar -include uiautomator/libs/*.apk -include test/*.py +include uiautomator/libs/*.apk \ No newline at end of file diff --git a/docs/img/settings.png b/docs/img/settings.png deleted file mode 100644 index be5eed4..0000000 Binary files a/docs/img/settings.png and /dev/null differ diff --git a/setup.py b/setup.py index 8c59736..17f51ce 100755 --- a/setup.py +++ b/setup.py @@ -7,20 +7,22 @@ requires = [ - "urllib3>=1.7.1" + "urllib3>=1.7.1", + 'selenium>=3.1', + 'pillow>=6.2.1' ] test_requires = [ 'nose>=1.0', 'mock>=1.0.1', - 'coverage>=3.6' + 'coverage>=3.6', ] -version = '0.3.2' +version = '0.7.5' setup( - name='uiautomator', + name='atc_uiautomator', version=version, - description='Python Wrapper for Android UiAutomator test tool', + description='Python Wrapper for Android UiAutomator test_set tool', long_description='Python wrapper for Android uiautomator tool.', author='Xiaocong He', author_email='xiaocong@gmail.com,hongbin.bao@gmail.com', @@ -37,7 +39,7 @@ 'uiautomator': [ 'uiautomator/libs/bundle.jar', 'uiautomator/libs/uiautomator-stub.jar', - 'uiautomator/libs/app-uiautomator-test.apk', + 'uiautomator/libs/app-uiautomator-test_set.apk', 'uiautomator/libs/app-uiautomator.apk' ] }, diff --git a/test/res/layout.xml b/test/res/layout.xml deleted file mode 100644 index f604767..0000000 --- a/test/res/layout.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/test/test_adb.py b/test/test_adb.py deleted file mode 100644 index ac97b9d..0000000 --- a/test/test_adb.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -from mock import MagicMock, patch -import os -import subprocess -from uiautomator import Adb - - -class TestAdb(unittest.TestCase): - - def setUp(self): - self.os_name = os.name - - def tearDown(self): - os.name = self.os_name - - def test_serial(self): - serial = "abcdef1234567890" - adb = Adb(serial) - self.assertEqual(adb.default_serial, serial) - - adb.devices = MagicMock() - adb.devices.return_value = [serial, "123456"] - self.assertEqual(adb.device_serial(), serial) - - def test_adb_from_env(self): - home_dir = '/android/home' - with patch.dict('os.environ', {'ANDROID_HOME': home_dir}): - with patch('os.path.exists') as exists: - exists.return_value = True - - os.name = "posix" # linux - adb_obj = Adb() - adb_path = os.path.join(home_dir, "platform-tools", "adb") - self.assertEqual(adb_obj.adb(), adb_path) - exists.assert_called_once_with(adb_path) - self.assertEqual(adb_obj.adb(), adb_path) - # the second call will return the __adb_cmd directly - exists.assert_called_once_with(adb_path) - - os.name = "nt" # linux - adb_obj = Adb() - adb_path = os.path.join(home_dir, "platform-tools", "adb.exe") - self.assertEqual(adb_obj.adb(), adb_path) - - exists.return_value = False - with self.assertRaises(EnvironmentError): - Adb().adb() - - def test_adb_from_find(self): - with patch.dict('os.environ', {}, clear=True): - with patch("distutils.spawn.find_executable") as find_executable: - find_executable.return_value = "/usr/bin/adb" - with patch("os.path.realpath") as realpath: - realpath.return_value = "/home/user/android/platform-tools/adb" - self.assertEqual(realpath.return_value, Adb().adb()) - find_executable.assert_called_once_with("adb") # find_exectable should be called once - realpath.assert_called_once_with(find_executable.return_value) - realpath.return_value = find_executable.return_value - self.assertEqual(find_executable.return_value, Adb().adb()) - find_executable.return_value = None - call_count = find_executable.call_count - with self.assertRaises(EnvironmentError): - Adb().adb() - self.assertEqual(call_count + 1, find_executable.call_count) - - def test_devices(self): - adb = Adb() - adb.raw_cmd = MagicMock() - adb.raw_cmd.return_value.communicate.return_value = (b"List of devices attached \r\n014E05DE0F02000E\tdevice\r\n489328DKFL7DF\tdevice", b"") - self.assertEqual(adb.devices(), {"014E05DE0F02000E": "device", "489328DKFL7DF": "device"}) - adb.raw_cmd.assert_called_once_with("devices") - adb.raw_cmd.return_value.communicate.return_value = (b"List of devices attached \n\r014E05DE0F02000E\tdevice\n\r489328DKFL7DF\tdevice", b"") - self.assertEqual(adb.devices(), {"014E05DE0F02000E": "device", "489328DKFL7DF": "device"}) - adb.raw_cmd.return_value.communicate.return_value = (b"List of devices attached \r014E05DE0F02000E\tdevice\r489328DKFL7DF\tdevice", b"") - self.assertEqual(adb.devices(), {"014E05DE0F02000E": "device", "489328DKFL7DF": "device"}) - adb.raw_cmd.return_value.communicate.return_value = (b"List of devices attached \n014E05DE0F02000E\tdevice\n489328DKFL7DF\tdevice", b"") - self.assertEqual(adb.devices(), {"014E05DE0F02000E": "device", "489328DKFL7DF": "device"}) - adb.raw_cmd.return_value.communicate.return_value = (b"not match", "") - with self.assertRaises(EnvironmentError): - adb.devices() - - def test_forward(self): - adb = Adb() - adb.cmd = MagicMock() - adb.forward(90, 91) - adb.cmd.assert_called_once_with("forward", "tcp:90", "tcp:91") - adb.cmd.return_value.wait.assert_called_once_with() - - def test_adb_raw_cmd(self): - import subprocess - adb = Adb() - adb.adb = MagicMock() - adb.adb.return_value = "adb" - args = ["a", "b", "c"] - with patch("subprocess.Popen") as Popen: - os.name = "posix" - adb.raw_cmd(*args) - Popen.assert_called_once_with(["%s %s" % (adb.adb(), " ".join(args))], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - with patch("subprocess.Popen") as Popen: - os.name = "nt" - adb.raw_cmd(*args) - Popen.assert_called_once_with([adb.adb()] + list(args), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - def test_adb_cmd(self): - adb = Adb() - adb.device_serial = MagicMock() - adb.device_serial.return_value = "ANDROID_SERIAL" - adb.raw_cmd = MagicMock() - args = ["a", "b", "c"] - adb.cmd(*args) - adb.raw_cmd.assert_called_once_with("-s", "%s" % adb.device_serial(), *args) - - adb.device_serial.return_value = "ANDROID SERIAL" - adb.raw_cmd = MagicMock() - args = ["a", "b", "c"] - adb.cmd(*args) - adb.raw_cmd.assert_called_once_with("-s", "'%s'" % adb.device_serial(), *args) - - def test_adb_cmd_server_host(self): - adb = Adb(adb_server_host="localhost", adb_server_port=5037) - adb.adb = MagicMock() - adb.adb.return_value = "adb" - adb.device_serial = MagicMock() - adb.device_serial.return_value = "ANDROID_SERIAL" - args = ["a", "b", "c"] - with patch("subprocess.Popen") as Popen: - os.name = "nt" - adb.raw_cmd(*args) - Popen.assert_called_once_with( - [adb.adb()] + args, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - - adb = Adb(adb_server_host="test.com", adb_server_port=1000) - adb.adb = MagicMock() - adb.adb.return_value = "adb" - adb.device_serial = MagicMock() - adb.device_serial.return_value = "ANDROID_SERIAL" - args = ["a", "b", "c"] - with patch("subprocess.Popen") as Popen: - os.name = "posix" - adb.raw_cmd(*args) - Popen.assert_called_once_with( - [" ".join([adb.adb()] + ["-H", "test.com", "-P", "1000"] + args)], - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - - def test_device_serial(self): - with patch.dict('os.environ', {'ANDROID_SERIAL': "ABCDEF123456"}): - adb = Adb() - adb.devices = MagicMock() - adb.devices.return_value = {"ABCDEF123456": "device"} - self.assertEqual(adb.device_serial(), "ABCDEF123456") - with patch.dict('os.environ', {'ANDROID_SERIAL': "ABCDEF123456"}): - adb = Adb() - adb.devices = MagicMock() - adb.devices.return_value = {"ABCDEF123456": "device", "123456ABCDEF": "device"} - self.assertEqual(adb.device_serial(), "ABCDEF123456") - with patch.dict('os.environ', {'ANDROID_SERIAL': "HIJKLMN098765"}): - adb = Adb() - adb.devices = MagicMock() - adb.devices.return_value = {"ABCDEF123456": "device", "123456ABCDEF": "device"} - self.assertEqual(adb.device_serial(), "HIJKLMN098765") - with patch.dict('os.environ', {}, clear=True): - adb = Adb() - adb.devices = MagicMock() - adb.devices.return_value = {"ABCDEF123456": "device", "123456ABCDEF": "device"} - with self.assertRaises(EnvironmentError): - adb.device_serial() - with patch.dict('os.environ', {}, clear=True): - adb = Adb() - adb.devices = MagicMock() - adb.devices.return_value = {"ABCDEF123456": "device"} - self.assertEqual(adb.device_serial(), "ABCDEF123456") - - with self.assertRaises(EnvironmentError): - adb = Adb() - adb.devices = MagicMock() - adb.devices.return_value = {} - adb.device_serial() - - def test_forward_list(self): - adb = Adb() - adb.version = MagicMock() - adb.version.return_value = ['1.0.31', '1', '0', '31'] - adb.raw_cmd = MagicMock() - adb.raw_cmd.return_value.communicate.return_value = (b"014E05DE0F02000E tcp:9008 tcp:9008\r\n489328DKFL7DF tcp:9008 tcp:9008", b"") - self.assertEqual(adb.forward_list(), [['014E05DE0F02000E', 'tcp:9008', 'tcp:9008'], ['489328DKFL7DF', 'tcp:9008', 'tcp:9008']]) - - adb.version.return_value = ['1.0.29', '1', '0', '29'] - with self.assertRaises(EnvironmentError): - adb.forward_list() diff --git a/test/test_device.py b/test/test_device.py deleted file mode 100644 index 43f226b..0000000 --- a/test/test_device.py +++ /dev/null @@ -1,289 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -import re -import os.path -import codecs -from mock import MagicMock, call, patch -from uiautomator import AutomatorDevice, Selector - - -class TestDevice(unittest.TestCase): - - def setUp(self): - self.device = AutomatorDevice() - self.device.server = MagicMock() - self.device.server.jsonrpc = MagicMock() - self.device.server.jsonrpc_wrap = MagicMock() - - def test_info(self): - self.device.server.jsonrpc.deviceInfo = MagicMock() - self.device.server.jsonrpc.deviceInfo.return_value = {} - self.assertEqual(self.device.info, {}) - self.device.server.jsonrpc.deviceInfo.assert_called_once_with() - - def test_click(self): - self.device.server.jsonrpc.click = MagicMock() - self.device.server.jsonrpc.click.return_value = True - self.assertEqual(self.device.click(1, 2), True) - self.device.server.jsonrpc.click.assert_called_once_with(1, 2) - - def test_swipe(self): - self.device.server.jsonrpc.swipe = MagicMock() - self.device.server.jsonrpc.swipe.return_value = True - self.assertEqual(self.device.swipe(1, 2, 3, 4, 100), True) - self.device.server.jsonrpc.swipe.assert_called_once_with(1, 2, 3, 4, 100) - - def test_long_click(self): - self.device.server.jsonrpc.swipe = MagicMock() - self.device.server.jsonrpc.swipe.return_value = True - x, y = 100, 200 - self.assertEqual(self.device.long_click(x, y), True) - self.device.server.jsonrpc.swipe.assert_called_once_with(x, y, x+1, y+1, 100) - - def test_drag(self): - self.device.server.jsonrpc.drag = MagicMock() - self.device.server.jsonrpc.drag.return_value = True - self.assertEqual(self.device.drag(1, 2, 3, 4, 100), True) - self.device.server.jsonrpc.drag.assert_called_once_with(1, 2, 3, 4, 100) - - def test_dump(self): - self.device.server.jsonrpc.dumpWindowHierarchy = MagicMock() - with codecs.open(os.path.join(os.path.dirname(__file__), "res", "layout.xml"), "r", encoding="utf8") as f: - xml = f.read() - self.device.server.jsonrpc.dumpWindowHierarchy.return_value = xml - self.assertEqual(self.device.dump("/tmp/test.xml"), xml) - self.device.server.jsonrpc.dumpWindowHierarchy.assert_called_once_with(True, None) - self.assertEqual(self.device.dump("/tmp/test.xml", False), xml) - - raw_xml = "".join(re.split(r"\n[ ]*", xml)) - self.device.server.jsonrpc.dumpWindowHierarchy.return_value = raw_xml - self.assertTrue("\n " in self.device.dump("/tmp/test.xml")) - - def test_screenshot(self): - self.device.server.jsonrpc.takeScreenshot = MagicMock() - self.device.server.jsonrpc.takeScreenshot.return_value = "1.png" - self.device.server.adb.cmd = cmd = MagicMock() - self.device.server.screenshot = MagicMock() - self.device.server.screenshot.return_value = None - cmd.return_value.returncode = 0 - self.assertEqual(self.device.screenshot("a.png", 1.0, 99), "a.png") - self.device.server.jsonrpc.takeScreenshot.assert_called_once_with("screenshot.png", 1.0, 99) - self.assertEqual(cmd.call_args_list, [call("pull", "1.png", "a.png"), call("shell", "rm", "1.png")]) - - self.device.server.jsonrpc.takeScreenshot.return_value = None - self.assertEqual(self.device.screenshot("a.png", 1.0, 100), None) - - def test_freeze_rotation(self): - self.device.server.jsonrpc.freezeRotation = MagicMock() - self.device.freeze_rotation(True) - self.device.freeze_rotation(False) - self.assertEqual(self.device.server.jsonrpc.freezeRotation.call_args_list, [call(True), call(False)]) - - def test_orientation(self): - self.device.server.jsonrpc.deviceInfo = MagicMock() - orientation = { - 0: "natural", - 1: "left", - 2: "upsidedown", - 3: "right" - } - for i in range(4): - self.device.server.jsonrpc.deviceInfo.return_value = {"displayRotation": i} - self.assertEqual(self.device.orientation, orientation[i]) - # set - orientations = [ - (0, "natural", "n", 0), - (1, "left", "l", 90), - (2, "upsidedown", "u", 180), - (3, "right", "r", 270) - ] - for values in orientations: - for value in values: - self.device.server.jsonrpc.setOrientation = MagicMock() - self.device.orientation = value - self.device.server.jsonrpc.setOrientation.assert_called_once_with(values[1]) - - with self.assertRaises(ValueError): - self.device.orientation = "invalid orientation" - - def test_last_traversed_text(self): - self.device.server.jsonrpc.getLastTraversedText = MagicMock() - self.device.server.jsonrpc.getLastTraversedText.return_value = "abcdef" - self.assertEqual(self.device.last_traversed_text, "abcdef") - self.device.server.jsonrpc.getLastTraversedText.assert_called_once_with() - - def test_clear_traversed_text(self): - self.device.server.jsonrpc.clearLastTraversedText = MagicMock() - self.device.clear_traversed_text() - self.device.server.jsonrpc.clearLastTraversedText.assert_called_once_with() - - def test_open(self): - self.device.server.jsonrpc.openNotification = MagicMock() - self.device.open.notification() - self.device.server.jsonrpc.openNotification.assert_called_once_with() - self.device.server.jsonrpc.openQuickSettings = MagicMock() - self.device.open.quick_settings() - self.device.server.jsonrpc.openQuickSettings.assert_called_once_with() - - def test_watchers(self): - names = ["a", "b", "c"] - self.device.server.jsonrpc.getWatchers = MagicMock() - self.device.server.jsonrpc.getWatchers.return_value = names - self.assertEqual(self.device.watchers, names) - self.device.server.jsonrpc.getWatchers.assert_called_once_with() - - self.device.server.jsonrpc.hasAnyWatcherTriggered = MagicMock() - self.device.server.jsonrpc.hasAnyWatcherTriggered.return_value = True - self.assertEqual(self.device.watchers.triggered, True) - self.device.server.jsonrpc.hasAnyWatcherTriggered.assert_called_once_with() - - self.device.server.jsonrpc.removeWatcher = MagicMock() - self.device.watchers.remove("a") - self.device.server.jsonrpc.removeWatcher.assert_called_once_with("a") - self.device.server.jsonrpc.removeWatcher = MagicMock() - self.device.watchers.remove() - self.assertEqual(self.device.server.jsonrpc.removeWatcher.call_args_list, [call(name) for name in names]) - - self.device.server.jsonrpc.resetWatcherTriggers = MagicMock() - self.device.watchers.reset() - self.device.server.jsonrpc.resetWatcherTriggers.assert_called_once_with() - - self.device.server.jsonrpc.runWatchers = MagicMock() - self.device.watchers.run() - self.device.server.jsonrpc.runWatchers.assert_called_once_with() - - def test_watcher(self): - self.device.server.jsonrpc.hasWatcherTriggered = MagicMock() - self.device.server.jsonrpc.hasWatcherTriggered.return_value = False - self.assertFalse(self.device.watcher("name").triggered) - self.device.server.jsonrpc.hasWatcherTriggered.assert_called_once_with("name") - - self.device.server.jsonrpc.removeWatcher = MagicMock() - self.device.watcher("a").remove() - self.device.server.jsonrpc.removeWatcher.assert_called_once_with("a") - - self.device.server.jsonrpc.registerClickUiObjectWatcher = MagicMock() - condition1 = {"text": "my text", "className": "android"} - condition2 = {"description": "my desc", "clickable": True} - target = {"className": "android.widget.Button", "text": "OK"} - self.device.watcher("watcher").when(**condition1).when(**condition2).click(**target) - self.device.server.jsonrpc.registerClickUiObjectWatcher.assert_called_once_with( - "watcher", - [Selector(**condition1), Selector(**condition2)], - Selector(**target) - ) - - self.device.server.jsonrpc.registerPressKeyskWatcher = MagicMock() - self.device.watcher("watcher2").when(**condition1).when(**condition2).press.back.home.power("menu") - self.device.server.jsonrpc.registerPressKeyskWatcher.assert_called_once_with( - "watcher2", [Selector(**condition1), Selector(**condition2)], ("back", "home", "power", "menu")) - - def test_press(self): - key = ["home", "back", "left", "right", "up", "down", "center", - "menu", "search", "enter", "delete", "del", "recent", - "volume_up", "volume_down", "volume_mute", "camera", "power"] - self.device.server.jsonrpc.pressKey = MagicMock() - self.device.server.jsonrpc.pressKey.return_value = True - self.assertTrue(self.device.press.home()) - self.device.server.jsonrpc.pressKey.return_value = False - self.assertFalse(self.device.press.back()) - self.device.server.jsonrpc.pressKey.return_value = False - for k in key: - self.assertFalse(self.device.press(k)) - self.assertEqual(self.device.server.jsonrpc.pressKey.call_args_list, [call("home"), call("back")] + [call(k) for k in key]) - - self.device.server.jsonrpc.pressKeyCode.return_value = True - self.assertTrue(self.device.press(1)) - self.assertTrue(self.device.press(1, 2)) - self.assertEqual(self.device.server.jsonrpc.pressKeyCode.call_args_list, [call(1), call(1, 2)]) - - def test_wakeup(self): - self.device.server.jsonrpc.wakeUp = MagicMock() - self.device.wakeup() - self.device.server.jsonrpc.wakeUp.assert_called_once_with() - - self.device.server.jsonrpc.wakeUp = MagicMock() - self.device.screen.on() - self.device.server.jsonrpc.wakeUp.assert_called_once_with() - - self.device.server.jsonrpc.wakeUp = MagicMock() - self.device.screen("on") - self.device.server.jsonrpc.wakeUp.assert_called_once_with() - - def test_screen_status(self): - self.device.server.jsonrpc.deviceInfo = MagicMock() - self.device.server.jsonrpc.deviceInfo.return_value = {"screenOn": True} - self.assertTrue(self.device.screen == "on") - self.assertTrue(self.device.screen != "off") - - self.device.server.jsonrpc.deviceInfo.return_value = {"screenOn": False} - self.assertTrue(self.device.screen == "off") - self.assertTrue(self.device.screen != "on") - - def test_sleep(self): - self.device.server.jsonrpc.sleep = MagicMock() - self.device.sleep() - self.device.server.jsonrpc.sleep.assert_called_once_with() - - self.device.server.jsonrpc.sleep = MagicMock() - self.device.screen.off() - self.device.server.jsonrpc.sleep.assert_called_once_with() - - self.device.server.jsonrpc.sleep = MagicMock() - self.device.screen("off") - self.device.server.jsonrpc.sleep.assert_called_once_with() - - def test_wait_idle(self): - self.device.server.jsonrpc_wrap.return_value.waitForIdle = MagicMock() - self.device.server.jsonrpc_wrap.return_value.waitForIdle.return_value = True - self.assertTrue(self.device.wait.idle(timeout=10)) - self.device.server.jsonrpc_wrap.return_value.waitForIdle.assert_called_once_with(10) - - self.device.server.jsonrpc_wrap.return_value.waitForIdle = MagicMock() - self.device.server.jsonrpc_wrap.return_value.waitForIdle.return_value = False - self.assertFalse(self.device.wait("idle", timeout=10)) - self.device.server.jsonrpc_wrap.return_value.waitForIdle.assert_called_once_with(10) - - def test_wait_update(self): - self.device.server.jsonrpc_wrap.return_value.waitForWindowUpdate = MagicMock() - self.device.server.jsonrpc_wrap.return_value.waitForWindowUpdate.return_value = True - self.assertTrue(self.device.wait.update(timeout=10, package_name="android")) - self.device.server.jsonrpc_wrap.return_value.waitForWindowUpdate.assert_called_once_with("android", 10) - - self.device.server.jsonrpc_wrap.return_value.waitForWindowUpdate = MagicMock() - self.device.server.jsonrpc_wrap.return_value.waitForWindowUpdate.return_value = False - self.assertFalse(self.device.wait("update", timeout=100, package_name="android")) - self.device.server.jsonrpc_wrap.return_value.waitForWindowUpdate.assert_called_once_with("android", 100) - - def test_get_info_attr(self): - info = {"test_a": 1, "test_b": "string", "displayWidth": 720, "displayHeight": 1024} - self.device.server.jsonrpc.deviceInfo = MagicMock() - self.device.server.jsonrpc.deviceInfo.return_value = info - for k in info: - self.assertEqual(getattr(self.device, k), info[k]) - self.assertEqual(self.device.width, info["displayWidth"]) - self.assertEqual(self.device.height, info["displayHeight"]) - with self.assertRaises(AttributeError): - self.device.not_exists - - def test_device_obj(self): - with patch("uiautomator.AutomatorDeviceObject") as AutomatorDeviceObject: - kwargs = {"text": "abc", "description": "description...", "clickable": True} - self.device(**kwargs) - AutomatorDeviceObject.assert_called_once_with(self.device, Selector(**kwargs)) - - with patch("uiautomator.AutomatorDeviceObject") as AutomatorDeviceObject: - AutomatorDeviceObject.return_value.exists = True - self.assertTrue(self.device.exists(clickable=True)) - AutomatorDeviceObject.return_value.exists = False - self.assertFalse(self.device.exists(text="...")) - - -class TestDeviceWithSerial(unittest.TestCase): - - def test_serial(self): - with patch('uiautomator.AutomatorServer') as AutomatorServer: - AutomatorDevice("abcdefhijklmn") - AutomatorServer.assert_called_once_with(serial="abcdefhijklmn", local_port=None, adb_server_host=None, adb_server_port=None) diff --git a/test/test_device_obj.py b/test/test_device_obj.py deleted file mode 100644 index 568dab5..0000000 --- a/test/test_device_obj.py +++ /dev/null @@ -1,469 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -from mock import MagicMock, call -from uiautomator import AutomatorDeviceObject, Selector, AutomatorDeviceNamedUiObject - - -class TestDeviceObjInit(unittest.TestCase): - - def setUp(self): - self.device = MagicMock() - self.device.server.jsonrpc = MagicMock() - - def test_init(self): - kwargs = {"text": "text", "className": "android"} - self.device_obj = AutomatorDeviceObject(self.device, - Selector(**kwargs)) - self.assertEqual(self.device_obj.selector, - Selector(**kwargs)) - self.assertEqual(self.device_obj.jsonrpc, - self.device.server.jsonrpc) - - -class TestDeviceObj(unittest.TestCase): - - def setUp(self): - self.device = MagicMock() - self.jsonrpc = self.device.server.jsonrpc = MagicMock() - self.jsonrpc_wrap = self.device.server.jsonrpc_wrap = MagicMock() - self.kwargs = {"text": "text", "className": "android"} - self.obj = AutomatorDeviceObject(self.device, - Selector(**self.kwargs)) - - def test_child_selector(self): - kwargs = {"text": "child text", "className": "android"} - obj = self.obj.child_selector(**kwargs) - self.assertEqual(len(obj.selector['childOrSibling']), 1) - self.assertEqual(obj.selector['childOrSibling'][0], 'child') - self.assertEqual(len(obj.selector['childOrSiblingSelector']), 1) - self.assertEqual(obj.selector['childOrSiblingSelector'][0], Selector(**kwargs)) - - def test_from_parent(self): - kwargs = {"text": "parent text", "className": "android"} - obj = self.obj.from_parent(**kwargs) - self.assertEqual(len(obj.selector['childOrSibling']), 1) - self.assertEqual(obj.selector['childOrSibling'][0], 'sibling') - self.assertEqual(len(obj.selector['childOrSiblingSelector']), 1) - self.assertEqual(obj.selector['childOrSiblingSelector'][0], Selector(**kwargs)) - - def test_exists(self): - self.jsonrpc.exist = MagicMock() - self.jsonrpc.exist.return_value = True - self.assertTrue(self.obj.exists) - - self.jsonrpc.exist.return_value = False - self.assertFalse(self.obj.exists) - - self.assertEqual(self.jsonrpc.exist.call_args_list, - [call(self.obj.selector), - call(self.obj.selector)]) - - def test_info(self): - info = {"text": "item text"} - self.jsonrpc.objInfo.return_value = info - self.assertEqual(self.obj.info, - info) - self.jsonrpc.objInfo.assert_called_once_with(self.obj.selector) - - def test_info_attr(self): - info = {'contentDescription': '', - 'checked': False, - 'scrollable': False, - 'text': '', - 'packageName': 'android', - 'selected': False, - 'enabled': True, - 'bounds': {'top': 0, - 'left': 0, - 'right': 720, - 'bottom': 1184}, - 'className': - 'android.widget.FrameLayout', - 'focusable': False, - 'focused': False, - 'clickable': False, - 'checkable': False, - 'chileCount': 2, - 'longClickable': False, - 'visibleBounds': {'top': 0, - 'left': 0, - 'right': 720, - 'bottom': 1184}} - self.jsonrpc.objInfo.return_value = info - self.assertEqual(self.obj.info, info) - self.jsonrpc.objInfo.assert_called_once_with(self.obj.selector) - self.assertEqual(self.obj.description, info["contentDescription"]) - for k in info: - self.assertEqual(getattr(self.obj, k), info[k]) - - with self.assertRaises(AttributeError): - self.obj.not_exists - - def test_text(self): - self.jsonrpc.clearTextField = MagicMock() - self.obj.set_text(None) - self.obj.set_text("") - self.obj.clear_text() - self.assertEqual(self.jsonrpc.clearTextField.call_args_list, - [call(self.obj.selector), call(self.obj.selector), call(self.obj.selector)]) - - self.jsonrpc.setText.return_value = False - texts = ["abc", "123", "()#*$&"] - for text in texts: - self.assertFalse(self.obj.set_text(text)) - self.assertEqual(self.jsonrpc.setText.call_args_list, - [call(self.obj.selector, t) for t in texts]) - - def test_click(self): - self.jsonrpc.click.return_value = False - corners = ["tl", "topleft", "br", "bottomright"] - for c in corners: - self.assertFalse(self.obj.click(c)) - self.assertEqual(self.jsonrpc.click.call_args_list, - [call(self.obj.selector, c) for c in corners]) - - self.jsonrpc.click = MagicMock() - self.jsonrpc.click.return_value = True - corners = ["tl", "topleft", "br", "bottomright"] - for c in corners: - self.assertTrue(getattr(self.obj.click, c)()) - self.assertEqual(self.jsonrpc.click.call_args_list, - [call(self.obj.selector, c) for c in corners]) - - self.jsonrpc.click = MagicMock() - self.jsonrpc.click.return_value = True - self.assertTrue(self.obj.click()) - self.jsonrpc.click.assert_called_once_with(self.obj.selector) - - def test_click_wait(self): - self.jsonrpc.clickAndWaitForNewWindow.return_value = True - self.assertTrue(self.obj.click.wait(timeout=321)) - self.jsonrpc.clickAndWaitForNewWindow.assert_called_once_with(self.obj.selector, 321) - - def test_long_click(self): - self.jsonrpc.longClick.return_value = False - corners = ["tl", "topleft", "br", "bottomright"] - for c in corners: - self.assertFalse(self.obj.long_click(c)) - self.assertEqual(self.jsonrpc.longClick.call_args_list, - [call(self.obj.selector, c) for c in corners]) - - self.jsonrpc.longClick = MagicMock() - self.jsonrpc.longClick.return_value = True - corners = ["tl", "topleft", "br", "bottomright"] - for c in corners: - self.assertTrue(getattr(self.obj.long_click, c)()) - self.assertEqual(self.jsonrpc.longClick.call_args_list, - [call(self.obj.selector, c) for c in corners]) - - self.jsonrpc.longClick = MagicMock() - self.jsonrpc.longClick.return_value = True - self.assertTrue(self.obj.long_click()) - self.jsonrpc.longClick.assert_called_once_with(self.obj.selector) - - def test_long_click_using_swipe(self): - self.device.long_click.return_value = False - self.jsonrpc.objInfo.return_value = { - 'longClickable': False, - 'visibleBounds': { - 'top': 0, - 'bottom': 60, - 'left': 0, - 'right': 60 - } - } - corners = ["tl", "topleft", "br", "bottomright"] - for c in corners: - self.assertFalse(self.obj.long_click(c)) - self.assertEqual(self.device.long_click.call_args_list, - [call(10, 10), call(10, 10), call(50, 50), call(50, 50)]) - - self.device.long_click = MagicMock() - self.device.long_click.return_value = True - corners = ["tl", "topleft", "br", "bottomright"] - for c in corners: - self.assertTrue(getattr(self.obj.long_click, c)()) - self.assertEqual(self.device.long_click.call_args_list, - [call(10, 10), call(10, 10), call(50, 50), call(50, 50)]) - - self.device.long_click = MagicMock() - self.device.long_click.return_value = True - self.assertTrue(self.obj.long_click()) - self.device.long_click.assert_called_once_with(30, 30) - - def test_drag_to(self): - self.jsonrpc.dragTo.return_value = False - self.assertFalse(self.obj.drag.to(10, 20, steps=10)) - self.jsonrpc.dragTo.return_value = True - self.assertTrue(self.obj.drag.to(x=10, y=20, steps=20)) - - sel = {"text": "text..."} - self.assertTrue(self.obj.drag.to(steps=30, **sel)) - self.assertEqual(self.jsonrpc.dragTo.call_args_list, - [call(self.obj.selector, 10, 20, 10), - call(self.obj.selector, 10, 20, 20), - call(self.obj.selector, Selector(**sel), 30)]) - - def test_gesture(self): - self.jsonrpc.gesture.return_value = True - self.assertTrue(self.obj.gesture((1, 1), (2, 2), (3, 3), (4, 4), 100)) - self.assertTrue(self.obj.gesture(4, 3).to(2, 1, 20)) - self.assertEqual(self.jsonrpc.gesture.call_args_list, - [call(self.obj.selector, {'x':1, 'y': 1}, {'x':2, 'y':2}, {'x':3, 'y':3}, {'x':4, 'y':4}, 100), call(self.obj.selector, 4, 3, 2, 1, 20)]) - - def test_pinch(self): - self.jsonrpc.pinchIn.return_value = True - self.assertTrue(self.obj.pinch.In(percent=90, steps=30)) - self.assertTrue(self.obj.pinch("in", 80, 40)) - self.assertTrue(self.obj.pinch("In", 70, 50)) - self.assertEqual(self.jsonrpc.pinchIn.call_args_list, - [call(self.obj.selector, 90, 30), call(self.obj.selector, 80, 40), call(self.obj.selector, 70, 50)]) - - self.jsonrpc.pinchOut.return_value = True - self.assertTrue(self.obj.pinch.Out(percent=90, steps=30)) - self.assertTrue(self.obj.pinch("out", 80, 40)) - self.assertTrue(self.obj.pinch("Out", 70, 50)) - self.assertEqual(self.jsonrpc.pinchIn.call_args_list, - [call(self.obj.selector, 90, 30), call(self.obj.selector, 80, 40), call(self.obj.selector, 70, 50)]) - - def test_swipe(self): - self.jsonrpc.swipe.return_value = True - dirs = ["up", "down", "right", "left"] - for d in dirs: - self.assertTrue(self.obj.swipe(d, 30)) - self.assertEqual(self.jsonrpc.swipe.call_args_list, - [call(self.obj.selector, d, 30) for d in dirs]) - - self.jsonrpc.swipe = MagicMock() - self.jsonrpc.swipe.return_value = True - dirs = ["up", "down", "right", "left"] - for d in dirs: - self.assertTrue(getattr(self.obj.swipe, d)(steps=30)) - self.assertEqual(self.jsonrpc.swipe.call_args_list, - [call(self.obj.selector, d, 30) for d in dirs]) - - def test_fling(self): - self.jsonrpc.flingForward.return_value = True - self.assertTrue(self.obj.fling.horiz.forward()) - self.assertTrue(self.obj.fling.horizentally.forward()) - self.assertTrue(self.obj.fling.vert.forward()) - self.assertTrue(self.obj.fling()) - self.assertEqual(self.jsonrpc.flingForward.call_args_list, - [call(self.obj.selector, False), call(self.obj.selector, False), call(self.obj.selector, True), call(self.obj.selector, True)]) - - self.jsonrpc.flingBackward.return_value = True - self.assertTrue(self.obj.fling.horiz.backward()) - self.assertTrue(self.obj.fling.horizentally.backward()) - self.assertTrue(self.obj.fling.vert.backward()) - self.assertTrue(self.obj.fling.vertically.backward()) - self.assertEqual(self.jsonrpc.flingBackward.call_args_list, - [call(self.obj.selector, False), call(self.obj.selector, False), call(self.obj.selector, True), call(self.obj.selector, True)]) - - max_swipes = 1000 - self.jsonrpc.flingToBeginning.return_value = True - self.assertTrue(self.obj.fling.horiz.toBeginning()) - self.assertTrue(self.obj.fling.horizentally.toBeginning()) - self.assertTrue(self.obj.fling.vert.toBeginning()) - self.assertTrue(self.obj.fling.vertically.toBeginning(max_swipes=100)) - self.assertEqual(self.jsonrpc.flingToBeginning.call_args_list, - [call(self.obj.selector, False, max_swipes), call(self.obj.selector, False, max_swipes), call(self.obj.selector, True, max_swipes), call(self.obj.selector, True, 100)]) - - self.jsonrpc.flingToEnd.return_value = True - self.assertTrue(self.obj.fling.horiz.toEnd()) - self.assertTrue(self.obj.fling.horizentally.toEnd()) - self.assertTrue(self.obj.fling.vert.toEnd()) - self.assertTrue(self.obj.fling.vertically.toEnd(max_swipes=100)) - self.assertEqual(self.jsonrpc.flingToEnd.call_args_list, - [call(self.obj.selector, False, max_swipes), call(self.obj.selector, False, max_swipes), call(self.obj.selector, True, max_swipes), call(self.obj.selector, True, 100)]) - - def test_scroll(self): - steps = 100 - max_swipes = 1000 - self.jsonrpc.scrollForward.return_value = True - self.assertTrue(self.obj.scroll.horiz.forward()) - self.assertTrue(self.obj.scroll.horizentally.forward()) - self.assertTrue(self.obj.scroll.vert.forward()) - self.assertTrue(self.obj.scroll(steps=20)) - self.assertEqual(self.jsonrpc.scrollForward.call_args_list, - [call(self.obj.selector, False, steps), call(self.obj.selector, False, steps), call(self.obj.selector, True, steps), call(self.obj.selector, True, 20)]) - - self.jsonrpc.scrollBackward.return_value = True - self.assertTrue(self.obj.scroll.horiz.backward()) - self.assertTrue(self.obj.scroll.horizentally.backward()) - self.assertTrue(self.obj.scroll.vert.backward()) - self.assertTrue(self.obj.scroll.vertically.backward(steps=20)) - self.assertEqual(self.jsonrpc.scrollBackward.call_args_list, - [call(self.obj.selector, False, steps), call(self.obj.selector, False, steps), call(self.obj.selector, True, steps), call(self.obj.selector, True, 20)]) - - self.jsonrpc.scrollToBeginning.return_value = True - self.assertTrue(self.obj.scroll.horiz.toBeginning()) - self.assertTrue(self.obj.scroll.horizentally.toBeginning()) - self.assertTrue(self.obj.scroll.vert.toBeginning()) - self.assertTrue(self.obj.scroll.vertically.toBeginning(steps=20, max_swipes=100)) - self.assertEqual(self.jsonrpc.scrollToBeginning.call_args_list, - [call(self.obj.selector, False, max_swipes, steps), call(self.obj.selector, False, max_swipes, steps), call(self.obj.selector, True, max_swipes, steps), call(self.obj.selector, True, 100, 20)]) - - self.jsonrpc.scrollToEnd.return_value = True - self.assertTrue(self.obj.scroll.horiz.toEnd()) - self.assertTrue(self.obj.scroll.horizentally.toEnd()) - self.assertTrue(self.obj.scroll.vert.toEnd()) - self.assertTrue(self.obj.scroll.vertically.toEnd(steps=20, max_swipes=100)) - self.assertEqual(self.jsonrpc.scrollToEnd.call_args_list, - [call(self.obj.selector, False, max_swipes, steps), call(self.obj.selector, False, max_swipes, steps), call(self.obj.selector, True, max_swipes, steps), call(self.obj.selector, True, 100, 20)]) - - info = {"text": "..."} - self.jsonrpc.scrollTo.return_value = True - self.assertTrue(self.obj.scroll.horiz.to(**info)) - self.assertTrue(self.obj.scroll.horizentally.to(**info)) - self.assertTrue(self.obj.scroll.vert.to(**info)) - self.assertTrue(self.obj.scroll.vertically.to(**info)) - self.assertEqual(self.jsonrpc.scrollTo.call_args_list, - [call(self.obj.selector, Selector(**info), False), call(self.obj.selector, Selector(**info), False), call(self.obj.selector, Selector(**info), True), call(self.obj.selector, Selector(**info), True)]) - - def test_wait(self): - timeout = 3000 - self.jsonrpc_wrap.return_value.waitUntilGone.return_value = True - self.assertTrue(self.obj.wait.gone()) - self.jsonrpc_wrap.return_value.waitUntilGone.assert_called_once_with(self.obj.selector, timeout) - self.jsonrpc_wrap.return_value.waitForExists.return_value = True - self.assertTrue(self.obj.wait.exists(timeout=10)) - self.jsonrpc_wrap.return_value.waitForExists.assert_called_once_with(self.obj.selector, 10) - - def test_child_by_text(self): - self.jsonrpc.childByText.return_value = "myname" - kwargs = {"className": "android", "text": "patern match text"} - generic_obj = self.obj.child_by_text("child text", **kwargs) - self.jsonrpc.childByText.assert_called_once_with(Selector(**self.kwargs), Selector(**kwargs), "child text") - self.assertEqual("myname", generic_obj.selector) - - def test_child_by_text_allow_scroll_search(self): - self.jsonrpc.childByText.return_value = "myname" - kwargs = {"className": "android", "text": "patern match text"} - generic_obj = self.obj.child_by_text("child text", allow_scroll_search=False, **kwargs) - self.jsonrpc.childByText.assert_called_once_with( - Selector(**self.kwargs), Selector(**kwargs), "child text", False) - self.assertEqual("myname", generic_obj.selector) - - def test_child_by_description(self): - self.jsonrpc.childByDescription.return_value = "myname" - kwargs = {"className": "android", "text": "patern match text"} - generic_obj = self.obj.child_by_description("child text", **kwargs) - self.jsonrpc.childByDescription.assert_called_once_with( - Selector(**self.kwargs), Selector(**kwargs), "child text") - self.assertEqual("myname", generic_obj.selector) - - def test_child_by_description_allow_scroll_search(self): - self.jsonrpc.childByDescription.return_value = "myname" - kwargs = {"className": "android", "text": "patern match text"} - generic_obj = self.obj.child_by_description("child text", allow_scroll_search=False, **kwargs) - self.jsonrpc.childByDescription.assert_called_once_with( - Selector(**self.kwargs), Selector(**kwargs), "child text", False) - self.assertEqual("myname", generic_obj.selector) - - def test_child_by_instance(self): - self.jsonrpc.childByInstance.return_value = "myname" - kwargs = {"className": "android", "text": "patern match text"} - generic_obj = self.obj.child_by_instance(1234, **kwargs) - self.jsonrpc.childByInstance.assert_called_once_with(Selector(**self.kwargs), Selector(**kwargs), 1234) - self.assertEqual("myname", generic_obj.selector) - - def test_count(self): - self.jsonrpc.count.return_value = 10 - self.assertEqual(self.obj.count, 10) - self.jsonrpc.count.assert_called_once_with(Selector(**self.kwargs)) - - def test_len(self): - self.jsonrpc.count.return_value = 10 - self.assertEqual(len(self.obj), 10) - - def test_instance_list(self): - count = 10 - self.jsonrpc.count.return_value = count - for i in range(count): - self.assertEqual(self.obj[i].selector["instance"], i) - with self.assertRaises(IndexError): - self.obj[count] - self.jsonrpc.count.return_value = 1 - self.assertEqual(self.obj[0], self.obj) - - def test_instance_iter(self): - count = 10 - self.jsonrpc.count.return_value = count - for index, inst in enumerate(self.obj): - self.assertEqual(inst.selector["instance"], index) - - def test_left(self): - self.jsonrpc.objInfo.side_effect = [ - {"bounds": {'top': 200, 'bottom': 250, 'left': 100, 'right': 150}}, - {"bounds": {'top': 250, 'bottom': 300, 'left': 150, 'right': 200}}, - {"bounds": {'top': 200, 'bottom': 300, 'left': 150, 'right': 200}}, - {"bounds": {'top': 200, 'bottom': 300, 'left': 50, 'right': 100}} - ] - self.jsonrpc.count.return_value = 3 - self.assertEqual(self.obj.left().selector["instance"], 2) - - def test_right(self): - self.jsonrpc.objInfo.side_effect = [ - {"bounds": {'top': 200, 'bottom': 250, 'left': 100, 'right': 150}}, - {"bounds": {'top': 250, 'bottom': 300, 'left': 150, 'right': 200}}, - {"bounds": {'top': 200, 'bottom': 300, 'left': 50, 'right': 100}}, - {"bounds": {'top': 200, 'bottom': 300, 'left': 150, 'right': 200}} - ] - self.jsonrpc.count.return_value = 3 - self.assertEqual(self.obj.right().selector["instance"], 2) - - def test_up(self): - self.jsonrpc.objInfo.side_effect = [ - {"bounds": {'top': 200, 'bottom': 250, 'left': 100, 'right': 150}}, - {"bounds": {'top': 250, 'bottom': 300, 'left': 100, 'right': 150}}, - {"bounds": {'top': 150, 'bottom': 200, 'left': 150, 'right': 200}}, - {"bounds": {'top': 150, 'bottom': 200, 'left': 100, 'right': 200}} - ] - self.jsonrpc.count.return_value = 3 - self.assertEqual(self.obj.up().selector["instance"], 2) - - def test_down(self): - self.jsonrpc.objInfo.side_effect = [ - {"bounds": {'top': 200, 'bottom': 250, 'left': 100, 'right': 150}}, - {"bounds": {'top': 250, 'bottom': 300, 'left': 150, 'right': 200}}, - {"bounds": {'top': 150, 'bottom': 200, 'left': 150, 'right': 200}}, - {"bounds": {'top': 250, 'bottom': 300, 'left': 100, 'right': 150}} - ] - self.jsonrpc.count.return_value = 3 - self.assertEqual(self.obj.down().selector["instance"], 2) - - def test_multiple_matched_down(self): - self.jsonrpc.objInfo.side_effect = [ - {"bounds": {'top': 200, 'bottom': 250, 'left': 100, 'right': 150}}, - {"bounds": {'top': 250, 'bottom': 300, 'left': 150, 'right': 200}}, - {"bounds": {'top': 150, 'bottom': 200, 'left': 150, 'right': 200}}, - {"bounds": {'top': 275, 'bottom': 300, 'left': 100, 'right': 150}}, - {"bounds": {'top': 300, 'bottom': 350, 'left': 100, 'right': 150}}, - {"bounds": {'top': 250, 'bottom': 275, 'left': 100, 'right': 150}} - ] - self.jsonrpc.count.return_value = 5 - self.assertEqual(self.obj.down().selector["instance"], 4) - -class TestAutomatorDeviceNamedUiObject(unittest.TestCase): - - def setUp(self): - self.device = MagicMock() - self.jsonrpc = self.device.server.jsonrpc = MagicMock() - self.name = "my-name" - self.obj = AutomatorDeviceNamedUiObject(self.device, self.name) - - def test_child(self): - self.jsonrpc.getChild.return_value = "another-name" - kwargs = {"className": "android", "text": "patern match text"} - generic_obj = self.obj.child(**kwargs) - self.jsonrpc.getChild.assert_called_once_with(self.name, Selector(**kwargs)) - self.assertEqual(generic_obj.selector, self.jsonrpc.getChild.return_value) - - def test_sibling(self): - self.jsonrpc.getFromParent.return_value = "another-name" - kwargs = {"className": "android", "text": "patern match text"} - generic_obj = self.obj.sibling(**kwargs) - self.jsonrpc.getFromParent.assert_called_once_with(self.name, Selector(**kwargs)) - self.assertEqual(generic_obj.selector, self.jsonrpc.getFromParent.return_value) diff --git a/test/test_jsonrpc.py b/test/test_jsonrpc.py deleted file mode 100644 index e72c643..0000000 --- a/test/test_jsonrpc.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -from mock import MagicMock, patch -from uiautomator import JsonRPCMethod, JsonRPCClient -import os - - -class TestJsonRPCMethod_id(unittest.TestCase): - - def test_id(self): - method = JsonRPCMethod("", "method", 30) - self.assertTrue(isinstance(method.id(), str)) - self.assertTrue(len(method.id()) > 0) - for i in range(100): - self.assertNotEqual(method.id(), method.id()) - - -class TestJsonRPCMethod_call(unittest.TestCase): - - def setUp(self): - self.url = "http://localhost/jsonrpc" - self.timeout = 20 - self.method_name = "ping" - self.id = "fGasV62G" - self.method = JsonRPCMethod(self.url, self.method_name, self.timeout) - self.method.id = MagicMock() - self.method.id.return_value = self.id - try: - import urllib2 - self.urlopen_patch = patch('urllib2.urlopen') - except: - self.urlopen_patch = patch('urllib.request.urlopen') - finally: - self.urlopen = self.urlopen_patch.start() - - def tearDown(self): - self.urlopen_patch.stop() - - def test_normal_call(self): - return_mock = self.urlopen.return_value - return_mock.getcode.return_value = 200 - - return_mock.read.return_value = b'{"result": "pong", "error": null, "id": "DKNCJDLDJJ"}' - self.assertEqual("pong", self.method()) - self.method.id.assert_called_once_with() - - return_mock.read.return_value = b'{"result": "pong", "id": "JDLSFJLILJEMNC"}' - self.assertEqual("pong", self.method()) - self.assertEqual("pong", self.method(1, 2, "str", {"a": 1}, ["1"])) - self.assertEqual("pong", self.method(a=1, b=2)) - - def test_normal_call_error(self): - return_mock = self.urlopen.return_value - - return_mock.getcode.return_value = 500 - with self.assertRaises(Exception): - self.method() - - return_mock.getcode.return_value = 200 - return_mock.read.return_value = b'{"result": "pong", "error": {"code": -513937, "message": "error message."}, "id": "fGasV62G"}' - with self.assertRaises(Exception): - self.method() - return_mock.read.assert_called_with() - - return_mock.getcode.return_value = 200 - return_mock.read.return_value = b'{"result": null, "error": null, "id": "fGasV62G"}' - with self.assertRaises(SyntaxError): - self.method(1, 2, kwarg1="") - - -class TestJsonRPCClient(unittest.TestCase): - - def setUp(self): - self.url = "http://localhost/jsonrpc" - self.timeout = 20 - - def test_jsonrpc(self): - with patch('uiautomator.JsonRPCMethod') as JsonRPCMethod: - client = JsonRPCClient(self.url, self.timeout, JsonRPCMethod) - JsonRPCMethod.return_value = "Ok" - self.assertEqual(client.ping, "Ok") - JsonRPCMethod.assert_called_once_with(self.url, "ping", timeout=self.timeout) - - JsonRPCMethod.return_value = {"width": 10, "height": 20} - self.assertEqual(client.info, {"width": 10, "height": 20}) - JsonRPCMethod.assert_called_with(self.url, "info", timeout=self.timeout) - - -class TestJsonRPCMethod_call_on_windows(unittest.TestCase): - - def setUp(self): - self.os_name = os.name - os.name = "nt" - self.url = "http://localhost/jsonrpc" - self.timeout = 20 - self.method_name = "ping" - self.id = "fGasV62G" - self.method = JsonRPCMethod(self.url, self.method_name, self.timeout) - self.method.pool = MagicMock() - self.method.id = MagicMock() - self.method.id.return_value = self.id - - def tearDown(self): - os.name = self.os_name - - def test_normal_call(self): - urlopen = self.method.pool.urlopen - urlopen.return_value.status = 200 - - urlopen.return_value.data = b'{"result": "pong", "error": null, "id": "DKNCJDLDJJ"}' - self.assertEqual("pong", self.method()) - self.method.id.assert_called_once_with() - - urlopen.return_value.data = b'{"result": "pong", "id": "JDLSFJLILJEMNC"}' - self.assertEqual("pong", self.method()) - self.assertEqual("pong", self.method(1, 2, "str", {"a": 1}, ["1"])) - self.assertEqual("pong", self.method(a=1, b=2)) - - def test_normal_call_error(self): - urlopen = self.method.pool.urlopen - urlopen.return_value.status = 500 - - with self.assertRaises(Exception): - self.method() diff --git a/test/test_misc.py b/test/test_misc.py deleted file mode 100644 index 4f7d971..0000000 --- a/test/test_misc.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -import uiautomator -import os -from mock import patch - - -class TestMisc(unittest.TestCase): - - def test_proxy(self): - self.assertTrue('no_proxy' in os.environ) - self.assertTrue('localhost' in os.environ.get('no_proxy', '')) - - def test_load(self): - try: - from imp import reload - except: - pass - reload(uiautomator) - self.assertIsNotNone(uiautomator.device) - self.assertIsNotNone(uiautomator.rect) - self.assertIsNotNone(uiautomator.point) - - def test_rect(self): - import random - for i in range(10): - top, left = random.randint(0, 100), random.randint(0, 100) - bottom, right = random.randint(101, 1024), random.randint(101, 720) - self.assertEqual(uiautomator.rect(top, left, bottom, right), {"top": top, "left": left, "bottom": bottom, "right": right}) - - def test_point(self): - import random - for i in range(10): - x, y = random.randint(0, 1024), random.randint(0, 720) - self.assertEqual(uiautomator.point(x, y), {"x": x, "y": y}) - - def test_next_port(self): - with patch('socket.socket') as socket: - socket.return_value.connect_ex.side_effect = [0, 0, 1] - uiautomator._init_local_port = 9007 - self.assertEqual(uiautomator.next_local_port(), 9010) - - with patch('socket.socket') as socket: - socket.return_value.connect_ex.return_value = 1 - uiautomator._init_local_port = 32764 - self.assertEqual(uiautomator.next_local_port(), 9008) diff --git a/test/test_param_to_property.py b/test/test_param_to_property.py deleted file mode 100644 index 16e3dca..0000000 --- a/test/test_param_to_property.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -from uiautomator import param_to_property - - -class TestParamToProperty(unittest.TestCase): - - def test_props(self): - args_value = None - kwargs_value = None - - @param_to_property("one", "two", "three") - def func(*args, **kwargs): - self.assertEqual(args, args_value) - self.assertEqual(kwargs, kwargs_value) - - args_value = ("one", "two", "three") - kwargs_value = {"test": 1} - func.one.two.three(test=1) - args_value = ("one", "three") - kwargs_value = {"another_test": 100} - func.one("three", another_test=100) - args_value = ("one", "two", "three") - kwargs_value = {} - func("one", "two", "three") - args_value = ("three", "one", "two") - kwargs_value = {} - func.three("one", "two") - - def test_kwprops(self): - args_value = None - kwargs_value = None - - @param_to_property(key=["one", "two", "three"]) - def func(*args, **kwargs): - self.assertEqual(args, args_value) - self.assertEqual(kwargs, kwargs_value) - - args_value = (1,) - kwargs_value = {"key": "one"} - func.one(1) - args_value = (2, 3) - kwargs_value = {"key": "two"} - func.two(2, 3) - args_value = () - kwargs_value = {} - func() - - def test_error(self): - @param_to_property(key=["one", "two", "three"]) - def func(*args, **kwargs): - pass - - with self.assertRaises(AttributeError): - func.one.one - - with self.assertRaises(SyntaxError): - @param_to_property("a", "b", key=["one", "two", "three"]) - def func(*args, **kwargs): - pass diff --git a/test/test_selector.py b/test/test_selector.py deleted file mode 100644 index ff07d8f..0000000 --- a/test/test_selector.py +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -from uiautomator import Selector - - -class TestSelector(unittest.TestCase): - - fields = { - "text": (0x01, None), # MASK_TEXT, - "textContains": (0x02, None), # MASK_TEXTCONTAINS, - "textMatches": (0x04, None), # MASK_TEXTMATCHES, - "textStartsWith": (0x08, None), # MASK_TEXTSTARTSWITH, - "className": (0x10, None), # MASK_CLASSNAME - "classNameMatches": (0x20, None), # MASK_CLASSNAMEMATCHES - "description": (0x40, None), # MASK_DESCRIPTION - "descriptionContains": (0x80, None), # MASK_DESCRIPTIONCONTAINS - "descriptionMatches": (0x0100, None), # MASK_DESCRIPTIONMATCHES - "descriptionStartsWith": (0x0200, None), # MASK_DESCRIPTIONSTARTSWITH - "checkable": (0x0400, False), # MASK_CHECKABLE - "checked": (0x0800, False), # MASK_CHECKED - "clickable": (0x1000, False), # MASK_CLICKABLE - "longClickable": (0x2000, False), # MASK_LONGCLICKABLE, - "scrollable": (0x4000, False), # MASK_SCROLLABLE, - "enabled": (0x8000, False), # MASK_ENABLED, - "focusable": (0x010000, False), # MASK_FOCUSABLE, - "focused": (0x020000, False), # MASK_FOCUSED, - "selected": (0x040000, False), # MASK_SELECTED, - "packageName": (0x080000, None), # MASK_PACKAGENAME, - "packageNameMatches": (0x100000, None), # MASK_PACKAGENAMEMATCHES, - "resourceId": (0x200000, None), # MASK_RESOURCEID, - "resourceIdMatches": (0x400000, None), # MASK_RESOURCEIDMATCHES, - "index": (0x800000, 0), # MASK_INDEX, - "instance": (0x01000000, 0) # MASK_INSTANCE, - } - mask = "mask" - - def test_init(self): - sel = Selector() - self.assertEqual(sel[self.mask], 0) - self.assertEqual(sel["childOrSibling"], []) - self.assertEqual(sel["childOrSiblingSelector"], []) - - def test_add(self): - for k, v in self.fields.items(): - kwargs = {k: v[1]} - sel = Selector(**kwargs) - self.assertEqual(sel[self.mask], v[0]) - - for k1, v1 in self.fields.items(): - for k2, v2 in self.fields.items(): - if k1 != k2: - kwargs = {k1: v1[1], k2: v2[1]} - sel = Selector(**kwargs) - self.assertEqual(sel[self.mask], v1[0] | v2[0]) - - def test_delete(self): - for k, v in self.fields.items(): - kwargs = {k: v[1]} - sel = Selector(**kwargs) - del sel[k] - self.assertEqual(sel[self.mask], 0) - - for k1, v1 in self.fields.items(): - for k2, v2 in self.fields.items(): - if k1 != k2: - kwargs = {k1: v1[1], k2: v2[1]} - sel = Selector(**kwargs) - del sel[k1] - self.assertEqual(sel[self.mask], v2[0]) - del sel[k2] - self.assertEqual(sel[self.mask], 0) - - def test_error(self): - with self.assertRaises(ReferenceError): - Selector(text1="") - - def test_child_and_sibling(self): - sel = Selector() - sel.child(text="...") - self.assertEqual(sel["childOrSibling"], ["child"]) - self.assertEqual(sel["childOrSiblingSelector"], [Selector(text="...")]) - - sel.sibling(text="---") - self.assertEqual(sel["childOrSibling"], ["child", "sibling"]) - self.assertEqual(sel["childOrSiblingSelector"], [Selector(text="..."), Selector(text="---")]) - - def test_clone(self): - kwargs = { - "text": "1234", - "description": "desc...", - "clickable": True, - "focusable": False, - "packageName": "android" - } - sel = Selector(**kwargs) - sel.child(text="1") - sel.sibling(text="1") - sel.child(text="1") - - clone = sel.clone() - for k in kwargs: - self.assertEqual(sel[k], clone[k]) - self.assertEqual(sel["childOrSibling"], clone["childOrSibling"]) - self.assertEqual(sel["childOrSiblingSelector"], clone["childOrSiblingSelector"]) diff --git a/test/test_server.py b/test/test_server.py deleted file mode 100644 index e5e7169..0000000 --- a/test/test_server.py +++ /dev/null @@ -1,192 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import unittest -from mock import MagicMock, patch, call -from uiautomator import AutomatorServer, JsonRPCError - - -class TestAutomatorServer(unittest.TestCase): - - def setUp(self): - self.Adb_patch = patch('uiautomator.Adb') - self.Adb = self.Adb_patch.start() - - def tearDown(self): - self.Adb.stop() - - def test_local_port(self): - self.assertEqual(AutomatorServer("1234", 9010).local_port, 9010) - self.Adb.assert_called_once_with(serial="1234", adb_server_host=None, adb_server_port=None) - - def test_local_port_forwarded(self): - self.Adb.return_value.forward_list.return_value = [ - ("1234", "tcp:1001", "tcp:9009"), - ("1234", "tcp:1000", "tcp:9008") - ] - self.Adb.return_value.device_serial.return_value = "1234" - self.assertEqual(AutomatorServer("1234").local_port, 1000) - - def test_local_port_scanning(self): - with patch('uiautomator.next_local_port') as next_local_port: - self.Adb.return_value.forward_list.return_value = [] - next_local_port.return_value = 1234 - self.assertEqual(AutomatorServer("abcd", None).local_port, - next_local_port.return_value) - - next_local_port.return_value = 14321 - self.Adb.return_value.forward_list.return_value = Exception("error") - self.assertEqual(AutomatorServer("abcd", None).local_port, - next_local_port.return_value) - - def test_device_port(self): - self.assertEqual(AutomatorServer().device_port, 9008) - - def test_start_success(self): - server = AutomatorServer() - server.push = MagicMock() - server.push.return_value = ["bundle.jar", "uiautomator-stub.jar"] - server.ping = MagicMock() - server.ping.return_value = "pong" - server.adb = MagicMock() - server.start() - server.adb.cmd.assert_valled_onec_with('shell', 'uiautomator', 'runtest', 'bundle.jar', 'uiautomator-stub.jar', '-c', 'com.github.uiautomatorstub.Stub') - - def test_start_error(self): - server = AutomatorServer() - server.push = MagicMock() - server.push.return_value = ["bundle.jar", "uiautomator-stub.jar"] - server.ping = MagicMock() - server.ping.return_value = None - server.adb = MagicMock() - with patch("time.sleep"): - with self.assertRaises(IOError): - server.start() - - def test_auto_start(self): - try: - import urllib2 - except ImportError: - import urllib.request as urllib2 - with patch("uiautomator.JsonRPCMethod") as JsonRPCMethod: - returns = [urllib2.URLError("error"), "ok"] - def side_effect(): - result = returns.pop(0) - if isinstance(result, Exception): - raise result - return result - JsonRPCMethod.return_value.side_effect = side_effect - server = AutomatorServer() - server.start = MagicMock() - server.stop = MagicMock() - self.assertEqual("ok", server.jsonrpc.any_method()) - server.start.assert_called_once_with(timeout=30) - with patch("uiautomator.JsonRPCMethod") as JsonRPCMethod: - returns = [JsonRPCError(-32000-1, "error msg"), "ok"] - def side_effect(): - result = returns.pop(0) - if isinstance(result, Exception): - raise result - return result - JsonRPCMethod.return_value.side_effect = side_effect - server = AutomatorServer() - server.start = MagicMock() - server.stop = MagicMock() - self.assertEqual("ok", server.jsonrpc.any_method()) - server.start.assert_called_once_with() - with patch("uiautomator.JsonRPCMethod") as JsonRPCMethod: - JsonRPCMethod.return_value.side_effect = JsonRPCError(-32000-2, "error msg") - server = AutomatorServer() - server.start = MagicMock() - server.stop = MagicMock() - with self.assertRaises(JsonRPCError): - server.jsonrpc.any_method() - - def test_start_ping(self): - with patch("uiautomator.JsonRPCClient") as JsonRPCClient: - JsonRPCClient.return_value.ping.return_value = "pong" - server = AutomatorServer() - server.adb = MagicMock() - server.adb.forward.return_value = 0 - self.assertEqual(server.ping(), "pong") - - def test_start_ping_none(self): - with patch("uiautomator.JsonRPCClient") as JsonRPCClient: - JsonRPCClient.return_value.ping.side_effect = Exception("error") - server = AutomatorServer() - self.assertEqual(server.ping(), None) - - -class TestAutomatorServer_Stop(unittest.TestCase): - - def setUp(self): - try: - import urllib2 - self.urlopen_patch = patch('urllib2.urlopen') - except: - self.urlopen_patch = patch('urllib.request.urlopen') - finally: - self.urlopen = self.urlopen_patch.start() - - def tearDown(self): - self.urlopen_patch.stop() - - def test_screenshot(self): - server = AutomatorServer() - server.sdk_version = MagicMock() - server.sdk_version.return_value = 17 - self.assertEqual(server.screenshot(), None) - - server.sdk_version.return_value = 18 - self.urlopen.return_value.read = MagicMock() - self.urlopen.return_value.read.return_value = b"123456" - self.assertEqual(server.screenshot(), b"123456") - self.assertEqual(server.screenshot("/tmp/test.txt"), "/tmp/test.txt") - - def test_push(self): - jars = ["bundle.jar", "uiautomator-stub.jar"] - server = AutomatorServer() - server.adb = MagicMock() - self.assertEqual(set(server.push()), set(jars)) - for args in server.adb.cmd.call_args_list: - self.assertEqual(args[0][0], "push") - self.assertEqual(args[0][2], "/data/local/tmp/") - - def test_stop_started_server(self): - server = AutomatorServer() - server.adb = MagicMock() - server.uiautomator_process = process = MagicMock() - process.poll.return_value = None - server.stop() - process.wait.assert_called_once_with() - - server.uiautomator_process = process = MagicMock() - process.poll.return_value = None - self.urlopen.side_effect = IOError("error") - server.stop() - process.kill.assert_called_once_with() - - def test_stop(self): - results = [ - b"USER PID PPID VSIZE RSS WCHAN PC NAME\n\rsystem 372 126 635596 104808 ffffffff 00000000 S uiautomator", - b"USER PID PPID VSIZE RSS WCHAN PC NAME\r\nsystem 372 126 635596 104808 ffffffff 00000000 S uiautomator", - b"USER PID PPID VSIZE RSS WCHAN PC NAME\nsystem 372 126 635596 104808 ffffffff 00000000 S uiautomator", - b"USER PID PPID VSIZE RSS WCHAN PC NAME\rsystem 372 126 635596 104808 ffffffff 00000000 S uiautomator" - ] - for r in results: - server = AutomatorServer() - server.adb = MagicMock() - server.adb.cmd.return_value.communicate.return_value = (r, "") - server.stop() - self.assertEqual(server.adb.cmd.call_args_list, - [call("shell", "ps", "-C", "uiautomator"), call("shell", "kill", "-9", "372")]) - - -class TestJsonRPCError(unittest.TestCase): - - def testJsonRPCError(self): - e = JsonRPCError(200, "error") - self.assertEqual(200, e.code) - self.assertTrue(len(str(e)) > 0) - e = JsonRPCError("200", "error") - self.assertEqual(200, e.code) diff --git a/uiautomator/__init__.py b/uiautomator/__init__.py index 677b0b7..1748504 100644 --- a/uiautomator/__init__.py +++ b/uiautomator/__init__.py @@ -5,22 +5,21 @@ import sys import os +import traceback import subprocess import time +import base64 import itertools import json import hashlib -import socket -import re +import socket,threading +import re,tempfile import collections import xml.dom.minidom - -DEVICE_PORT = int(os.environ.get('UIAUTOMATOR_DEVICE_PORT', '9008')) -LOCAL_PORT = int(os.environ.get('UIAUTOMATOR_LOCAL_PORT', '9008')) - -if 'localhost' not in os.environ.get('no_proxy', ''): - os.environ['no_proxy'] = "localhost,%s" % os.environ.get('no_proxy', '') - +from functools import wraps +from imgUtil import ImageUtil +from comparison import isMatch, getMatchedCenterOffset +from chromdriver import ChromeDriver try: import urllib2 except ImportError: @@ -34,11 +33,31 @@ import urllib3 except: # to fix python setup error on Windows. pass +# Set default logging handler to avoid "No handler found" warnings. +import logging __author__ = "Xiaocong He" __all__ = ["device", "Device", "rect", "point", "Selector", "JsonRPCError"] +logger = logging.getLogger('auto_runner') +if len(logger.handlers) == 0: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s')) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + +DEVICE_PORT = int(os.environ.get('UIAUTOMATOR_DEVICE_PORT', '9008')) +LOCAL_PORT = int(os.environ.get('UIAUTOMATOR_LOCAL_PORT', '9008')) + +if 'localhost' not in os.environ.get('no_proxy', ''): + os.environ['no_proxy'] = "localhost,%s" % os.environ.get('no_proxy', '') + +u2_version_code=21 + +reload(sys) +sys.setdefaultencoding('utf-8') + def U(x): if sys.version_info.major == 2: return x.decode('utf-8') if type(x) is str else x @@ -77,6 +96,27 @@ def __call__(self, *args, **kwargs): return self.func(*new_args, **kwargs) return Wrapper +def stopUiautomator(url): + port = url.split(":")[2].split("/")[0] + serial = None + try: + lines = systemCmd(['adb','forward','--list']).communicate()[0].decode("utf-8").strip().splitlines() + for s, lp, rp in [line.strip().split() for line in lines]: + if lp == 'tcp:%s'%port and rp=='tcp:9008': + serial = s + break + except: + pass + if serial: + os.system("adb -s %s shell am force-stop com.github.uiautomator"%serial) + + +def systemCmd(cmd_line): + '''exec system cmd, paramas list''' + if os.name != "nt": + cmd_line = [" ".join(cmd_line)] + return subprocess.Popen(cmd_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + class JsonRPCError(Exception): @@ -86,8 +126,7 @@ def __init__(self, code, message): def __str__(self): return "JsonRPC Error code: %d, Message: %s" % (self.code, self.message) - - + class JsonRPCMethod(object): if os.name == 'nt': @@ -98,7 +137,7 @@ class JsonRPCMethod(object): def __init__(self, url, method, timeout=30): self.url, self.method, self.timeout = url, method, timeout - + def __call__(self, *args, **kwargs): if args and kwargs: raise SyntaxError("Could not accept both *args and **kwargs as JSONRPC parameters.") @@ -107,25 +146,38 @@ def __call__(self, *args, **kwargs): data["params"] = args elif kwargs: data["params"] = kwargs - jsonresult = {"result": ""} + params = data.get('params')[0] if data.get('params') else "" + if params: + try: + params = json.dumps({'parmas':params},ensure_ascii=False) + except: + params = str(params) + logger.info('exec u2 cmd: %s %s'%(self.method, params)) + result = None if os.name == "nt": res = self.pool.urlopen("POST", - self.url, - headers={"Content-Type": "application/json"}, - body=json.dumps(data).encode("utf-8"), - timeout=self.timeout) - jsonresult = json.loads(res.data.decode("utf-8")) + self.url, + headers={"Content-Type": "application/json"}, + body=json.dumps(data).encode("utf-8"), + timeout=self.timeout) + content_type = res.headers['Content-Type'] + result = res.data else: - result = None + res = None try: req = urllib2.Request(self.url, - json.dumps(data).encode("utf-8"), - {"Content-type": "application/json"}) - result = urllib2.urlopen(req, timeout=self.timeout) - jsonresult = json.loads(result.read().decode("utf-8")) + json.dumps(data).encode("utf-8"), + {"Content-type": "application/json"}) + res = urllib2.urlopen(req, timeout=self.timeout) + content_type = res.info().getheader('Content-Type') + result = res.read() finally: - if result is not None: - result.close() + if res is not None: + res.close() + if self.method == "screenshot": + if content_type == "image/png": + return result + jsonresult = json.loads(result.decode("utf-8")) if "error" in jsonresult and jsonresult["error"]: raise JsonRPCError( jsonresult["error"]["code"], @@ -136,6 +188,7 @@ def __call__(self, *args, **kwargs): def id(self): m = hashlib.md5() m.update(("%s at %f" % (self.method, time.time())).encode("utf-8")) +# m.update("i am uiautomator".encode("utf-8")) return m.hexdigest() @@ -220,7 +273,7 @@ def child(self, **kwargs): def sibling(self, **kwargs): self[self.__childOrSibling].append("sibling") self[self.__childOrSiblingSelector].append(Selector(**kwargs)) - return self + return self child_selector, from_parent = child, sibling @@ -289,16 +342,17 @@ def raw_cmd(self, *args): cmd_line = [self.adb()] + self.adbHostPortOptions + list(args) if os.name != "nt": cmd_line = [" ".join(cmd_line)] - return subprocess.Popen(cmd_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + logger.info('exec adb cmd: %s'%" ".join(cmd_line)) + return subprocess.Popen(cmd_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) def device_serial(self): if not self.default_serial: devices = self.devices() if devices: - if len(devices) is 1: + if len(devices): self.default_serial = list(devices.keys())[0] else: - raise EnvironmentError("Multiple devices attached but default android serial not set.") + raise EnvironmentError("not device found.") else: raise EnvironmentError("Device not attached.") return self.default_serial @@ -314,7 +368,11 @@ def devices(self): def forward(self, local_port, device_port): '''adb port forward. return 0 if success, else non-zero.''' - return self.cmd("forward", "tcp:%d" % local_port, "tcp:%d" % device_port).wait() + return self.cmd("forward", "tcp:%s" % local_port, "tcp:%s" % device_port).wait() + + def forward_localabstract(self,local_port, localabstract): + '''adb port forward. return 0 if success, else non-zero.''' + return self.cmd("forward", "tcp:%s" % local_port, localabstract).wait() def forward_list(self): '''adb forward --list''' @@ -323,12 +381,126 @@ def forward_list(self): raise EnvironmentError("Low adb version.") lines = self.raw_cmd("forward", "--list").communicate()[0].decode("utf-8").strip().splitlines() return [line.strip().split() for line in lines] + + def remove_forward_port(self,port): + self.cmd("forward", "--remove", "tcp:%s" % port).wait() def version(self): '''adb version''' match = re.search(r"(\d+)\.(\d+)\.(\d+)", self.raw_cmd("version").communicate()[0].decode("utf-8")) return [match.group(i) for i in range(4)] - + + def getVersionCode(self, packageName): + '''adb dumpsys package myPackageName''' + versionCode = 0 + try: + out = self.cmd('shell','dumpsys', 'package', packageName).communicate()[0] + for line in out.strip().splitlines(): + tmp = line.strip() + if tmp.startswith("versionCode="): + versionCode = int(tmp.split(" ")[0].split("=")[1]) + break + except: + pass + return versionCode + + def checkPackageStatus(self, packageName='com.github.uiautomator'): # 包已卸载,需要确定文件实体 + try: + out = self.cmd('shell','dumpsys', 'package', packageName).communicate()[0] + for line in out.strip().splitlines(): + tmp = line.strip() + if tmp.find('Unable to find package: %s'%packageName) == -1: + return True + except: + pass + return False + + + def current_app(self): + '''return packagename activity''' + out = self.cmd('shell','dumpsys', 'window', 'w').communicate()[0] + flag = False + packageName = None + for line in out.strip().splitlines(): + if 'mCurrentFocus' in line: + current_info = line[:-1].split(" ")[4] + if "/" in current_info: + return (current_info.split('/')[0],current_info.split('/')[1]) + else: + if current_info.split('/')[0] == "StatusBar": + return (current_info.split('/')[0],None) + else: + flag = True + packageName = current_info.split('/')[0] + if flag and "mFocusedApp" in line: + return (packageName, line[line.find(packageName)+len(packageName)+1:].split(" ")[0]) + + def start_app(self, package_activity): + '''start app''' + out = self.cmd('shell','am', 'start', package_activity).communicate()[0] + result = [] + for line in out.strip().splitlines(): + tmp = line.strip() + result.append(tmp) + return result + + def shell(self, *args, **kwargs): + '''adb shell command''' + self.cmd(*['shell'] + list(args)).wait() + + def force_stop(self, packageName): + '''force stop package''' + self.shell('am','force-stop', packageName) + + def install(self, params, apkpath): + out = self.cmd('install', params, apkpath).communicate()[0].strip().splitlines() + return out + + def stop_third_app(self, ignore_filter=["com.tencent.mm"]): + '''force stop third app''' + ignore_filter_target = ['com.github.uiautomator','com.github.uiautomator.test'] + ignore_filter_target += ignore_filter + for line in self.cmd('shell','pm','list','package','-3').communicate()[0].strip().splitlines(): + if 'package:' in line: + package_name = line[len('package:'):] + if not package_name in ignore_filter_target: + self.force_stop(package_name) + @property + def airplane_mode(self): + my_self = self + class _AirplaneMode(object): + def on(self): + my_self.shell('settings put global airplane_mode_on 1') + time.sleep(2) + my_self.shell('am broadcast -a android.intent.action.AIRPLANE_MODE --ez state true') + def off(self): + my_self.shell('settings put global airplane_mode_on 0') + time.sleep(2) + my_self.shell('am broadcast -a android.intent.action.AIRPLANE_MODE --ez state false') + return _AirplaneMode() + + + @property + def ime(self): # 输入法相关操作 + myself = self + class IME(object): + def availables(self): + return filter(lambda x:True if x else False, myself.cmd('shell','ime','list','-s').communicate()[0].strip().splitlines()) + + def default(self): + return myself.cmd('shell','settings','get','secure','default_input_method').communicate()[0].strip().splitlines() + + def enable(self, imeId): + myself.shell('ime', 'enable', imeId) + + def disable(self, imeId): + myself.shell('ime', 'disable', imeId) + + def set(self, imeId): + myself.shell('ime', 'set', imeId) + + return IME() + _init_local_port = LOCAL_PORT - 1 @@ -384,13 +556,17 @@ def __init__(self, serial=None, local_port=None, device_port=None, adb_server_ho else: try: # first we will try to use the local port already adb forwarded for s, lp, rp in self.adb.forward_list(): - if s == self.adb.device_serial() and rp == 'tcp:%d' % self.device_port: + if s == self.adb.device_serial() and rp == 'tcp:%s'%self.device_port: self.local_port = int(lp[4:]) break else: self.local_port = next_local_port(adb_server_host) except: self.local_port = next_local_port(adb_server_host) + self.wait_time = 0 + + def set_think_time(self, wait_time): + self.wait_time = wait_time def push(self): base_dir = os.path.dirname(__file__) @@ -402,10 +578,16 @@ def push(self): def install(self): base_dir = os.path.dirname(__file__) for apk in self.__apk_files: - self.adb.cmd("install", "-r -t", os.path.join(base_dir, apk)).wait() + self.adb.cmd("install", "-r", "-t", os.path.join(base_dir, apk)).wait() + + def uninstall(self): + self.adb.cmd('uninstall','com.github.uiautomator').wait() + self.adb.cmd('uninstall', 'com.github.uiautomator.test').wait() @property def jsonrpc(self): + if self.wait_time != 0: + time.sleep(self.wait_time) return self.jsonrpc_wrap(timeout=int(os.environ.get("jsonrpc_timeout", 90))) def jsonrpc_wrap(self, timeout): @@ -457,26 +639,39 @@ def sdk_version(self): except: pass return self.__sdk - - def start(self, timeout=5): - if self.sdk_version() < 18: + + def start(self,timeout=5): + '''add retry 2 times''' + try: + time.sleep(4) # startup delay 4 seconds + self._start(timeout) + except: + self.uninstall() + self.stop() + time.sleep(4) + self._start(timeout) + + def _start(self, timeout=5): + sdk = self.sdk_version() + if sdk != 0 and sdk < 18: files = self.push() cmd = list(itertools.chain( ["shell", "uiautomator", "runtest"], files, - ["-c", "com.github.uiautomatorstub.Stub"] + ["-c", "com.github.uiautomatorstub.Stub"], + ["--nohup"] )) else: - self.install() - cmd = ["shell", "am", "instrument", "-w", - "com.github.uiautomator.test/android.support.test.runner.AndroidJUnitRunner"] - + if self.checkVersion(): + self.install() + cmd = ["shell", "am", "instrument", "-r", "-w", + "com.github.uiautomator.test/android.support.test.runner.AndroidJUnitRunner"] self.uiautomator_process = self.adb.cmd(*cmd) self.adb.forward(self.local_port, self.device_port) - + time.sleep(4) while not self.alive and timeout > 0: - time.sleep(0.1) - timeout -= 0.1 + time.sleep(0.5) + timeout -= 0.5 if not self.alive: raise IOError("RPC server not started!") @@ -484,7 +679,13 @@ def ping(self): try: return self.__jsonrpc().ping() except: - return None + pass + + def checkVersion(self): + ''' check uiautomator apk version ''' + version_code = self.adb.getVersionCode('com.github.uiautomator') + package_status = self.adb.checkPackageStatus('com.github.uiautomator') + return True if (u2_version_code > version_code) or not package_status else False @property def alive(self): @@ -513,6 +714,11 @@ def stop(self): self.adb.cmd("shell", "kill", "-9", line.split()[index]).wait() except: pass + try: + self.adb.cmd("shell", "am", "force-stop", 'com.github.uiautomator').wait() + except: + pass + @property def stop_uri(self): @@ -525,11 +731,12 @@ def rpc_uri(self): @property def screenshot_uri(self): return "http://%s:%d/screenshot/0" % (self.adb.adb_server_host, self.local_port) + def screenshot(self, filename=None, scale=1.0, quality=100): if self.sdk_version() >= 18: try: - req = urllib2.Request("%s?scale=%f&quality=%f" % (self.screenshot_uri, scale, quality)) + req = urllib2.Request("%s?scale=%s&quality=%s" % (self.screenshot_uri, scale, quality)) result = urllib2.urlopen(req, timeout=30) if filename: with open(filename, 'wb') as f: @@ -564,7 +771,17 @@ def __init__(self, serial=None, local_port=None, adb_server_host=None, adb_serve adb_server_host=adb_server_host, adb_server_port=adb_server_port ) - + self.adb = self.server.adb + self.webdriver = None + + def set_think_time(self,wait_time): + '''uiautomator steps wait time''' + self.server.set_think_time(wait_time) + + def set_debug(self, mode): + '''uiautomator debug mode pring log''' + logger.setLevel(mode) + def __call__(self, **kwargs): return AutomatorDeviceObject(self, Selector(**kwargs)) @@ -587,9 +804,9 @@ def click(self, x, y): '''click at arbitrary coordinates.''' return self.server.jsonrpc.click(x, y) - def long_click(self, x, y): + def long_click(self, x, y, duration=0): '''long click at arbitrary coordinates.''' - return self.swipe(x, y, x + 1, y + 1) + return self.server.jsonrpc.long_click(x, y, duration) def swipe(self, sx, sy, ex, ey, steps=100): return self.server.jsonrpc.swipe(sx, sy, ex, ey, steps) @@ -616,20 +833,50 @@ def dump(self, filename=None, compressed=True, pretty=True): content = U(xml_text.toprettyxml(indent=' ')) return content - def screenshot(self, filename, scale=1.0, quality=100): + def screenshot(self, filename=None, scale=1.0, quality=100): '''take screenshot.''' result = self.server.screenshot(filename, scale, quality) if result: return result - - device_file = self.server.jsonrpc.takeScreenshot("screenshot.png", - scale, quality) - if not device_file: - return None - p = self.server.adb.cmd("pull", device_file, filename) - p.wait() - self.server.adb.cmd("shell", "rm", device_file).wait() - return filename if p.returncode is 0 else None + if filename is None: + filename = tempfile.mktemp() + png = "/data/local/tmp/screen_shot.png" + self.server.adb.cmd("shell", "screencap", "-p", png).wait() + self.server.adb.cmd("pull", png, filename).wait() + self.server.adb.cmd("shell", "rm", png).wait() + if os.path.exists(filename): + with open(filename,'rb') as f: + return f.read() + + def screenshot_custom(self, filename='task_image_name.jpg', fomart='jpeg', quality=100): + ''' + take screenshot custom + fomart: 'jpeg, png, webp' + quality: compress ratio + ''' + result = self.server.jsonrpc.screenshot_custom(filename, fomart, quality) + return result + + @property + def takeScreenshot(self): + my_self = self + class _ScreenShot(object): + def device(self, filename=None, scale=1.0, quality=100): + return my_self.screenshot(filename, scale, quality) + + def custom(self, filename='task_image_name.jpg', fomart='jpeg', quality=100): + ''' + take screenshot custom + fomart: 'jpeg, png, webp' + quality: compress ratio + ''' + return my_self.server.jsonrpc.screenshot_custom(filename, fomart, quality) + + def crop(self, left, top, width, height, customWidth, customHeight, filename="screenshot.png", imageFormat="png"): + return my_self.server.jsonrpc.takeScreenshot(left, top, width, height, customWidth, customHeight, filename, imageFormat) + + return _ScreenShot() + def freeze_rotation(self, freeze=True): '''freeze or unfreeze the device rotation in current status.''' @@ -665,6 +912,10 @@ def last_traversed_text(self): def clear_traversed_text(self): '''clear the last traversed text.''' self.server.jsonrpc.clearLastTraversedText() + + def set_text(self, content): + '''shell input set test''' + self.adb.shell('input text %s'%content) @property def open(self): @@ -678,8 +929,10 @@ def open(self): def _open(action): if action == "notification": return self.server.jsonrpc.openNotification() - else: + elif action == "quick_settings": return self.server.jsonrpc.openQuickSettings() + elif action == 'recent_apps': + return self.server.jsonrpc.openRecentApps() return _open @property @@ -780,12 +1033,25 @@ def press(self): "menu", "search", "enter", "delete", "del", "recent", "volume_up", "volume_down", "volume_mute", "camera", "power"] ) - def _press(key, meta=None): + def _press(key, meta=None, num=1): if isinstance(key, int): return self.server.jsonrpc.pressKeyCode(key, meta) if meta else self.server.jsonrpc.pressKeyCode(key) else: + if key == "back": + return self.back(num) return self.server.jsonrpc.pressKey(str(key)) return _press + + def back(self, num=1): + '''force back''' + def _back(): + self.adb.shell("input keyevent 4") + while num>0: + t = threading.Thread(target=_back) + t.setDaemon(True) + t.start() + time.sleep(0.2) + num -= 1 def wakeup(self): '''turn on screen in case of screen off.''' @@ -794,6 +1060,30 @@ def wakeup(self): def sleep(self): '''turn off screen in case of screen on.''' self.server.jsonrpc.sleep() + + def start_activity(self, packageActivity): + '''start activity''' + self.adb.start_app(packageActivity) + + def wait_time(self, wait_time): + '''wait time relate python sleep''' + time.sleep(wait_time) + + @property + def clipboard(self): + devive_self = self + class _Clipboard(object): + def set(self, content, content_type='text'): + return devive_self.server.jsonrpc.setClipboard(content_type, content) + + def get(self, content_type="text"): + return devive_self.server.jsonrpc.getClipboard(content_type) + + def clear(self): + return devive_self.server.jsonrpc.clearClipboard() + + return _Clipboard() + @property def screen(self): @@ -861,6 +1151,301 @@ def _wait(action, timeout=1000, package_name=None): def exists(self, **kwargs): '''Check if the specified ui object by kwargs exists.''' return self(**kwargs).exists + + def stop_third_app(self,ignore_filter=["com.tencent.mm"]): + '''停止第三方app''' + self.adb.stop_third_app(ignore_filter) + + @property + def configurator(self): + ''' + :Args: + actionAcknowledgmentTimeout, default:3000ms + keyInjectionDelay, default:0ms + scrollAcknowledgmentTimeout, default: 200ms + waitForIdleTimeout default: 10000ms + waitForSelectorTimeout default: 10000ms + :Usage: + d.configurator.set() + d.configurator.info() + d.configurator.restore() + ''' + device.self = self + class _ConfiguratorInfo(object): + def info(self): + return device.self.server.jsonrpc.getConfigurator() + def set(self, **kwargs): + config_info = {} + for k in kwargs: + config_info[k] = kwargs[k] + return device.self.server.jsonrpc.setConfigurator(config_info) + def restore(self): + return device.self.server.jsonrpc.setConfigurator({'flag':True}) + return _ConfiguratorInfo() + + @property + def toast(self): + device_self = self + class _Toast(object): + def on(self): + return device_self.server.jsonrpc.toast('on') + def off(self): + return device_self.server.jsonrpc.toast('off') + return _Toast() + + @property + def img_tz(self): + device_self = self + class _Img(object): + def exists(self, query, origin=None, interval=2, timeout=4, algorithm='sift', threshold=0.75, colormode=0): + if origin: + try: + pos = ImageUtil.find_image_positon(query, origin, algorithm, threshold,colormode) + if pos: + return True + except: + pass + return False + begin = time.time() + isExists = False + src_img_path = tempfile.mktemp() + device_self.screenshot(src_img_path) + while (time.time() - begin < timeout): + time.sleep(interval) + device_self.screenshot(src_img_path) + try: + pos = ImageUtil.find_image_positon(query, src_img_path, algorithm, threshold, colormode) + if pos: + isExists = True + except: + pass + if not isExists: + time.sleep(interval) + del_file(src_img_path) + continue + del_file(src_img_path) + return isExists + + def click(self, query, origin=None, algorithm='sift', threshold=0.75, colormode=0): + pos = self.get_location(query, origin, algorithm, threshold, colormode) + if pos: + device_self.click(pos[0],pos[1]) + else: + raise AssertionError("not find sub img on big img") + + def get_location(self, query, origin=None, algorithm='sift', threshold=0.75, colormode=0): + src_img_path = origin + if src_img_path is None: + src_img_path = tempfile.mktemp() + device_self.screenshot(src_img_path) + if not os.path.exists(src_img_path): + raise IOError('path not origin img') + try: + pos = ImageUtil.find_image_positon(query, src_img_path, algorithm, threshold, colormode) + return pos + except: + raise + finally: + if origin is None: + del_file(src_img_path) + + return _Img() + + @property + def img(self): + device_self = self + class _Img(object): + def exists(self, query, origin=None, interval=2, timeout=4, threshold=0.99,colormode=0): + threshold = 1 - threshold + if origin: + return isMatch(query, origin, threshold,colormode) + begin = time.time() + isExists = False + tmp = tempfile.mktemp() + while (time.time() - begin < timeout): + device_self.screenshot(tmp) + isExists = isMatch(query, tmp, threshold,colormode) + if not isExists: + time.sleep(interval) + del_file(tmp) + continue + del_file(tmp) + return isExists + + def click(self, query, origin=None, threshold=0.99, rotation=0,colormode=0): + threshold = 1 - threshold + pos = self.get_location(query, origin, threshold, rotation, colormode) + if pos: + device_self.click(pos[0], pos[1]) + else: + raise AssertionError("not find sub img on big img") + + def get_location(self, query, origin=None, threshold=0.99, rotation=0, colormode=0): + threshold = 1 - threshold + src_img_path = origin + if src_img_path is None: + src_img_path = tempfile.mktemp() + device_self.screenshot(src_img_path) + if not os.path.exists(src_img_path): + raise IOError('path not origin img') + try: + pos = getMatchedCenterOffset(query, src_img_path, threshold, rotation, colormode) + return pos + except: + raise + finally: + if origin is None: + del_file(src_img_path) + return _Img() + + @property + def webview(self): + if self.webdriver: + return self.webdriver + self.webdriver = ChromeDriver(self) + return self.webdriver + + def quit(self): + self.server.stop() + try: + if self.webdriver: + self.webdriver.quit() + except: + pass + + + def touchAction(self): + device_self = self + class _TouchAction(object): + def __init__(self): + self._actions = [] + self._x = 0 + self._y = 0 + def down(self,x,y): + self._add_action("touchDown", self._get_optx({'x':x,'y':y})) + return self + def up(self): + self._add_action("touchUp", {'x':self._x,'y':self._y}) + return self + def move_to(self,x,y): + self._add_action("moveTo", self._get_optx({'x':x,'y':y})) + return self + def wait(self,ms): + self._add_action("wait", {'s':ms}) + return self + def _add_action(self, action, options): + gesture = { + 'action': action, + 'options': options, + } + self._actions.append(gesture) + def _get_optx(self, opt): + self._x = opt['x'] + self._y = opt['y'] + return opt + + def perform(self): + try: + for action in self._actions: + act = action.get('action') + opt = action.get('options') + if act == "touchDown": + device_self.server.jsonrpc.touchDown(opt['x'],opt['y']) + if act == "moveTo": + device_self.server.jsonrpc.moveTo(opt['x'],opt['y']) + if act == "touchUp": + device_self.server.jsonrpc.touchUp(opt['x'],opt['y']) + if act == "wait": + ms = opt.get("s") + time.sleep(ms) + finally: + self._actions = [] + + return _TouchAction() + + def getPhoneInfo(self, simType=0): + """获取手机卡相关信息,参数为0,1,主卡,副卡""" + return self.server.jsonrpc.getPhoneInfo(simType) + + def default_sms(self, sms_app=None): + """获取或设置默认短信应该""" + return self.server.jsonrpc.defaultSms(sms_app) + + def getSmsInfo(self, num=1): + """获取短信相关内容""" + return self.server.jsonrpc.getSms(num) + + def writeSms(self, address, body, readStatus=0, type=1): + """写入短信 + :param address 1860299678 + :param body test + :param readStatus default 0 未读, 1 已读 + :param type default 1 收 2 发 + """ + return self.server.jsonrpc.writeSms(address, body, readStatus, type) + + def jumpAppDetail(self,packageName=None): + """跳应用详情""" + self.server.jsonrpc.jumpApp(packageName) + + def checkPermission(self, permission): + """检查权限""" + return self.server.jsonrpc.checkPermission(permission) + + def open_brower(self,url): + self.adb.shell('am','start', '-a', 'android.intent.action.VIEW', '-d', url) + + def remove_app(self, del_list=[]): + """移除指定app""" + if del_list: + for package_name in del_list: + try: + self.adb.shell('pm', 'uninstall', package_name) + except: + pass + + def double_click(self, x, y, ms=0.2): + """adb 命令行发行双击""" + def tt(x,y): + self.adb.shell("input tap {0} {1}".format(x,y)) + threading.Thread(target=tt, args=(x,y)).start() + time.sleep(ms) # 默认间隔200ms + threading.Thread(target=tt, args=(x,y)).start() + + @property + def request(self): + device_self = self + class _Request(object): + def get(self, url, data=None, files=None, headers=None): + result = device_self.server.jsonrpc.httpRequest("get", url, headers, data, files) + if result: + return json.loads(result) + + def post(self, url, data=None, files=None, headers=None): + result = device_self.server.jsonrpc.httpRequest("post", url, headers, data, files) + if result: + return json.loads(result) + + def send_file(self, file_path, filename=None): + if os.path.exists(file_path): + with open(file_path, "rb") as f: + content = base64.b64encode(f.read()) + return device_self.server.jsonrpc.sendFile(filename, content) + def get_cookie(self, url): + return device_self.server.jsonrpc.getCookie(url) + + def get_html(self, url): + return device_self.server.jsonrpc.getHtml(url) + return _Request() + + + def get_app_info(self, appname): + """通过appname获取app相关信息""" + return self.server.jsonrpc.getAppinfo(appname) + +def del_file(path): + if os.path.exists(path): + os.remove(path) Device = AutomatorDevice @@ -896,6 +1481,15 @@ def __getattr__(self, attr): def info(self): '''ui object info.''' return self.jsonrpc.objInfo(self.selector) + + @property + def location(self): + '''ui object location.''' + info = self.info + bounds = info.get("visibleBounds") or info.get("bounds") + x = (bounds["left"] + bounds["right"]) / 2 + y = (bounds["top"] + bounds["bottom"]) / 2 + return x,y def set_text(self, text): '''set the text field.''' @@ -927,6 +1521,11 @@ def _click(action=None, timeout=3000): else: return self.jsonrpc.clickAndWaitForNewWindow(self.selector, timeout) return _click + + def long_press(self, duration=0): + '''long press obj''' + return self.jsonrpc.longClick(self.selector, duration) + @property def long_click(self): @@ -937,14 +1536,14 @@ def long_click(self): d(text="Image").long_click.topleft() # long click on the topleft of the ui object d(text="Image").long_click.bottomright() # long click on the topleft of the ui object ''' - @param_to_property(corner=["tl", "topleft", "br", "bottomright"]) - def _long_click(corner=None): + @param_to_property(corner=["tl", "topleft", "br", "bottomright", "wait"]) + def _long_click(corner=None, duration=0): info = self.info if info["longClickable"]: - if corner: + if corner in ["tl", "topleft", "br", "bottomright"]: return self.jsonrpc.longClick(self.selector, corner) else: - return self.jsonrpc.longClick(self.selector) + return self.jsonrpc.longClick(self.selector, duration) else: bounds = info.get("visibleBounds") or info.get("bounds") if corner in ["tl", "topleft"]: @@ -956,7 +1555,7 @@ def _long_click(corner=None): else: x = (bounds["left"] + bounds["right"]) / 2 y = (bounds["top"] + bounds["bottom"]) / 2 - return self.device.long_click(x, y) + return self.device.long_click(x, y, duration) return _long_click @property @@ -1058,7 +1657,15 @@ def _wait(action, timeout=3000): ).waitUntilGone if action == "gone" else self.device.server.jsonrpc_wrap(timeout=http_timeout).waitForExists return method(self.selector, timeout) return _wait - + + def screenshot(self,filename=None, scale=1.0, quality=100): + '''element screen shot''' + result = self.jsonrpc.screenshot(self.selector, scale, quality) + if filename is None: + filename = tempfile.mktemp() + with open(filename, 'wb') as f: + f.write(result) + return filename class AutomatorDeviceNamedUiObject(AutomatorDeviceUiObject): @@ -1275,7 +1882,7 @@ def _scroll(dimention="vert", action="forward", **kwargs): elif action == "toBeginning": return __scroll_to_beginning(vertical, **kwargs) elif action == "toEnd": - return __scroll_to_end(vertical, **kwargs) + return __scroll_to_end(vertical, **kwargs) elif action == "to": return __scroll_to(vertical, **kwargs) return _scroll diff --git a/uiautomator/chromdriver.py b/uiautomator/chromdriver.py new file mode 100644 index 0000000..a7f6944 --- /dev/null +++ b/uiautomator/chromdriver.py @@ -0,0 +1,367 @@ +#coding=utf-8 +''' +Created on 2018年10月30日 +''' + +import subprocess,time +import json,os,sys,re,socket +try: + from selenium import webdriver +except: + pass +from hashlib import md5 +from distutils.version import StrictVersion + +try: + import urllib2 +except ImportError: + import urllib.request as urllib2 + +class RestartException(Exception): + pass + +# CHROM_VERSION_MAP = { +# # Chromedriver version map +# "2.43": "69-71", +# "2.42": "68-70", +# "2.41": "67-69", +# "2.40": "66-68", +# "2.39": "66-68", +# "2.38": "65-67", +# "2.37": "64-66", +# "2.36": "63-65", +# "2.35": "62-64", +# "2.34": "61-63", +# "2.33": "60-62", +# "2.32": "59-61", +# "2.31": "58-60", +# "2.30": "58-60", +# "2.29": "56-58", +# "2.28": "55-57", +# "2.27": "54-56", +# "2.26": "53-55", +# "2.25": "53-55", +# "2.24": "52-54", +# "2.23": "51-53", +# "2.22": "49-52", +# "2.21": "46-50", +# "2.20": "43-48", +# "2.19": "43-47", +# "2.18": "43-46", +# "2.17": "42-43", +# "2.13": "42-45", +# "2.15": "40-43", +# "2.14": "39-42", +# "2.13": "38-41", +# "2.12": "36-40", +# "2.11": "36-40", +# "2.10": "33-36", +# "2.9": "31-34", +# "2.8": "30-33", +# "2.7": "30-33", +# "2.6": "29-32", +# "2.5": "29-32", +# "2.4": "29-32", +# } +CHROM_VERSION_MAP = { + # Chromedriver version: minumum Chrome version + '2.42': '68.0.3440', + '2.41': '67.0.3396', + '2.40': '66.0.3359', + '2.39': '66.0.3359', + '2.38': '65.0.3325', + '2.37': '64.0.3282', + '2.36': '63.0.3239', + '2.35': '62.0.3202', + '2.34': '61.0.3163', + '2.33': '60.0.3112', + '2.32': '59.0.3071', + '2.31': '58.0.3029', + '2.30': '58.0.3029', + '2.29': '57.0.2987', + '2.28': '55.0.2883', + '2.27': '54.0.2840', + '2.26': '53.0.2785', + '2.25': '53.0.2785', + '2.24': '52.0.2743', + '2.23': '51.0.2704', + '2.22': '49.0.2623', + '2.21': '46.0.2490', + '2.20': '43.0.2357', + '2.19': '43.0.2357', + '2.18': '43.0.2357', + '2.17': '42.0.2311', + '2.16': '42.0.2311', + '2.15': '40.0.2214', + '2.14': '39.0.2171', + '2.13': '38.0.2125', + '2.12': '36.0.1985', + '2.11': '36.0.1985', + '2.10': '33.0.1751', + '2.9': '31.0.1650', + '2.8': '30.0.1573', + '2.7': '30.0.1573', + '2.6': '29.0.1545', + '2.5': '29.0.1545', + '2.4': '29.0.1545', + '2.3': '28.0.1500', + '2.2': '27.0.1453', + '2.1': '27.0.1453', + '2.0': '27.0.1453', + } + +LOCAL_PORT = 8000 +_init_local_port = 8000 + + +def catchAttr(func): + def wrapper(self, *args, **kargs): + try: + return func(self, *args, **kargs) + except(AssertionError, AttributeError): + raise + except: + self.driver() + return object.__getattribute__(self.wd, args[0]) + return wrapper + +def is_port_listening(port, adbHost=None): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = s.connect_ex((str(adbHost) if adbHost else '127.0.0.1', port)) + s.close() + return result == 0 + +def next_local_port(adbHost=None): + global _init_local_port + _init_local_port = _init_local_port + 1 if _init_local_port < 8500 else LOCAL_PORT + while is_port_listening(_init_local_port): + _init_local_port += 1 + return _init_local_port + +class ChromeDriver(object): + + def __init__(self, d): + self.chrome_version = None + self.process = None + self.port = None + self.wd = None + self.d = d + self.serial = self.d.adb.device_serial() + self.url_prefix = md5(self.serial.encode("utf-8")).hexdigest() + + def _launch_webdriver(self): + if is_port_listening(int(self.port)): + self.port = next_local_port() + cmd_line = ["chromedriver" + self.chrome_version, + '--port=%s'%self.port, '--adb-port=5037', + '--url-base=%s/hub'%self.url_prefix] + if os.name != "nt": + cmd_line = [" ".join(cmd_line)] + self.process = subprocess.Popen(cmd_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + def find_chrom_driver(self, pid): + '''in order of find chrom driver, match webview version of current phone ''' + if is_port_listening(int(self.port)): + self.port = next_local_port() + self.d.adb.forward_localabstract(self.port,"localabstract:webview_devtools_remote_" +str(pid)) + info = self.get_http("http://localhost:"+str(self.port)+"/json/version") + try: + if info is None: + raise AssertionError('not get webview version on %s'%self.serial) +# version = info['Browser'].split('/')[1][:2] # first map + version = info['Browser'].split('/')[1] # second map + map_version = self._getChromVersionMap(version) + if map_version: + return map_version + else: + raise RuntimeError('not match webview version=%s on %s'%(version, self.serial)) + finally: + self.d.adb.remove_forward_port(self.port) + + + def get_app_process(self, packageName): + '''get pid by package name''' + lines = self.d.adb.cmd('shell','ps').communicate()[0].decode("utf-8").strip().splitlines() + for line in lines: + if line.endswith(packageName): + return re.split("\s+",line)[1] + + def has_webview(self, pid): + '''consider webview exists''' + try: + lines = self.d.adb.cmd('shell','cat', '/proc/net/unix').communicate()[0].decode("utf-8").strip().splitlines() + for line in lines: + if line.endswith("webview_devtools_remote_" +str(pid)): + return True + except: + pass + +# def _getChromVersionMap(self, version): #36,37 +# '''get chrome driver refer to version first map''' +# for key,value in sorted(CHROM_VERSION_MAP.items(),key=lambda x:int(x[0].replace(".","")),reverse=True): +# t1 = version.split('.') +# t2 = value.split('-') +# if t1[0] >= t2[0] and t1[0]<= t2[1]: +# return key + + def _getChromVersionMap(self, version): # 37.0.0.1 + '''get chrome driver refer to version by second map''' + master = '.'.join(version.split('.')[:3]) + for key,value in sorted(CHROM_VERSION_MAP.items(),key=lambda x:int(x[0].replace(".",""))): + if StrictVersion(value) >= StrictVersion(master): + return key + + def start_server(self, packageName): # chromedriver version issue, about weixin small progress + if self.chrome_version is None: + pid = self.get_app_process(packageName) + if self.has_webview(pid): + self.chrome_version = self.find_chrom_driver(pid) + if self.chrome_version: + self._launch_webdriver() + else: + raise AssertionError('not exist webview on %s'%self.serial) + + + def driver(self, package=None, activity=None, attach=True, process=None): + """ + Args: + - package(string): default current running app + - attach(bool): default true, Attach to an already-running app instead of launching the app with a clear data directory + - activity(string): Name of the Activity hosting the WebView. + - process(string): Process name of the Activity hosting the WebView (as given by ps). + If not given, the process name is assumed to be the same as androidPackage. + Returns: + selenium driver + """ + app = self.d.adb.current_app() + self.getNextPort() + self.quit() + self.start_server(package or app[0]) + timeout = 5 + while timeout > 0: + if self.ping(): + break + time.sleep(0.1) + timeout -= 0.1 + else: + raise RuntimeError("chromedriver server not start, chrome version=%s, check chromedriver file exists?"%self.chrome_version) + capabilities = { + 'chromeOptions': { + 'androidDeviceSerial': self.serial, + 'androidPackage': package or app[0], + 'androidUseRunningApp': attach, + 'androidProcess': process or app[0], + 'androidActivity': activity or app[1], + } + } + self.wd = webdriver.Remote('http://localhost:%s/%s/hub'%(self.port, self.url_prefix), capabilities) + self.wd.implicitly_wait(10) + return self + + def refresh(self, **kargs): + self.driver(**kargs) + + def quit(self): + '''exit and release resource''' + self._release_port() + if self.process: + self.process.terminate() + self._clear_chrome_driver() + + def getNextPort(self): + if "win" in sys.platform: + self.port = self.__winport() + else: + self.port = self.__uinxport() + if self.port is None: + self.port = next_local_port() + + def __uinxport(self): + out = self.cmd('ps','aux', '|grep -v grep', '|grep %s'%self.url_prefix).communicate()[0] + for line in out.strip().splitlines(): + if "chromedriver" in line: + result = re.search(r"--port=(\d+)", line) + if result: + return result.group(1) + + def __winport(self): + out = self.cmd('wmic','process', 'where "commandline like \'%{}%\'"'.format(self.url_prefix),'get commandline').communicate()[0] + for line in out.strip().splitlines(): + if line.startswith('chromedriver'): + result = re.search(r"--port=(\d+)", line) + if result: + return result.group(1) + + def cmd(self, *args): + cmd_line = " ".join(list(args)) + if os.name != "nt": + cmd_line = [cmd_line] + return subprocess.Popen(cmd_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + def _release_port(self): + try: + for s, lp, rp in self.d.adb.forward_list(): + if s == self.serial and ("localabstract:webview_devtools_remote" in rp): + self.d.adb.cmd("forward","--remove", lp) + except: + pass + + def _clear_chrome_driver(self): + '''clear chrome driver process''' + if "win" in sys.platform: +# cmd = 'FOR /F "usebackq tokens=5" %a in (`netstat -nao ^|findstr /R /C:{0}`) \ +# do (FOR /F "usebackq" %b in (`TASKLIST /FI "PID eq %a" ^| findstr /I chromedriver{1}.exe`) \ +# do (IF NOT %b=="" TASKKILL /F /PID %a))'.format(self.port, self.chrome_version) + cmd = 'wmic process where "commandline like \'%{0}%\'" call terminate'.format(self.url_prefix) + os.system(cmd) + else: + os.system('pkill -9 -f "chromedriver.*--url-base=%s/hub"'%self.url_prefix) + + + def ping(self, timeout=5): + try: + result = self.get_http("http://localhost:%s/%s/hub/status"%(self.port, self.url_prefix), timeout) + if result: + if "status" in result.keys(): + return True + except: + pass + + def get_http(self,url,timeout=5): + result = None + try: + req = urllib2.Request(url) + result = urllib2.urlopen(req, timeout=timeout) + return json.loads(result.read()) + finally: + if result is not None: + result.close() + + @catchAttr + def __getattr__(self, attr): + method = object.__getattribute__(self.wd, attr) + if method: + if hasattr(method, '__call__'): + return self._catchExcept(method) + return method + else: + raise AttributeError("selenium not this attr") + + def _catchExcept(self, func): + '''deal callable exception''' + def wrapper(*args, **kargs): + try: + return func(*args, **kargs) + except: + try: + self.driver() + method = object.__getattribute__(self.wd, func.__name__) + return method(*args, **kargs) + except: + raise + return wrapper + + def __exit__(self): + self.quit() + + diff --git a/uiautomator/comparison.py b/uiautomator/comparison.py new file mode 100644 index 0000000..0c8b564 --- /dev/null +++ b/uiautomator/comparison.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +#-*- coding: utf-8 -*- + +''' +image comparision module +depend: + +options: +[sudo apt-get install python-numpy] +''' + +__version__ = "2.0.1" +__author__ = "bhb" +__all__ = ['isMatch', 'getMatchedCenterOffset'] + +import os +try: + import cv2 +except ImportError, e: + print e + +def isMatch(subPath, srcPath, threshold=0.01,colormode=1): + ''' + check wether the subPath image exists in the srcPath image. + @type subPath: string + @params subPath: the path of searched template. It must be not greater than the source image and have the same data type. + @type srcPath: string + @params srcPath: the path of the source image where the search is running. + @type threshold: float + @params threshold: the minixum value which used to increase or decrease the matching threshold. 0.01 means at most 1% difference. default is 0.01. + @rtype: boolean + @return: true if the sub image founded in the src image. return false if sub image not found or any exception. + ''' + for img in [subPath, srcPath]: assert os.path.exists(img) , 'No such image: %s' % (img) + method = cv2.cv.CV_TM_SQDIFF_NORMED #Parameter specifying the comparison method + try: + subImg = cv2.imread(subPath,colormode) #Load the sub image + srcImg = cv2.imread(srcPath,colormode) #Load the src image + result = cv2.matchTemplate(subImg, srcImg, method) #comparision + minVal = cv2.minMaxLoc(result)[0] #Get the minimum squared difference + if minVal <= threshold: #Compared with the expected similarity + return True + else: + return False + except: + return False + +def getMatchedCenterOffset(subPath, srcPath, threshold=0.01, rotation=0, colormode=1): + ''' + get the coordinate of the mathced sub image center point. + @type subPath: string + @params subPath: the path of searched template. It must be not greater than the source image and have the same data type. + @type srcPath: string + @params srcPath: the path of the source image where the search is running. + @type threshold: float + @params threshold: the minixum value which used to increase or decrease the matching threshold. 0.01 means at most 1% difference. + default is 0.01. + @type rotation: int + @params rotation: the degree of rotation. default is closewise. must be oone of 0, 90, 180, 270 + @rtype: tuple + @return: (x, y) the coordniate tuple of the matched sub image center point. return None if sub image not found or any exception. + ''' + for img in [subPath, srcPath]: assert os.path.exists(img) , "No such image: %s" % (img) + method = cv2.cv.CV_TM_SQDIFF_NORMED #Parameter specifying the comparison method + try: + subImg = cv2.imread(subPath,colormode) #Load the sub image + srcImg = cv2.imread(srcPath,colormode) #Load the src image + result = cv2.matchTemplate(subImg, srcImg, method) #comparision + minVal, maxVal, minLoc, maxLoc = cv2.minMaxLoc(result) #Get the minimum squared difference + if minVal <= threshold: #Compared with the expected similarity + minLocXPoint, minLocYPoint = minLoc + subImgRow, subImgColumn = subImg.shape[:2] + centerPointX = minLocXPoint + int(subImgColumn/2) + centerPointY = minLocYPoint + int(subImgRow/2) + #if image is binary format shape return (w, h) else return (w, h, d) + (height, width) = srcImg.shape[:2] + centerPoint = (minLocXPoint if centerPointX > width else centerPointX, minLocYPoint if centerPointY > height else centerPointY ) + return adaptRotation(coord=centerPoint, size=(height, width), rotation=rotation) + else: + return None + except Exception, e: + return None + +def adaptRotation(coord, size, rotation=0): + if rotation == 0: + return coord + elif rotation == 90: + height, width = size + x_coord, y_coord = coord + x = y_coord + y = width - x_coord + return (x, y) + elif rotation == 180: + height, width = size + x_coord, y_coord = coord + x = x_coord + y = y_coord + return (x, y) + elif rotation == 270: + height, width = size + x_coord, y_coord = coord + x = height - y_coord + y = x_coord + return (x, y) + else: + return None diff --git a/uiautomator/find_img.py b/uiautomator/find_img.py new file mode 100644 index 0000000..cc1ec24 --- /dev/null +++ b/uiautomator/find_img.py @@ -0,0 +1,115 @@ +#coding=utf-8 +try: + import cv2 +except ImportError, e: + print e + +import numpy as np + + +FLANN_INDEX_KDTREE = 1 # bug: flann enums are missing +FLANN_INDEX_LSH = 6 + +def _middlePoint(pts): + '''get center positon by given points''' + def add(p1, p2): + return (p1[0]+p2[0], p1[1]+p2[1]) + def distance(p1, p2): + import math + l2 = (p1[0]-p2[0])**2 + (p1[1]-p2[1])**2 + return math.sqrt(l2) + length = len(pts) # point set length + sumx, sumy = reduce(add, pts) + point = sumx/length, sumy/length + # filter out ok points + avg_distance = sum([distance(point, p) for p in pts])/length + good = [] + sumx, sumy = 0.0, 0.0 + for p in pts: +# print 'point: %s, distance: %.2f' %(p, distance(p, point)) + if distance(p, point) < 1.2*avg_distance: + good.append(p) + sumx += p[0] + sumy += p[1] + else: + pass +# print 'not good', p + point = map(long, (sumx/len(good), sumy/len(good))) + return point + +def init_feature(name): + '''choice algorithm''' + chunks = name.split('-') + if chunks[0] == 'sift': + detector = cv2.SIFT() + norm = cv2.NORM_L2 + elif chunks[0] == 'surf': + detector = cv2.SURF(800) + norm = cv2.NORM_L2 + elif chunks[0] == 'orb': + detector = cv2.ORB(400) + norm = cv2.NORM_HAMMING + else: + return None, None + if 'flann' in chunks: + if norm == cv2.NORM_L2: + flann_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5) + else: + flann_params= dict(algorithm = FLANN_INDEX_LSH, + table_number = 6, # 12 + key_size = 12, # 20 + multi_probe_level = 1) #2 + matcher = cv2.FlannBasedMatcher(flann_params, {}) # bug : need to pass empty dict (#1329) + else: + matcher = cv2.BFMatcher(norm) + return detector, matcher + + +def filter_matches(kp1, kp2, matches, ratio = 0.75): + '''过滤匹配点''' + mkp1, mkp2 = [], [] + for m in matches: + if len(m) == 2 and m[0].distance < m[1].distance * ratio: + m = m[0] + mkp1.append( kp1[m.queryIdx] ) + mkp2.append( kp2[m.trainIdx] ) + p1 = np.float32([kp.pt for kp in mkp1]) + p2 = np.float32([kp.pt for kp in mkp2]) + return p1, p2 + +def _find_position(img1, H = None): + '''find img position''' + h1, w1 = img1.shape[:2] + corners = np.float32([[0, 0], [w1, 0], [w1, h1], [0, h1]]) + corners = np.int32( cv2.perspectiveTransform(corners.reshape(1, -1, 2), H).reshape(-1, 2)) +# arr = corners < 0 +# if True in arr: +# return None + return _middlePoint(corners) + +def find_img_position(query, origin, algorithm='sift',radio=0.75, colormode=1): + ''' + return position of query in origin,by ratio and algorithm + :Args: + - origin - raw picture + - query - need search picture + - ratio - similarity + - algorithm - using algorithm + :Usage: + find_img_position('query.png','qq.png','sift', 0.75) + ''' + img1 = cv2.imread(query, colormode) + img2 = cv2.imread(origin, colormode) + detector, matcher = init_feature(algorithm) + kp1, desc1 = detector.detectAndCompute(img1, None) + kp2, desc2 = detector.detectAndCompute(img2, None) + raw_matches = matcher.knnMatch(desc1, trainDescriptors = desc2, k = 2) #2 + p1, p2 = filter_matches(kp1, kp2, raw_matches) + if len(p1) >= 4: + H, status = cv2.findHomography(p1, p2, cv2.RANSAC, 5.0) + if round(np.sum(status)/float(len(status)),2) < radio: + return None + return _find_position(img1, H) + return None + + diff --git a/uiautomator/imgUtil.py b/uiautomator/imgUtil.py new file mode 100644 index 0000000..37463f7 --- /dev/null +++ b/uiautomator/imgUtil.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Python img deal tools.""" +import os,base64,tempfile +from PIL import Image, ImageChops +from find_img import find_img_position + +def is_base64(strword): + try: + if len(strword) < 200: + if os.path.exists(strword): + return False + else: + base64.b64decode(strword) + return True + except: + pass + return False + +class ImageUtil: + ''' + image deal class + ''' + @staticmethod + def find_image_positon(query, origin, algorithm='sift', radio=0.75, colormode=1): + if not os.path.exists(query): + raise IOError,('No such file or directory:%s'%query) + position = find_img_position(query, origin, algorithm, radio, colormode) + return position + + @staticmethod + def compare_stream(strStream, target_file): + ''' + file stream compare + :Args: + - strStream: strStrem by driver.get_screenshot_as_png. + - target_file: need compared target file. + ''' + temp=tempfile.mktemp() + with open(temp,"wb") as f: + f.write(strStream) + simily = ImageUtil.compare(target_file, temp) + if os.path.exists(temp):os.remove(temp) + return simily + + @staticmethod + def compare(f1, f2): + """ + Calculate the similarity between f1 and f2 + return similarity 0-100 + """ + img1 = Image.open(f1) + img2 = Image.open(f2,'r') + # if image size is not equal, return 1 + if img1.size[0] != img2.size[0] or img1.size[1] != img2.size[1]: + return 0 + size = (256, 256) + img1 = img1.resize(size).convert('RGB') + img2 = img2.resize(size).convert('RGB') + # # get the difference between the two images + h = ImageChops.difference(img1, img2) + size = float(img1.size[0] * img1.size[1]) + diff = 0 + for p in list(h.getdata()): + if p != (0, 0, 0): + diff += 1 + return round((1 - (diff / size)) * 100, 2) + + @staticmethod + def crop(startx, starty, endx, endy, scrfile, destfile): + """ + cut img by the given coordinates and picture, then make target file + """ + box = (startx, starty, endx, endy) + img = Image.open(scrfile) + cut_img = img.crop(box) + if cut_img: + cut_img.save(destfile) + return True + else: + return False \ No newline at end of file diff --git a/uiautomator/libs/app-uiautomator-test.apk b/uiautomator/libs/app-uiautomator-test.apk index b9934e2..399144b 100644 Binary files a/uiautomator/libs/app-uiautomator-test.apk and b/uiautomator/libs/app-uiautomator-test.apk differ diff --git a/uiautomator/libs/app-uiautomator.apk b/uiautomator/libs/app-uiautomator.apk index 411582f..bb9f8ff 100644 Binary files a/uiautomator/libs/app-uiautomator.apk and b/uiautomator/libs/app-uiautomator.apk differ diff --git a/xiaocong.txt b/xiaocong.txt new file mode 100644 index 0000000..689a75e --- /dev/null +++ b/xiaocong.txt @@ -0,0 +1,19 @@ +[distutils] +index-servers = + pypi + pypitest + +[pypi] +repository=https://upload.pypi.org/legacy/ +username=xiaocong +password=Xcbdf1q2b + +[pypitest] +repository=https://test.pypi.org/legacy/ +username=xiaocong +password=Xcbdf1q2b + + +python2 -m twine upload dist/* --repository-url https://test.pypi.org/legacy/ + +pip install -U uiautomator -i https://test.pypi.org/simple/ \ No newline at end of file