diff --git a/src/clusterfuzz/_internal/base/concurrency.py b/src/clusterfuzz/_internal/base/concurrency.py index 36fcb39e47..52ce2d4b78 100644 --- a/src/clusterfuzz/_internal/base/concurrency.py +++ b/src/clusterfuzz/_internal/base/concurrency.py @@ -21,22 +21,8 @@ POOL_SIZE = multiprocessing.cpu_count() -class SingleThreadPool: - """Single thread pool for when it's not worth using Python's thread - implementation.""" - - def __init__(self, size): - del size - - def map(self, f, l): - return list(map(f, l)) - - def imap_unordered(self, f, l): - return list(map(f, l)) - - @contextlib.contextmanager -def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None): +def make_pool(pool_size=POOL_SIZE, max_pool_size=None): """Returns a pool that can (usually) execute tasks concurrently.""" if max_pool_size is not None: pool_size = min(pool_size, max_pool_size) @@ -44,12 +30,9 @@ def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None): # Don't use processes on Windows and unittests to avoid hangs. if (environment.get_value('PY_UNITTESTS') or environment.platform() == 'WINDOWS'): - if cpu_bound: - yield SingleThreadPool(pool_size) - else: - yield futures.ThreadPoolExecutor(pool_size) + yield futures.ThreadPoolExecutor(pool_size) else: - yield multiprocessing.Pool(pool_size) + yield futures.ProcessPoolExecutor(pool_size) # TODO(metzman): Find out if batching makes things even faster. diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index bbf15e57ed..5cd85efd81 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -322,7 +322,7 @@ def get_preprocess_task(): messages = pubsub_puller.get_messages(max_messages=1) if not messages: return None - task = get_task_from_message(messages[0]) + task = get_task_from_message(messages[0], task_cls=PubSubTTask) if task: logs.info('Pulled from preprocess queue.') return task @@ -516,12 +516,45 @@ def dont_retry(self): self._pubsub_message.ack() -def get_task_from_message(message, can_defer=True) -> Optional[PubSubTask]: +class PubSubTTask(PubSubTask): + """TTask that won't repeat on timeout.""" + TTASK_TIMEOUT = 30 * 60 + + @contextlib.contextmanager + def lease(self, _event=None): # pylint: disable=arguments-differ + """Maintain a lease for the task.""" + task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get( + self.command, get_task_lease_timeout()) + + environment.set_value('TASK_LEASE_SECONDS', task_lease_timeout) + track_task_start(self, task_lease_timeout) + if _event is None: + _event = threading.Event() + if self.command != 'fuzz': + leaser_thread = _PubSubLeaserThread(self._pubsub_message, _event, + task_lease_timeout) + else: + leaser_thread = _PubSubLeaserThread( + self._pubsub_message, _event, self.TTASK_TIMEOUT, ack_on_timeout=True) + leaser_thread.start() + try: + yield leaser_thread + finally: + _event.set() + leaser_thread.join() + + # If we get here the task succeeded in running. Acknowledge the message. + self._pubsub_message.ack() + track_task_end() + + +def get_task_from_message(message, can_defer=True, + task_cls=None) -> Optional[PubSubTask]: """Returns a task constructed from the first of |messages| if possible.""" if message is None: return None try: - task = initialize_task(message) + task = initialize_task(message, task_cls=task_cls) except KeyError: logs.error('Received an invalid task, discarding...') message.ack() @@ -560,11 +593,13 @@ def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]: return tasks -def initialize_task(message) -> PubSubTask: +def initialize_task(message, task_cls=None) -> PubSubTask: """Creates a task from |messages|.""" + if task_cls is None: + task_cls = PubSubTask if message.attributes.get('eventType') != 'OBJECT_FINALIZE': - return PubSubTask(message) + return task_cls(message) # Handle postprocess task. # The GCS API for pub/sub notifications uses the data field unlike @@ -598,13 +633,18 @@ class _PubSubLeaserThread(threading.Thread): EXTENSION_TIME_SECONDS = 10 * 60 # 10 minutes. - def __init__(self, message, done_event, max_lease_seconds): + def __init__(self, + message, + done_event, + max_lease_seconds, + ack_on_timeout=False): super().__init__() self.daemon = True self._message = message self._done_event = done_event self._max_lease_seconds = max_lease_seconds + self._ack_on_timeout = ack_on_timeout def run(self): """Run the leaser thread.""" @@ -616,6 +656,9 @@ def run(self): if time_left <= 0: logs.info('Lease reached maximum lease time of {} seconds, ' 'stopping renewal.'.format(self._max_lease_seconds)) + if self._ack_on_timeout: + logs.info('Acking on timeout') + self._message.ack() break extension_seconds = min(self.EXTENSION_TIME_SECONDS, time_left) diff --git a/src/clusterfuzz/_internal/google_cloud_utils/storage.py b/src/clusterfuzz/_internal/google_cloud_utils/storage.py index 11107d956c..7a7bee5211 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/storage.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/storage.py @@ -14,6 +14,7 @@ """Functions for managing Google Cloud Storage.""" import collections +from concurrent import futures import copy import datetime import json @@ -1367,7 +1368,7 @@ def _mappable_sign_urls_for_existing_file(url_and_include_delete_urls): def sign_urls_for_existing_files(urls, include_delete_urls): logs.info('Signing URLs for existing files.') args = ((url, include_delete_urls) for url in urls) - result = maybe_parallel_map(_sign_urls_for_existing_file, args) + result = parallel_map(_sign_urls_for_existing_file, args) logs.info('Done signing URLs for existing files.') return result @@ -1377,17 +1378,24 @@ def get_arbitrary_signed_upload_url(remote_directory): get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1))[0] -def maybe_parallel_map(func, arguments): +def parallel_map(func, argument_list): """Wrapper around pool.map so we don't do it on OSS-Fuzz hosts which will OOM.""" - if not environment.is_tworker(): - # TODO(b/metzman): When the rearch is done, internal google CF won't have - # tworkers, but maybe should be using parallel. - return map(func, arguments) - max_size = 2 - with concurrency.make_pool(cpu_bound=True, max_pool_size=max_size) as pool: - return pool.imap_unordered(func, arguments) + timeout = 120 + with concurrency.make_pool(max_pool_size=max_size) as pool: + calls = {pool.submit(func, argument) for argument in argument_list} + while calls: + finished_calls, _ = futures.wait( + calls, timeout=timeout, return_when=futures.FIRST_COMPLETED) + if not finished_calls: + logs.error('No call completed.') + for call in calls: + call.cancel() + raise TimeoutError(f'Nothing completed within {timeout} seconds') + for call in finished_calls: + calls.remove(call) + yield call.result(timeout=timeout) def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int): @@ -1406,6 +1414,6 @@ def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int): urls = (f'{base_path}-{idx}' for idx in range(num_uploads)) logs.info('Signing URLs for arbitrary uploads.') - result = maybe_parallel_map(get_signed_upload_url, urls) + result = parallel_map(get_signed_upload_url, urls) logs.info('Done signing URLs for arbitrary uploads.') return result