startop: Rewrite the run app bash script to python.

Test: python run_app_with_prefetch.py  -p com.android.settings -a com.android.settings.Settings -r fadvise -i input --debug --simulate
Test: python run_app_with_prefetch.py  -p com.android.settings -a com.android.settings.Settings -r fadvise -i input
Test: pytest run_app_with_prefetch_test.py

Bug: 135286022
Change-Id: I761e5d20292febcb47b7ca9f87d6847d77250f68
This commit is contained in:
Yan Wang
2019-06-20 15:16:22 -07:00
parent 7eaacb83f4
commit fb9bdd8da3
6 changed files with 936 additions and 0 deletions

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper util libraries for calling adb command line."""
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)))))
import lib.cmd_utils as cmd_utils
def logcat_save_timestamp() -> str:
"""Gets the current logcat timestamp.
Returns:
A string of timestamp.
"""
_, output = cmd_utils.run_adb_shell_command(
"date -u +\'%Y-%m-%d %H:%M:%S.%N\'")
return output
def vm_drop_cache():
"""Free pagecache and slab object."""
cmd_utils.run_adb_shell_command('echo 3 > /proc/sys/vm/drop_caches')
def root():
"""Roots adb and successive adb commands will run under root."""
cmd_utils.run_shell_command('adb root')
def disable_selinux():
"""Disables selinux setting."""
_, output = cmd_utils.run_adb_shell_command('getenforce')
if output == 'Permissive':
return
print('Disable selinux permissions and restart framework.')
cmd_utils.run_adb_shell_command('setenforce 0')
cmd_utils.run_adb_shell_command('stop')
cmd_utils.run_adb_shell_command('start')
cmd_utils.run_shell_command('adb wait-for-device')
def pkill(procname: str):
"""Kills a process in device by its package name."""
_, pids = cmd_utils.run_shell_command('adb shell ps | grep "{}" | '
'awk \'{{print $2;}}\''.
format(procname))
for pid in pids.split('\n'):
cmd_utils.run_adb_shell_command('kill {}'.format(pid))

View File

@@ -0,0 +1,320 @@
#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runner of one test given a setting.
Run app and gather the measurement in a certain configuration.
Print the result to stdout.
See --help for more details.
Sample usage:
$> ./python run_app_with_prefetch.py -p com.android.settings -a
com.android.settings.Settings -r fadvise -i input
"""
import argparse
import os
import sys
import time
from typing import List, Tuple
from pathlib import Path
# local imports
import lib.adb_utils as adb_utils
# global variables
DIR = os.path.abspath(os.path.dirname(__file__))
IORAP_COMMON_BASH_SCRIPT = os.path.realpath(os.path.join(DIR,
'../iorap/common'))
sys.path.append(os.path.dirname(DIR))
import lib.print_utils as print_utils
import lib.cmd_utils as cmd_utils
import iorap.lib.iorapd_utils as iorapd_utils
def parse_options(argv: List[str] = None):
"""Parses command line arguments and return an argparse Namespace object."""
parser = argparse.ArgumentParser(
description='Run an Android application once and measure startup time.'
)
required_named = parser.add_argument_group('required named arguments')
required_named.add_argument('-p', '--package', action='store', dest='package',
help='package of the application', required=True)
# optional arguments
# use a group here to get the required arguments to appear 'above' the
# optional arguments in help.
optional_named = parser.add_argument_group('optional named arguments')
optional_named.add_argument('-a', '--activity', action='store',
dest='activity',
help='launch activity of the application')
optional_named.add_argument('-s', '--simulate', dest='simulate',
action='store_true',
help='simulate the process without executing '
'any shell commands')
optional_named.add_argument('-d', '--debug', dest='debug',
action='store_true',
help='Add extra debugging output')
optional_named.add_argument('-i', '--input', action='store', dest='input',
help='perfetto trace file protobuf',
default='TraceFile.pb')
optional_named.add_argument('-r', '--readahead', action='store',
dest='readahead',
help='which readahead mode to use',
default='cold',
choices=('warm', 'cold', 'mlock', 'fadvise'))
optional_named.add_argument('-t', '--timeout', dest='timeout', action='store',
type=int,
help='Timeout after this many seconds when '
'executing a single run.',
default=10)
optional_named.add_argument('--compiler-filter', dest='compiler_filter',
action='store',
help='Which compiler filter to use.',
default=None)
return parser.parse_args(argv)
def validate_options(opts: argparse.Namespace) -> bool:
"""Validates the activity and trace file if needed.
Returns:
A bool indicates whether the activity is valid and trace file exists if
necessary.
"""
needs_trace_file = (opts.readahead != 'cold' and opts.readahead != 'warm')
if needs_trace_file and (opts.input is None or
not os.path.exists(opts.input)):
print_utils.error_print('--input not specified!')
return False
# Install necessary trace file.
if needs_trace_file:
passed = iorapd_utils.iorapd_compiler_install_trace_file(
opts.package, opts.activity, opts.input)
if not cmd_utils.SIMULATE and not passed:
print_utils.error_print('Failed to install compiled TraceFile.pb for '
'"{}/{}"'.
format(opts.package, opts.activity))
return False
if opts.activity is not None:
return True
_, opts.activity = cmd_utils.run_shell_func(IORAP_COMMON_BASH_SCRIPT,
'get_activity_name',
[opts.package])
if not opts.activity:
print_utils.error_print('Activity name could not be found, '
'invalid package name?!')
return False
return True
def set_up_adb_env():
"""Sets up adb environment."""
adb_utils.root()
adb_utils.disable_selinux()
time.sleep(1)
def configure_compiler_filter(compiler_filter: str, package: str,
activity: str) -> bool:
"""Configures compiler filter (e.g. speed).
Returns:
A bool indicates whether configure of compiler filer succeeds or not.
"""
if not compiler_filter:
print_utils.debug_print('No --compiler-filter specified, don\'t'
' need to force it.')
return True
passed, current_compiler_filter_info = \
cmd_utils.run_shell_command(
'{} --package {}'.format(os.path.join(DIR, 'query_compiler_filter.py'),
package))
if passed != 0:
return passed
# TODO: call query_compiler_filter directly as a python function instead of
# these shell calls.
current_compiler_filter, current_reason, current_isa = current_compiler_filter_info.split(' ')
print_utils.debug_print('Compiler Filter={} Reason={} Isa={}'.format(
current_compiler_filter, current_reason, current_isa))
# Don't trust reasons that aren't 'unknown' because that means
# we didn't manually force the compilation filter.
# (e.g. if any automatic system-triggered compilations are not unknown).
if current_reason != 'unknown' or current_compiler_filter != compiler_filter:
passed, _ = adb_utils.run_shell_command('{}/force_compiler_filter '
'--compiler-filter "{}" '
'--package "{}"'
' --activity "{}'.
format(DIR, compiler_filter,
package, activity))
else:
adb_utils.debug_print('Queried compiler-filter matched requested '
'compiler-filter, skip forcing.')
passed = False
return passed
def parse_metrics_output(input: str,
simulate: bool = False) -> List[Tuple[str, str, str]]:
"""Parses ouput of app startup to metrics and corresponding values.
It converts 'a=b\nc=d\ne=f\n...' into '[(a,b,''),(c,d,''),(e,f,'')]'
Returns:
A list of tuples that including metric name, metric value and rest info.
"""
if simulate:
return [('TotalTime', '123')]
all_metrics = []
for line in input.split('\n'):
if not line:
continue
splits = line.split('=')
if len(splits) < 2:
print_utils.error_print('Bad line "{}"'.format(line))
continue
metric_name = splits[0]
metric_value = splits[1]
rest = splits[2] if len(splits) > 2 else ''
if rest:
print_utils.error_print('Corrupt line "{}"'.format(line))
print_utils.debug_print('metric: "{metric_name}", '
'value: "{metric_value}" '.
format(metric_name=metric_name,
metric_value=metric_value))
all_metrics.append((metric_name, metric_value))
return all_metrics
def run(readahead: str,
package: str,
activity: str,
timeout: int,
simulate: bool,
debug: bool) -> List[Tuple[str, str]]:
"""Runs app startup test.
Returns:
A list of tuples that including metric name, metric value and rest info.
"""
print_utils.debug_print('==========================================')
print_utils.debug_print('===== START =====')
print_utils.debug_print('==========================================')
if readahead != 'warm':
print_utils.debug_print('Drop caches for non-warm start.')
# Drop all caches to get cold starts.
adb_utils.vm_drop_cache()
print_utils.debug_print('Running with timeout {}'.format(timeout))
pre_launch_timestamp = adb_utils.logcat_save_timestamp()
passed, output = cmd_utils.run_shell_command('timeout {timeout} '
'"{DIR}/launch_application" '
'"{package}" '
'"{activity}" | '
'"{DIR}/parse_metrics" '
'--package {package} '
'--activity {activity} '
'--timestamp "{timestamp}"'
.format(timeout=timeout,
DIR=DIR,
package=package,
activity=activity,
timestamp=pre_launch_timestamp))
if not output and not simulate:
return None
results = parse_metrics_output(output, simulate)
passed = perform_post_launch_cleanup(
readahead, package, activity, timeout, debug, pre_launch_timestamp)
if not passed and not simulate:
print_utils.error_print('Cannot perform post launch cleanup!')
return None
adb_utils.pkill(package)
return results
def perform_post_launch_cleanup(readahead: str,
package: str,
activity: str,
timeout: int,
debug: bool,
logcat_timestamp: str) -> bool:
"""Performs cleanup at the end of each loop iteration.
Returns:
A bool indicates whether the cleanup succeeds or not.
"""
if readahead != 'warm' and readahead != 'cold':
return iorapd_utils.wait_for_iorapd_finish(package,
activity,
timeout,
debug,
logcat_timestamp)
return passed
# Don't need to do anything for warm or cold.
return True
def run_test(opts: argparse.Namespace) -> List[Tuple[str, str]]:
"""Runs one test using given options.
Returns:
A list of tuples that including metric name, metric value and anything left.
"""
print_utils.DEBUG = opts.debug
cmd_utils.SIMULATE = opts.simulate
passed = validate_options(opts)
if not passed:
return None
set_up_adb_env()
# Ensure the APK is currently compiled with whatever we passed in
# via --compiler-filter.
# No-op if this option was not passed in.
if not configure_compiler_filter(opts.compiler_filter, opts.package,
opts.activity):
return None
return run(opts.readahead, opts.package, opts.activity, opts.timeout,
opts.simulate, opts.debug)
def main():
args = parse_options()
result = run_test(args)
if result is None:
return 1
print(result)
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@@ -0,0 +1,270 @@
#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the run_app_with_prefetch_test.py script.
Install:
$> sudo apt-get install python3-pytest ## OR
$> pip install -U pytest
See also https://docs.pytest.org/en/latest/getting-started.html
Usage:
$> ./run_app_with_prefetch_test.py
$> pytest run_app_with_prefetch_test.py
$> python -m pytest run_app_with_prefetch_test.py
See also https://docs.pytest.org/en/latest/usage.html
"""
import io
import os
import shlex
import sys
# global imports
from contextlib import contextmanager
# pip imports
import pytest
# local imports
import run_app_with_prefetch as run
from mock import Mock, call, patch
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
#
# Argument Parsing Helpers
#
@contextmanager
def ignore_stdout_stderr():
"""Ignore stdout/stderr output for duration of this context."""
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = io.StringIO()
sys.stderr = io.StringIO()
try:
yield
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
@contextmanager
def argparse_bad_argument(msg):
"""Asserts that a SystemExit is raised when executing this context.
If the assertion fails, print the message 'msg'.
"""
with pytest.raises(SystemExit, message=msg):
with ignore_stdout_stderr():
yield
def assert_bad_argument(args, msg):
"""Asserts that the command line arguments in 'args' are malformed.
Prints 'msg' if the assertion fails.
"""
with argparse_bad_argument(msg):
parse_args(args)
def parse_args(args):
"""
:param args: command-line like arguments as a single string
:return: dictionary of parsed key/values
"""
# "-a b -c d" => ['-a', 'b', '-c', 'd']
return vars(run.parse_options(shlex.split(args)))
def default_dict_for_parsed_args(**kwargs):
"""Combines it with all of the "optional" parameters' default values."""
d = {
'readahead': 'cold',
'simulate': None,
'simulate': False,
'debug': False,
'input': 'TraceFile.pb',
'timeout': 10,
'compiler_filter': None,
'activity': None
}
d.update(kwargs)
return d
def default_mock_dict_for_parsed_args(include_optional=True, **kwargs):
"""Combines default dict with all optional parameters with some mock required
parameters.
"""
d = {'package': 'com.fake.package'}
if include_optional:
d.update(default_dict_for_parsed_args())
d.update(kwargs)
return d
def parse_optional_args(str):
"""
Parses an argument string which already includes all the required arguments
in default_mock_dict_for_parsed_args.
"""
req = '--package com.fake.package'
return parse_args('%s %s' % (req, str))
def test_argparse():
# missing arguments
assert_bad_argument('', '-p are required')
# required arguments are parsed correctly
ad = default_dict_for_parsed_args # assert dict
assert parse_args('--package xyz') == ad(package='xyz')
assert parse_args('-p xyz') == ad(package='xyz')
assert parse_args('-p xyz -s') == ad(package='xyz', simulate=True)
assert parse_args('-p xyz --simulate') == ad(package='xyz', simulate=True)
# optional arguments are parsed correctly.
mad = default_mock_dict_for_parsed_args # mock assert dict
assert parse_optional_args('--input trace.pb') == mad(input='trace.pb')
assert parse_optional_args('--compiler-filter speed') == \
mad(compiler_filter='speed')
assert parse_optional_args('-d') == mad(debug=True)
assert parse_optional_args('--debug') == mad(debug=True)
assert parse_optional_args('--timeout 123') == mad(timeout=123)
assert parse_optional_args('-t 456') == mad(timeout=456)
assert parse_optional_args('-r warm') == mad(readahead='warm')
assert parse_optional_args('--readahead warm') == mad(readahead='warm')
assert parse_optional_args('-a act') == mad(activity='act')
assert parse_optional_args('--activity act') == mad(activity='act')
def test_main():
args = '--package com.fake.package --activity act -s'
opts = run.parse_options(shlex.split(args))
result = run.run_test(opts)
assert result == [('TotalTime', '123')]
def test_set_up_adb_env():
with patch('lib.cmd_utils.run_shell_command',
new_callable=Mock) as mock_run_shell_command:
mock_run_shell_command.return_value = (True, '')
run.set_up_adb_env()
calls = [call('adb root'),
call('adb shell "getenforce"'),
call('adb shell "setenforce 0"'),
call('adb shell "stop"'),
call('adb shell "start"'),
call('adb wait-for-device')]
mock_run_shell_command.assert_has_calls(calls)
def test_set_up_adb_env_with_permissive():
with patch('lib.cmd_utils.run_shell_command',
new_callable=Mock) as mock_run_shell_command:
mock_run_shell_command.return_value = (True, 'Permissive')
run.set_up_adb_env()
calls = [call('adb root'), call('adb shell "getenforce"')]
mock_run_shell_command.assert_has_calls(calls)
def test_configure_compiler_filter():
with patch('lib.cmd_utils.run_shell_command',
new_callable=Mock) as mock_run_shell_command:
mock_run_shell_command.return_value = (True, 'speed arm64 kUpToDate')
run.configure_compiler_filter('speed', 'music', 'MainActivity')
calls = [call(os.path.join(run.DIR, 'query_compiler_filter.py') +
' --package music')]
mock_run_shell_command.assert_has_calls(calls)
def test_parse_metrics_output():
input = 'a1=b1\nc1=d1\ne1=f1'
ret = run.parse_metrics_output(input)
assert ret == [('a1', 'b1'), ('c1', 'd1'), ('e1', 'f1')]
def _mocked_run_shell_command(*args, **kwargs):
if args[0] == 'adb shell "date -u +\'%Y-%m-%d %H:%M:%S.%N\'"':
return (True, "123:123")
elif args[0] == 'adb shell ps | grep "music" | awk \'{print $2;}\'':
return (True, '9999')
else:
return (True, 'a1=b1\nc1=d1=d2\ne1=f1')
def test_run_no_vm_cache_drop():
with patch('lib.cmd_utils.run_shell_command',
new_callable=Mock) as mock_run_shell_command:
mock_run_shell_command.side_effect = _mocked_run_shell_command
run.run('warm',
'music',
'MainActivity',
timeout=10,
simulate=False,
debug=False)
calls = [call('adb shell "date -u +\'%Y-%m-%d %H:%M:%S.%N\'"'),
call(
'timeout {timeout} "{DIR}/launch_application" "{package}" "{activity}" | '
'"{DIR}/parse_metrics" --package {package} --activity {activity} '
'--timestamp "{timestamp}"'
.format(timeout=10,
DIR=run.DIR,
package='music',
activity='MainActivity',
timestamp='123:123')),
call('adb shell ps | grep "music" | awk \'{print $2;}\''),
call('adb shell "kill 9999"')]
mock_run_shell_command.assert_has_calls(calls)
def test_run_with_vm_cache_drop_and_post_launch_cleanup():
with patch('lib.cmd_utils.run_shell_command',
new_callable=Mock) as mock_run_shell_command:
mock_run_shell_command.side_effect = _mocked_run_shell_command
run.run('fadvise',
'music',
'MainActivity',
timeout=10,
simulate=False,
debug=False)
calls = [call('adb shell "echo 3 > /proc/sys/vm/drop_caches"'),
call('adb shell "date -u +\'%Y-%m-%d %H:%M:%S.%N\'"'),
call(
'timeout {timeout} "{DIR}/launch_application" "{package}" "{activity}" | '
'"{DIR}/parse_metrics" --package {package} --activity {activity} '
'--timestamp "{timestamp}"'
.format(timeout=10,
DIR=run.DIR,
package='music',
activity='MainActivity',
timestamp='123:123')),
call(
'bash -c "source {script_path}; '
'iorapd_readahead_wait_until_finished '
'\'{package}\' \'{activity}\' \'{timestamp}\' \'{timeout}\'"'.
format(timeout=10,
package='music',
activity='MainActivity',
timestamp='123:123',
script_path=run.IORAP_COMMON_BASH_SCRIPT)),
call('adb shell ps | grep "music" | awk \'{print $2;}\''),
call('adb shell "kill 9999"')]
mock_run_shell_command.assert_has_calls(calls)
if __name__ == '__main__':
pytest.main()

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper util libraries for iorapd related operations."""
import os
import sys
from pathlib import Path
# up to two level, like '../../'
sys.path.append(Path(os.path.abspath(__file__)).parents[2])
import lib.cmd_utils as cmd_utils
IORAPID_LIB_DIR = os.path.abspath(os.path.dirname(__file__))
IORAPD_DATA_PATH = '/data/misc/iorapd'
IORAP_COMMON_BASH_SCRIPT = os.path.realpath(os.path.join(IORAPID_LIB_DIR,
'../common'))
def _iorapd_path_to_data_file(package: str, activity: str, suffix: str) -> str:
"""Gets conventional data filename.
Returns:
The path of iorapd data file.
"""
# Match logic of 'AppComponentName' in iorap::compiler C++ code.
return '{}/{}%2F{}.{}'.format(IORAPD_DATA_PATH, package, activity, suffix)
def iorapd_compiler_install_trace_file(package: str, activity: str,
input_file: str) -> bool:
"""Installs a compiled trace file.
Returns:
Whether the trace file is installed successful or not.
"""
# remote path calculations
compiled_path = _iorapd_path_to_data_file(package, activity,
'compiled_trace.pb')
if not os.path.exists(input_file):
print('Error: File {} does not exist'.format(input_file))
return False
passed, _ = cmd_utils.run_adb_shell_command(
'mkdir -p "$(dirname "{}")"'.format(compiled_path))
if not passed:
return False
passed, _ = cmd_utils.run_shell_command('adb push "{}" "{}"'.format(
input_file, compiled_path))
return passed
def wait_for_iorapd_finish(package: str,
activity: str,
timeout: int,
debug: bool,
logcat_timestamp: str)->bool:
"""Waits for the finish of iorapd.
Returns:
A bool indicates whether the iorapd is done successfully or not.
"""
# Set verbose for bash script based on debug flag.
if debug:
os.putenv('verbose', 'y')
# Validate that readahead completes.
# If this fails for some reason, then this will also discard the timing of
# the run.
passed, _ = cmd_utils.run_shell_func(IORAP_COMMON_BASH_SCRIPT,
'iorapd_readahead_wait_until_finished',
[package, activity, logcat_timestamp,
str(timeout)])
return passed

View File

@@ -0,0 +1,166 @@
#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper util libraries for command line operations."""
import asyncio
import sys
import time
from typing import Tuple, Optional, List
import lib.print_utils as print_utils
TIMEOUT = 50
SIMULATE = False
def run_adb_shell_command(cmd: str) -> Tuple[bool, str]:
"""Runs command using adb shell.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed).
"""
return run_shell_command('adb shell "{}"'.format(cmd))
def run_shell_func(script_path: str,
func: str,
args: List[str]) -> Tuple[bool, str]:
"""Runs shell function with default timeout.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed) .
"""
cmd = 'bash -c "source {script_path}; {func} {args}"'.format(
script_path=script_path,
func=func,
args=' '.join("'{}'".format(arg) for arg in args))
print_utils.debug_print(cmd)
return run_shell_command(cmd)
def run_shell_command(cmd: str) -> Tuple[bool, str]:
"""Runs shell command with default timeout.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed) .
"""
return execute_arbitrary_command([cmd],
TIMEOUT,
shell=True,
simulate=SIMULATE)
def execute_arbitrary_command(cmd: List[str],
timeout: int,
shell: bool,
simulate: bool) -> Tuple[bool, str]:
"""Run arbitrary shell command with default timeout.
Mostly copy from
frameworks/base/startop/scripts/app_startup/app_startup_runner.py.
Args:
cmd: list of cmd strings.
timeout: the time limit of running cmd.
shell: indicate if the cmd is a shell command.
simulate: if it's true, do not run the command and assume the running is
successful.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed) .
"""
if simulate:
print(cmd)
return True, ''
print_utils.debug_print('[EXECUTE]', cmd)
# block until either command finishes or the timeout occurs.
loop = asyncio.get_event_loop()
(return_code, script_output) = loop.run_until_complete(
_run_command(*cmd, shell=shell, timeout=timeout))
script_output = script_output.decode() # convert bytes to str
passed = (return_code == 0)
print_utils.debug_print('[$?]', return_code)
if not passed:
print('[FAILED, code:%s]' % (return_code), script_output, file=sys.stderr)
return passed, script_output.rstrip()
async def _run_command(*args: List[str],
shell: bool = False,
timeout: Optional[int] = None) -> Tuple[int, bytes]:
if shell:
process = await asyncio.create_subprocess_shell(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
else:
process = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
script_output = b''
print_utils.debug_print('[PID]', process.pid)
timeout_remaining = timeout
time_started = time.time()
# read line (sequence of bytes ending with b'\n') asynchronously
while True:
try:
line = await asyncio.wait_for(process.stdout.readline(),
timeout_remaining)
print_utils.debug_print('[STDOUT]', line)
script_output += line
if timeout_remaining:
time_elapsed = time.time() - time_started
timeout_remaining = timeout - time_elapsed
except asyncio.TimeoutError:
print_utils.debug_print('[TIMEDOUT] Process ', process.pid)
print_utils.debug_print('[TIMEDOUT] Sending SIGTERM.')
process.terminate()
# 5 second timeout for process to handle SIGTERM nicely.
try:
(remaining_stdout,
remaining_stderr) = await asyncio.wait_for(process.communicate(), 5)
script_output += remaining_stdout
except asyncio.TimeoutError:
print_utils.debug_print('[TIMEDOUT] Sending SIGKILL.')
process.kill()
# 5 second timeout to finish with SIGKILL.
try:
(remaining_stdout,
remaining_stderr) = await asyncio.wait_for(process.communicate(), 5)
script_output += remaining_stdout
except asyncio.TimeoutError:
# give up, this will leave a zombie process.
print_utils.debug_print('[TIMEDOUT] SIGKILL failed for process ',
process.pid)
time.sleep(100)
return -1, script_output
else:
if not line: # EOF
break
code = await process.wait() # wait for child process to exit
return code, script_output

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper util libraries for debug printing."""
import sys
DEBUG = False
def debug_print(*args, **kwargs):
"""Prints the args to sys.stderr if the DEBUG is set."""
if DEBUG:
print(*args, **kwargs, file=sys.stderr)
def error_print(*args, **kwargs):
print('[ERROR]:', *args, file=sys.stderr, **kwargs)