-
Notifications
You must be signed in to change notification settings - Fork 572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bound preprocess and parallel signing calls #4564
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -322,7 +322,7 @@ def get_preprocess_task(): | |
messages = pubsub_puller.get_messages(max_messages=1) | ||
if not messages: | ||
return None | ||
task = get_task_from_message(messages[0]) | ||
task = get_task_from_message(messages[0], task_cls=PubSubTTask) | ||
if task: | ||
logs.info('Pulled from preprocess queue.') | ||
return task | ||
|
@@ -516,12 +516,45 @@ def dont_retry(self): | |
self._pubsub_message.ack() | ||
|
||
|
||
def get_task_from_message(message, can_defer=True) -> Optional[PubSubTask]: | ||
class PubSubTTask(PubSubTask): | ||
"""TTask that won't repeat on timeout.""" | ||
TTASK_TIMEOUT = 30 * 60 | ||
|
||
@contextlib.contextmanager | ||
def lease(self, _event=None): # pylint: disable=arguments-differ | ||
"""Maintain a lease for the task.""" | ||
task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get( | ||
self.command, get_task_lease_timeout()) | ||
|
||
environment.set_value('TASK_LEASE_SECONDS', task_lease_timeout) | ||
track_task_start(self, task_lease_timeout) | ||
if _event is None: | ||
_event = threading.Event() | ||
if self.command != 'fuzz': | ||
leaser_thread = _PubSubLeaserThread(self._pubsub_message, _event, | ||
task_lease_timeout) | ||
else: | ||
leaser_thread = _PubSubLeaserThread( | ||
self._pubsub_message, _event, self.TTASK_TIMEOUT, ack_on_timeout=True) | ||
leaser_thread.start() | ||
try: | ||
yield leaser_thread | ||
finally: | ||
_event.set() | ||
leaser_thread.join() | ||
|
||
# If we get here the task succeeded in running. Acknowledge the message. | ||
self._pubsub_message.ack() | ||
track_task_end() | ||
|
||
|
||
def get_task_from_message(message, can_defer=True, | ||
task_cls=None) -> Optional[PubSubTask]: | ||
"""Returns a task constructed from the first of |messages| if possible.""" | ||
if message is None: | ||
return None | ||
try: | ||
task = initialize_task(message) | ||
task = initialize_task(message, task_cls=task_cls) | ||
except KeyError: | ||
logs.error('Received an invalid task, discarding...') | ||
message.ack() | ||
|
@@ -560,11 +593,13 @@ def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]: | |
return tasks | ||
|
||
|
||
def initialize_task(message) -> PubSubTask: | ||
def initialize_task(message, task_cls=None) -> PubSubTask: | ||
"""Creates a task from |messages|.""" | ||
if task_cls is None: | ||
task_cls = PubSubTask | ||
|
||
if message.attributes.get('eventType') != 'OBJECT_FINALIZE': | ||
return PubSubTask(message) | ||
return task_cls(message) | ||
|
||
# Handle postprocess task. | ||
# The GCS API for pub/sub notifications uses the data field unlike | ||
|
@@ -598,13 +633,18 @@ class _PubSubLeaserThread(threading.Thread): | |
|
||
EXTENSION_TIME_SECONDS = 10 * 60 # 10 minutes. | ||
|
||
def __init__(self, message, done_event, max_lease_seconds): | ||
def __init__(self, | ||
message, | ||
done_event, | ||
max_lease_seconds, | ||
ack_on_timeout=False): | ||
super().__init__() | ||
|
||
self.daemon = True | ||
self._message = message | ||
self._done_event = done_event | ||
self._max_lease_seconds = max_lease_seconds | ||
self._ack_on_timeout = ack_on_timeout | ||
|
||
def run(self): | ||
"""Run the leaser thread.""" | ||
|
@@ -616,6 +656,9 @@ def run(self): | |
if time_left <= 0: | ||
logs.info('Lease reached maximum lease time of {} seconds, ' | ||
'stopping renewal.'.format(self._max_lease_seconds)) | ||
if self._ack_on_timeout: | ||
logs.info('Acking on timeout') | ||
self._message.ack() | ||
[Review thread on this line]

Reviewer: Why acking? Why is this preferred to exiting and retrying later?

Author: For fuzz tasks, I'd rather things starve for now than gum up the queue. |
||
break | ||
|
||
extension_seconds = min(self.EXTENSION_TIME_SECONDS, time_left) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
"""Functions for managing Google Cloud Storage.""" | ||
|
||
import collections | ||
from concurrent import futures | ||
import copy | ||
import datetime | ||
import json | ||
|
@@ -1367,7 +1368,7 @@ def _mappable_sign_urls_for_existing_file(url_and_include_delete_urls): | |
def sign_urls_for_existing_files(urls, include_delete_urls): | ||
logs.info('Signing URLs for existing files.') | ||
args = ((url, include_delete_urls) for url in urls) | ||
result = maybe_parallel_map(_sign_urls_for_existing_file, args) | ||
result = parallel_map(_sign_urls_for_existing_file, args) | ||
logs.info('Done signing URLs for existing files.') | ||
return result | ||
|
||
|
@@ -1377,17 +1378,24 @@ def get_arbitrary_signed_upload_url(remote_directory): | |
get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1))[0] | ||
|
||
|
||
def maybe_parallel_map(func, arguments): | ||
def parallel_map(func, argument_list): | ||
[Review thread on this line]

Reviewer: The if statement is being removed because the rearch is complete, right?

Author: Correct! |
||
"""Wrapper around pool.map so we don't do it on OSS-Fuzz hosts which | ||
will OOM.""" | ||
if not environment.is_tworker(): | ||
# TODO(b/metzman): When the rearch is done, internal google CF won't have | ||
# tworkers, but maybe should be using parallel. | ||
return map(func, arguments) | ||
|
||
max_size = 2 | ||
with concurrency.make_pool(cpu_bound=True, max_pool_size=max_size) as pool: | ||
return pool.imap_unordered(func, arguments) | ||
timeout = 120 | ||
with concurrency.make_pool(max_pool_size=max_size) as pool: | ||
calls = {pool.submit(func, argument) for argument in argument_list} | ||
while calls: | ||
finished_calls, _ = futures.wait( | ||
calls, timeout=timeout, return_when=futures.FIRST_COMPLETED) | ||
if not finished_calls: | ||
logs.error('No call completed.') | ||
for call in calls: | ||
call.cancel() | ||
raise TimeoutError(f'Nothing completed within {timeout} seconds') | ||
for call in finished_calls: | ||
calls.remove(call) | ||
yield call.result(timeout=timeout) | ||
|
||
|
||
def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int): | ||
|
@@ -1406,6 +1414,6 @@ def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int): | |
|
||
urls = (f'{base_path}-{idx}' for idx in range(num_uploads)) | ||
logs.info('Signing URLs for arbitrary uploads.') | ||
result = maybe_parallel_map(get_signed_upload_url, urls) | ||
result = parallel_map(get_signed_upload_url, urls) | ||
logs.info('Done signing URLs for arbitrary uploads.') | ||
return result |
[Review thread on this file]

Reviewer: Why processes and not threads?

Author: This is mostly used for CPU-bound tasks such as cryptographic signing. In addition, Python's threads suck and processes are usually faster for things like I/O-bound stuff.