Add generic tworker. (#4418)

This tworker will only run preprocess and postprocess. It can do this
regardless of the platform of the task.

Also, get rid of some kludges used to test utasks in oss-fuzz.
b/380104573
This commit is contained in:
jonathanmetzman 2024-11-21 15:40:23 -05:00 committed by GitHub
parent cef1e4945e
commit ccb5ef631d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 102 additions and 125 deletions

View File

@ -27,6 +27,7 @@ IMAGES=(
gcr.io/clusterfuzz-images/oss-fuzz/worker
gcr.io/clusterfuzz-images/ci
gcr.io/clusterfuzz-images/utask-main-scheduler
gcr.io/clusterfuzz-images/tworker
gcr.io/clusterfuzz-images/fuchsia
)

17
docker/tworker/Dockerfile Normal file
View File

@ -0,0 +1,17 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM gcr.io/clusterfuzz-images/base
# Worker that only reads from queues, preprocesses and postprocesses.
ENV TWORKER=1

View File

@ -83,6 +83,7 @@ TASK_END_TIME_KEY = 'task_end_time'
POSTPROCESS_QUEUE = 'postprocess'
UTASK_MAINS_QUEUE = 'utask_main'
PREPROCESS_QUEUE = 'preprocess'
# See https://github.com/google/clusterfuzz/issues/3347 for usage
SUBQUEUE_IDENTIFIER = ':'
@ -281,8 +282,7 @@ class PubSubPuller:
def get_postprocess_task():
"""Gets a postprocess task if one exists."""
# This should only be run on non-preemptible bots.
if not (task_utils.is_remotely_executing_utasks() or
task_utils.get_opted_in_tasks()):
if not task_utils.is_remotely_executing_utasks():
return None
# Postprocess is platform-agnostic, so we run all such tasks on our
# most generic and plentiful bots only. In other words, we avoid
@ -304,9 +304,29 @@ def allow_all_tasks():
return not environment.get_value('PREEMPTIBLE')
def get_preprocess_task():
pubsub_puller = PubSubPuller(PREPROCESS_QUEUE)
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
return None
task = get_task_from_message(messages[0])
if task:
logs.info('Pulled from preprocess queue.')
return task
def tworker_get_task():
assert environment.is_tworker()
task = get_postprocess_task()
if task:
return task
return get_preprocess_task()
def get_task():
"""Returns an ordinary (non-postprocess, non-utask_main) task that is pulled
from a ClusterFuzz task queue."""
"""Returns an ordinary (non-utask_main) task that is pulled from a ClusterFuzz
task queue."""
task = get_command_override()
if task:
return task
@ -319,6 +339,7 @@ def get_task():
task = get_postprocess_task()
if task:
return task
# Check the high-end jobs queue for bots with multiplier greater than 1.
thread_multiplier = environment.get_value('THREAD_MULTIPLIER')
if thread_multiplier and thread_multiplier > 1:
@ -368,8 +389,7 @@ class Task:
eta=None,
is_command_override=False,
high_end=False,
extra_info=None,
is_from_queue=False):
extra_info=None):
self.command = command
self.argument = argument
self.job = job
@ -378,16 +398,6 @@ class Task:
self.high_end = high_end
self.extra_info = extra_info
# is_from_queue is a temporary hack to keep track of which fuzz tasks came
# from the queue. Previously all fuzz tasks were picked by the bot when
# there was nothing on the queue. With the rearchitecture, we want fuzz
# tasks that were put on the queue by the schedule_fuzz cron job to be
# executed on batch. is_from_queue is used to do this.
# TODO(b/378684001): This code is very ugly, get rid of it when no more
# fuzz tasks are executed on the bots themselves (i.e. when the rearch
# is complete).
self.is_from_queue = is_from_queue
def __repr__(self):
return f'Task: {self.command} {self.argument} {self.job}'
@ -428,13 +438,11 @@ class Task:
class PubSubTask(Task):
"""A Pub/Sub task."""
def __init__(self, pubsub_message, is_from_queue=False):
def __init__(self, pubsub_message):
self._pubsub_message = pubsub_message
super().__init__(
self.attribute('command'),
self.attribute('argument'),
self.attribute('job'),
is_from_queue=is_from_queue)
self.attribute('command'), self.attribute('argument'),
self.attribute('job'))
self.extra_info = {
key: value
@ -540,7 +548,7 @@ def initialize_task(message) -> PubSubTask:
"""Creates a task from |messages|."""
if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
return PubSubTask(message, is_from_queue=True)
return PubSubTask(message)
# Handle postprocess task.
# The GCS API for pub/sub notifications uses the data field unlike
@ -549,7 +557,7 @@ def initialize_task(message) -> PubSubTask:
name = data['name']
bucket = data['bucket']
output_url_argument = storage.get_cloud_storage_file_path(bucket, name)
return PostprocessPubSubTask(output_url_argument, message, is_from_queue=True)
return PostprocessPubSubTask(output_url_argument, message)
class PostprocessPubSubTask(PubSubTask):
@ -558,21 +566,14 @@ class PostprocessPubSubTask(PubSubTask):
def __init__(self,
output_url_argument,
pubsub_message,
is_command_override=False,
is_from_queue=False):
is_command_override=False):
command = 'postprocess'
job_type = 'none'
eta = None
high_end = False
grandparent_class = super(PubSubTask, self)
grandparent_class.__init__(
command,
output_url_argument,
job_type,
eta,
is_command_override,
high_end,
is_from_queue=is_from_queue)
grandparent_class.__init__(command, output_url_argument, job_type, eta,
is_command_override, high_end)
self._pubsub_message = pubsub_message

View File

@ -15,7 +15,6 @@
any other module in tasks to prevent circular imports and issues with
appengine."""
from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.system import environment
@ -26,25 +25,12 @@ def get_command_from_module(full_module_name: str) -> str:
return module_name[:-len('_task')]
def is_remotely_executing_utasks(task=None) -> bool:
def is_remotely_executing_utasks() -> bool:
"""Returns True if the utask_main portions of utasks are being remotely
executed on Google cloud batch."""
if bool(environment.is_production() and
environment.get_value('REMOTE_UTASK_EXECUTION')):
return True
if task is None:
return False
return bool(is_task_opted_into_uworker_execution(task))
def get_opted_in_tasks():
return local_config.ProjectConfig().get('uworker_tasks', [])
def is_task_opted_into_uworker_execution(task: str) -> bool:
# TODO(metzman): Remove this after OSS-Fuzz and Chrome are at parity.
uworker_tasks = get_opted_in_tasks()
return task in uworker_tasks
# TODO(metzman): REMOTE_UTASK_EXECUTION should be a config not an env var.
return (environment.is_production() and
environment.get_value('REMOTE_UTASK_EXECUTION'))
class UworkerMsgParseError(RuntimeError):

View File

@ -190,11 +190,25 @@ def start_web_server_if_needed():
logs.error('Failed to start web server, skipping.')
def get_command_object(task_name):
"""Returns the command object that execute can be called on."""
task = COMMAND_MAP.get(task_name)
if not environment.is_tworker():
return task
if isinstance(task, task_types.TrustedTask):
# We don't need to execute this remotely.
return task
# Force remote execution.
return task_types.UTask(task_name)
def run_command(task_name, task_argument, job_name, uworker_env):
"""Runs the command."""
task = COMMAND_MAP.get(task_name)
task = get_command_object(task_name)
if not task:
logs.error("Unknown command '%s'" % task_name)
logs.error(f'Unknown command "{task_name}"')
return None
# If applicable, ensure this is the only instance of the task running.
@ -253,10 +267,8 @@ def process_command(task):
logs.error('Empty task received.')
return None
# TODO(b/378684001): Remove is_from_queue kludge.
return process_command_impl(task.command, task.argument, task.job,
task.high_end, task.is_command_override,
task.is_from_queue)
task.high_end, task.is_command_override)
def _get_task_id(task_name, task_argument, job_name):
@ -267,13 +279,12 @@ def _get_task_id(task_name, task_argument, job_name):
# TODO(mbarbella): Rewrite this function to avoid nesting issues.
@set_task_payload
def process_command_impl(task_name, task_argument, job_name, high_end,
is_command_override, is_from_queue):
is_command_override):
"""Implementation of process_command."""
uworker_env = None
environment.set_value('TASK_NAME', task_name)
environment.set_value('TASK_ARGUMENT', task_argument)
environment.set_value('JOB_NAME', job_name)
environment.set_value('IS_FROM_QUEUE', is_from_queue)
if task_name in {'uworker_main', 'postprocess'}:
# We want the id of the task we are processing, not "uworker_main", or
# "postprocess".
@ -456,5 +467,3 @@ def process_command_impl(task_name, task_argument, job_name, high_end,
cleanup_task_state()
if 'CF_TASK_ID' in os.environ:
del os.environ['CF_TASK_ID']
if 'IS_FROM_QUEUE' in os.environ:
del os.environ['IS_FROM_QUEUE']

View File

@ -46,6 +46,7 @@ class TrustedTask(BaseTask):
def execute(self, task_argument, job_type, uworker_env):
# Simple tasks can just use the environment they don't need the uworker env.
del uworker_env
assert not environment.is_tworker()
self.module.execute_task(task_argument, job_type)
@ -58,6 +59,8 @@ class BaseUTask(BaseTask):
raise NotImplementedError('Child class must implement.')
def execute_locally(self, task_argument, job_type, uworker_env):
"""Executes the utask locally (on this machine, not on batch)."""
assert not environment.is_tworker()
uworker_input = utasks.tworker_preprocess_no_io(self.module, task_argument,
job_type, uworker_env)
if uworker_input is None:
@ -119,7 +122,7 @@ class UTask(BaseUTask):
@staticmethod
def is_execution_remote(command=None):
return task_utils.is_remotely_executing_utasks(command)
return task_utils.is_remotely_executing_utasks()
def execute(self, task_argument, job_type, uworker_env):
"""Executes a utask."""
@ -156,19 +159,6 @@ class UTask(BaseUTask):
return download_url
# TODO(b/378684001): Remove this, it's needed for testing but is otherwise a bad
# design.
class UTaskMostlyLocalExecutor(UTask):
@staticmethod
def is_execution_remote(command=None):
del command
if environment.get_value('IS_FROM_QUEUE'):
logs.info('IS FROM QUEUE')
return True
return False
class PostprocessTask(BaseTask):
"""Represents postprocessing of an untrusted task."""
@ -211,7 +201,7 @@ COMMAND_TYPES = {
'analyze': UTask,
'blame': TrustedTask,
'corpus_pruning': UTask,
'fuzz': UTaskMostlyLocalExecutor,
'fuzz': UTaskLocalExecutor,
'impact': TrustedTask,
'minimize': UTask,
'progression': UTask,

View File

@ -20,7 +20,6 @@ from typing import Dict
from googleapiclient import discovery
from clusterfuzz._internal.base import concurrency
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.config import local_config
@ -167,14 +166,12 @@ class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler):
choices = random.choices(
fuzz_task_candidates, weights=weights, k=num_instances)
queues_to_tasks = collections.defaultdict(list)
for fuzz_task_candidate in choices:
queue_tasks = queues_to_tasks[fuzz_task_candidate.queue]
task = tasks.Task('fuzz', fuzz_task_candidate.fuzzer,
fuzz_task_candidate.job)
queue_tasks.append(task)
return queues_to_tasks
fuzz_tasks = [
tasks.Task('fuzz', fuzz_task_candidate.fuzzer, fuzz_task_candidate.job)
for fuzz_task_candidate in choices
]
# TODO(metzman): Remove the queue stuff if it's uneeded for Chrome.
return fuzz_tasks
def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]:
@ -210,10 +207,8 @@ def schedule_fuzz_tasks() -> bool:
logs.error('No fuzz tasks found to schedule.')
return False
# TODO(b/378684001): Change this to using one queue when oss-fuzz's untrusted
# worker model is deleted.
with concurrency.make_pool() as pool:
list(pool.map(bulk_add, fuzz_tasks.items()))
logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')
end = time.time()
@ -222,11 +217,5 @@ def schedule_fuzz_tasks() -> bool:
return True
def bulk_add(queue_and_tasks):
queue, task_list = queue_and_tasks
logs.info(f'Adding {task_list} to {queue}.')
tasks.bulk_add_tasks(task_list, queue=queue, eta_now=True)
def main():
return schedule_fuzz_tasks()

View File

@ -1177,3 +1177,7 @@ def can_testcase_run_on_platform(testcase_platform_id, current_platform_id):
current_platform_id)
return False
def is_tworker():
return get_value('TWORKER', False)

View File

@ -52,13 +52,11 @@ class GetTaskTest(unittest.TestCase):
'clusterfuzz._internal.base.persistent_cache.get_value',
'clusterfuzz._internal.base.persistent_cache.set_value',
'clusterfuzz._internal.base.utils.utcnow',
'clusterfuzz._internal.base.tasks.task_utils.get_opted_in_tasks',
'time.sleep',
])
self.mock.get_value.return_value = None
self.mock.sleep.return_value = None
self.mock.get_opted_in_tasks.return_value = False
data_types.Job(name='job').put()
client = pubsub.PubSubClient()

View File

@ -69,16 +69,12 @@ class OssfuzzFuzzTaskScheduler(unittest.TestCase):
num_cpus = 10
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus)
results = scheduler.get_fuzz_tasks()
tasks = scheduler.get_fuzz_tasks()
comparable_results = []
for tasks in results.values():
comparable_tasks = []
for task in tasks:
comparable_tasks.append((task.command, task.argument, task.job))
comparable_results.append(comparable_tasks)
for task in tasks:
comparable_results.append((task.command, task.argument, task.job))
expected_results = [[('fuzz', 'libFuzzer', 'myjob')] * 5]
self.assertListEqual(list(results.keys()), ['jobs-linux'])
expected_results = [('fuzz', 'libFuzzer', 'myjob')] * 5
self.assertListEqual(comparable_results, expected_results)

View File

@ -12,12 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for task_utils."""
import os
import unittest
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.bot.tasks import commands
from clusterfuzz._internal.tests.test_libs import helpers
class GetCommandFromModuleTest(unittest.TestCase):
@ -35,19 +33,3 @@ class GetCommandFromModuleTest(unittest.TestCase):
task_utils.get_command_from_module('postprocess')
with self.assertRaises(ValueError):
task_utils.get_command_from_module('uworker_main')
class IsTaskOptedIntoUworkerExecution(unittest.TestCase):
"""Tests that is_task_opted_into_uworker_execution only returns True for the
tasks we are testing in oss-fuzz."""
def setUp(self):
helpers.patch_environ(self)
def test_opt_in(self):
os.environ['JOB_NAME'] = 'libfuzzer_asan_skia'
self.assertTrue(task_utils.is_task_opted_into_uworker_execution('analyze'))
def test_no_opt_in(self):
os.environ['JOB_NAME'] = 'libfuzzer_asan_skia'
self.assertFalse(task_utils.is_task_opted_into_uworker_execution('fuzz'))

View File

@ -29,7 +29,7 @@ import traceback
from clusterfuzz._internal.base import dates
from clusterfuzz._internal.base import errors
from clusterfuzz._internal.base import tasks as taskslib
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import untrusted
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.tasks import task_utils
@ -89,7 +89,7 @@ def schedule_utask_mains():
from clusterfuzz._internal.google_cloud_utils import batch
logs.info('Attempting to combine batch tasks.')
utask_mains = taskslib.get_utask_mains()
utask_mains = tasks.get_utask_mains()
if not utask_mains:
logs.info('No utask mains.')
return
@ -135,7 +135,11 @@ def task_loop():
schedule_utask_mains()
continue
task = taskslib.get_task()
if environment.is_tworker():
task = tasks.tworker_get_task()
else:
task = tasks.get_task()
if not task:
continue