
Commit

Reland #4565 and #4528 and fix issues. (#4570)
Only #4565 was broken. #4528 is actually needed to prevent congestion.
Fix the issue that the combination of them caused: Python having too
many parallelism APIs.
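
For context on the parallelism-API issue: multiprocessing.Pool and concurrent.futures executors expose different interfaces, so code written against one breaks when handed the other. A minimal illustrative sketch (the square worker is hypothetical):

import multiprocessing
from concurrent import futures


def square(x):  # Hypothetical worker function.
  return x * x


if __name__ == '__main__':
  # multiprocessing.Pool exposes imap_unordered()...
  with multiprocessing.Pool(2) as pool:
    print(sorted(pool.imap_unordered(square, range(5))))

  # ...but concurrent.futures executors only provide submit()/map(), so a
  # caller written against Pool.imap_unordered() fails with AttributeError.
  with futures.ProcessPoolExecutor(2) as executor:
    print(sorted(executor.map(square, range(5))))
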
jonathanmetzman authored Dec 30, 2024
1 parent 8d7b9ab commit cbba25e
Showing 7 changed files with 205 additions and 73 deletions.
23 changes: 3 additions & 20 deletions src/clusterfuzz/_internal/base/concurrency.py
@@ -21,35 +21,18 @@
POOL_SIZE = multiprocessing.cpu_count()


class SingleThreadPool:
"""Single thread pool for when it's not worth using Python's thread
implementation."""

def __init__(self, size):
del size

def map(self, f, l):
return list(map(f, l))

def imap_unordered(self, f, l):
return list(map(f, l))


@contextlib.contextmanager
def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None):
def make_pool(pool_size=POOL_SIZE, max_pool_size=None):
"""Returns a pool that can (usually) execute tasks concurrently."""
if max_pool_size is not None:
pool_size = min(pool_size, max_pool_size)

# Don't use processes on Windows and unittests to avoid hangs.
if (environment.get_value('PY_UNITTESTS') or
environment.platform() == 'WINDOWS'):
if cpu_bound:
yield SingleThreadPool(pool_size)
else:
yield futures.ThreadPoolExecutor(pool_size)
yield futures.ThreadPoolExecutor(pool_size)
else:
yield multiprocessing.Pool(pool_size)
yield futures.ProcessPoolExecutor(pool_size)


# TODO(metzman): Find out if batching makes things even faster.
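
A minimal sketch of how the reworked make_pool is consumed now that both branches yield a concurrent.futures-style executor; the hash_corpus_file worker here is hypothetical:

from clusterfuzz._internal.base import concurrency


def hash_corpus_file(path):  # Hypothetical worker; must be picklable.
  with open(path, 'rb') as f:
    return hash(f.read())


def hash_corpus(paths):
  # Threads (Windows and unittests) and processes (everywhere else) now share
  # the same executor map() API, so call sites no longer need to special-case
  # the pool type.
  with concurrency.make_pool(pool_size=4) as pool:
    return list(pool.map(hash_corpus_file, paths))
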
59 changes: 52 additions & 7 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
@@ -48,7 +48,9 @@
MAX_LEASED_TASKS_LIMIT = 1000
MAX_TASKS_LIMIT = 100000

MAX_PUBSUB_MESSAGES_PER_REQ = 1000
# The stated limit is 1000, but in reality messages do not get delivered
# around this limit. We should probably switch to the real client library.
MAX_PUBSUB_MESSAGES_PER_REQ = 250

# Various variables for task leasing and completion times (in seconds).
TASK_COMPLETION_BUFFER = 90 * 60
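
The lowered MAX_PUBSUB_MESSAGES_PER_REQ above is presumably used to split bulk publishes into requests of at most 250 messages. A minimal sketch of that chunking, assuming a hypothetical publish_batch callable that issues one Pub/Sub publish request:

MAX_PUBSUB_MESSAGES_PER_REQ = 250  # Mirrors the constant above.


def publish_in_chunks(publish_batch, messages):
  """Publishes |messages| in requests of at most MAX_PUBSUB_MESSAGES_PER_REQ."""
  for start in range(0, len(messages), MAX_PUBSUB_MESSAGES_PER_REQ):
    publish_batch(messages[start:start + MAX_PUBSUB_MESSAGES_PER_REQ])
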
@@ -320,7 +322,7 @@ def get_preprocess_task():
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
return None
task = get_task_from_message(messages[0])
task = get_task_from_message(messages[0], task_cls=PubSubTTask)
if task:
logs.info('Pulled from preprocess queue.')
return task
@@ -514,12 +516,45 @@ def dont_retry(self):
self._pubsub_message.ack()


def get_task_from_message(message, can_defer=True) -> Optional[PubSubTask]:
class PubSubTTask(PubSubTask):
"""TTask that won't repeat on timeout."""
TTASK_TIMEOUT = 30 * 60

@contextlib.contextmanager
def lease(self, _event=None): # pylint: disable=arguments-differ
"""Maintain a lease for the task."""
task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get(
self.command, get_task_lease_timeout())

environment.set_value('TASK_LEASE_SECONDS', task_lease_timeout)
track_task_start(self, task_lease_timeout)
if _event is None:
_event = threading.Event()
if self.command != 'fuzz':
leaser_thread = _PubSubLeaserThread(self._pubsub_message, _event,
task_lease_timeout)
else:
leaser_thread = _PubSubLeaserThread(
self._pubsub_message, _event, self.TTASK_TIMEOUT, ack_on_timeout=True)
leaser_thread.start()
try:
yield leaser_thread
finally:
_event.set()
leaser_thread.join()

# If we get here the task succeeded in running. Acknowledge the message.
self._pubsub_message.ack()
track_task_end()


def get_task_from_message(message, can_defer=True,
task_cls=None) -> Optional[PubSubTask]:
"""Returns a task constructed from the first of |messages| if possible."""
if message is None:
return None
try:
task = initialize_task(message)
task = initialize_task(message, task_cls=task_cls)
except KeyError:
logs.error('Received an invalid task, discarding...')
message.ack()
@@ -558,11 +593,13 @@ def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]:
return tasks


def initialize_task(message) -> PubSubTask:
def initialize_task(message, task_cls=None) -> PubSubTask:
"""Creates a task from |messages|."""
if task_cls is None:
task_cls = PubSubTask

if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
return PubSubTask(message)
return task_cls(message)

# Handle postprocess task.
# The GCS API for pub/sub notifications uses the data field unlike
@@ -596,13 +633,18 @@ class _PubSubLeaserThread(threading.Thread):

EXTENSION_TIME_SECONDS = 10 * 60 # 10 minutes.

def __init__(self, message, done_event, max_lease_seconds):
def __init__(self,
message,
done_event,
max_lease_seconds,
ack_on_timeout=False):
super().__init__()

self.daemon = True
self._message = message
self._done_event = done_event
self._max_lease_seconds = max_lease_seconds
self._ack_on_timeout = ack_on_timeout

def run(self):
"""Run the leaser thread."""
@@ -614,6 +656,9 @@ def run(self):
if time_left <= 0:
logs.info('Lease reached maximum lease time of {} seconds, '
'stopping renewal.'.format(self._max_lease_seconds))
if self._ack_on_timeout:
logs.info('Acking on timeout')
self._message.ack()
break

extension_seconds = min(self.EXTENSION_TIME_SECONDS, time_left)
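
A rough sketch of how a pulled task's lease context manager is typically consumed; run_task is a hypothetical callable. For 'fuzz' tasks, PubSubTTask starts the leaser thread with ack_on_timeout=True, so a fuzz task that hits TTASK_TIMEOUT is acked rather than redelivered:

from clusterfuzz._internal.base import tasks


def process_one_preprocess_task(run_task):  # run_task is hypothetical.
  task = tasks.get_preprocess_task()  # Now constructs a PubSubTTask.
  if task is None:
    return
  # The leaser thread keeps extending the Pub/Sub ack deadline while the task
  # runs; the message is acked when the block exits cleanly.
  with task.lease():
    run_task(task)
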
136 changes: 109 additions & 27 deletions src/clusterfuzz/_internal/cron/schedule_fuzz.py
@@ -14,31 +14,67 @@
"""Cron job to schedule fuzz tasks that run on batch."""

import collections
import multiprocessing
import random
import time
from typing import Dict
from typing import List

from google.cloud import monitoring_v3
from googleapiclient import discovery

from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.datastore import ndb_utils
from clusterfuzz._internal.google_cloud_utils import batch
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.metrics import logs

# TODO(metzman): Actually implement this.
CPUS_PER_FUZZ_JOB = 2

def _get_quotas(project, region):
gcp_credentials = credentials.get_default()[0]
compute = discovery.build('compute', 'v1', credentials=gcp_credentials)

def _get_quotas(creds, project, region):
compute = discovery.build('compute', 'v1', credentials=creds)
return compute.regions().get( # pylint: disable=no-member
region=region, project=project).execute()['quotas']


def get_available_cpus_for_region(project: str, region: str) -> int:
def count_unacked(creds, project_id, subscription_id):
"""Counts the unacked messages in |subscription_id|."""
# TODO(metzman): Not all of these are fuzz_tasks. Deal with that.
metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages'
query_filter = (f'metric.type="{metric}" AND '
f'resource.labels.subscription_id="{subscription_id}"')
time_now = time.time()
# Get the last 5 minutes.
time_interval = monitoring_v3.TimeInterval(
end_time={'seconds': int(time_now)},
start_time={'seconds': int(time_now - 5 * 60)},
)
client = monitoring_v3.MetricServiceClient(credentials=creds)
results = client.list_time_series(
request={
'filter': query_filter,
'interval': time_interval,
'name': f'projects/{project_id}',
'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
})
# Get the latest point.
for result in results:
if len(result.points) == 0:
continue
result = int(result.points[0].value.int64_value)
logs.info(f'Unacked in {subscription_id}: {result}')
return result


def get_available_cpus_for_region(creds, project: str, region: str) -> int:
"""Returns the number of available CPUs in the current GCE region."""
quotas = _get_quotas(project, region)

quotas = _get_quotas(creds, project, region)

# Sometimes, the preemptible quota is 0, which means the number of preemptible
# CPUs is actually limited by the CPU quota.
Expand All @@ -59,13 +95,18 @@ def get_available_cpus_for_region(project: str, region: str) -> int:
assert preemptible_quota or cpu_quota

if not preemptible_quota['limit']:
# Preemptible quota is not set. Obey the CPU quota since that limitss us.
# Preemptible quota is not set. Obey the CPU quota since that limits us.
quota = cpu_quota
else:
quota = preemptible_quota
assert quota['limit'], quota

return quota['limit'] - quota['usage']
# TODO(metzman): Do this in a more configurable way.
# We need this because us-central1 and us-east4 have different numbers of
# cores allotted to us in their quota. Treat them the same to simplify things.
limit = quota['limit']
limit -= quota['usage']
return min(limit, 100_000)


class BaseFuzzTaskScheduler:
Expand All @@ -79,8 +120,7 @@ def get_fuzz_tasks(self):

def _get_cpus_per_fuzz_job(self, job_name):
del job_name
# TODO(metzman): Actually implement this.
return 2
return CPUS_PER_FUZZ_JOB


class FuzzTaskCandidate:
@@ -186,35 +226,77 @@ def get_batch_regions(batch_config):
subconf['name'] for subconf in batch_config.get(
'mapping.LINUX-PREEMPTIBLE-UNPRIVILEGED.subconfigs')
}

subconfs = batch_config.get('subconfigs')
return list(
set(subconfs[subconf]['region']
for subconf in subconfs
if subconf in fuzz_subconf_names))


def get_available_cpus(project: str, regions: List[str]) -> int:
"""Returns the available CPUs for fuzz tasks."""
# TODO(metzman): This doesn't distinguish between fuzz and non-fuzz
# tasks (nor preemptible and non-preemptible CPUs). Fix this.
# Get total scheduled and queued.
creds = credentials.get_default()[0]
count_args = ((project, region) for region in regions)
with multiprocessing.Pool(2) as pool:
# These calls are extremely slow (about 1 minute total).
result = pool.starmap_async( # pylint: disable=no-member
batch.count_queued_or_scheduled_tasks, count_args)
waiting_tasks = count_unacked(creds, project, 'preprocess')
waiting_tasks += count_unacked(creds, project, 'utask_main')
region_counts = zip(*result.get()) # Group all queued and all scheduled.

# Add up all queued and scheduled.
region_counts = [sum(tup) for tup in region_counts]
logs.info(f'Region counts: {region_counts}')
if region_counts[0] > 5000:
# Check queued tasks.
logs.info('Too many jobs queued, not scheduling more fuzzing.')
return 0
waiting_tasks += sum(region_counts) # Add up queued and scheduled.
soon_occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB
logs.info(f'Soon occupied CPUs: {soon_occupied_cpus}')
available_cpus = sum(
get_available_cpus_for_region(creds, project, region)
for region in regions)
logs.info('Actually free CPUs (before subtracting soon '
f'occupied): {available_cpus}')
available_cpus = max(available_cpus - soon_occupied_cpus, 0)

# Don't schedule more than 10K tasks at once, so we don't overload Batch.
print('len_regions', len(regions))
available_cpus = min(available_cpus, 20_000 * len(regions))
return available_cpus


def schedule_fuzz_tasks() -> bool:
"""Schedules fuzz tasks."""
start = time.time()
multiprocessing.set_start_method('spawn')
batch_config = local_config.BatchConfig()
regions = set(get_batch_regions(batch_config))
project = batch_config.get('project')
available_cpus = sum(
get_available_cpus_for_region(project, region) for region in regions)
available_cpus = min(available_cpus, 3500 * len(regions))
fuzz_tasks = get_fuzz_tasks(available_cpus)
if not fuzz_tasks:
logs.error('No fuzz tasks found to schedule.')
return False

logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')

end = time.time()
total = end - start
logs.info(f'Task scheduling took {total} seconds.')
return True
regions = get_batch_regions(batch_config)
while True:
start = time.time()
available_cpus = get_available_cpus(project, regions)
logs.error(f'{available_cpus} available CPUs.')
if not available_cpus:
continue

fuzz_tasks = get_fuzz_tasks(available_cpus)
if not fuzz_tasks:
logs.error('No fuzz tasks found to schedule.')
continue

logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')

end = time.time()
total = end - start
logs.info(f'Task scheduling took {total} seconds.')


def main():
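
A worked sketch of the CPU-budget arithmetic in get_available_cpus above, using made-up numbers for a single region (the real code also returns 0 outright when more than 5,000 jobs are already queued):

CPUS_PER_FUZZ_JOB = 2


def sketch_available_cpus(quota_limit, quota_usage, unacked, queued, scheduled,
                          num_regions):
  """Mirrors the budgeting above with hypothetical inputs."""
  # CPUs the regional quota still allows, capped at 100,000 per region.
  free = min(quota_limit - quota_usage, 100_000)
  # Tasks already waiting (unacked in Pub/Sub, queued or scheduled in Batch)
  # will soon occupy CPUs of their own.
  soon_occupied = (unacked + queued + scheduled) * CPUS_PER_FUZZ_JOB
  free = max(free - soon_occupied, 0)
  # Don't overload Batch: cap at 20,000 CPUs per region.
  return min(free, 20_000 * num_regions)


# A 30,000-CPU quota with 4,000 in use and 1,500 waiting tasks:
# min(26_000, 100_000) = 26_000; 26_000 - 3_000 = 23_000; capped at 20_000.
print(sketch_available_cpus(30_000, 4_000, 500, 600, 400, 1))
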
21 changes: 17 additions & 4 deletions src/clusterfuzz/_internal/google_cloud_utils/batch.py
@@ -66,10 +66,7 @@

def _create_batch_client_new():
"""Creates a batch client."""
creds, project = credentials.get_default()
if not project:
project = utils.get_application_id()

creds, _ = credentials.get_default()
return batch.BatchServiceClient(credentials=creds)


@@ -355,3 +352,19 @@ def _get_specs_from_config(batch_tasks) -> Dict:
)
specs[(task.command, task.job_type)] = spec
return specs


def count_queued_or_scheduled_tasks(project: str,
region: str) -> Tuple[int, int]:
"""Counts the number of queued and scheduled tasks."""
region = f'projects/{project}/locations/{region}'
jobs_filter = 'status.state="SCHEDULED" OR status.state="QUEUED"'
req = batch.types.ListJobsRequest(parent=region, filter=jobs_filter)
queued = 0
scheduled = 0
for job in _batch_client().list_jobs(request=req):
if job.status.state == 'SCHEDULED':
scheduled += job.task_groups[0].task_count
elif job.status.state == 'QUEUED':
queued += job.task_groups[0].task_count
return (queued, scheduled)
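
For reference, a small sketch of how the scheduler aggregates the per-region (queued, scheduled) tuples returned by this function (the values are made up):

# One (queued, scheduled) tuple per region, e.g. us-central1 and us-east4, as
# returned by count_queued_or_scheduled_tasks.
per_region = [(120, 800), (40, 300)]

# zip(*...) regroups the tuples into a queued column and a scheduled column,
# which are then summed, matching the aggregation in get_available_cpus.
queued, scheduled = (sum(column) for column in zip(*per_region))
print(queued, scheduled)  # 160 1100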