Skip to content

Conversation

@javanlacerda
Copy link
Contributor

@javanlacerda javanlacerda commented Jan 8, 2026

This PR introduces full support for scheduling and managing fuzzing tasks on Kubernetes clusters,
specifically targeting GKE. It implements a new KubernetesService to
handle batch job creation, supports Kata Containers for isolation, and includes robust testing
and configuration mechanisms.

Key Features:

  • Kubernetes Service: A new backend for RemoteTaskInterface that schedules tasks as Kubernetes
    Jobs. It supports both standard and Kata Container runtimes, automatic Service Account
    creation with Workload Identity, and intelligent job limiting to prevent cluster overload.
  • Traffic Shifting (RemoteTaskGate): A new gating mechanism (RemoteTaskGate) that intelligently
    routes tasks between the legacy GCP Batch service and the new Kubernetes service based on
    configurable probabilities, allowing for a gradual, controlled migration.
  • Feature Flags: A new dynamic configuration system backed by Datastore to control runtime
    behaviors like job concurrency limits.

Detailed Changes by Module:

  • Kubernetes Integration (src/clusterfuzz/_internal/k8s/):

    • service.py: Implemented KubernetesService for job lifecycle management (creation,
      monitoring, limiting). Includes GKE credential loading, Kata Container spec generation,
      and Service Account provisioning.
    • Tests: Added k8s_service_test.py (unit), k8s_service_limit_test.py (limits), and
      k8s_service_e2e_test.py (integration on Kind).
  • Remote Task Management (src/clusterfuzz/_internal/remote_task/):

    • init.py: Introduced RemoteTaskGate, a smart router that implements
      RemoteTaskInterface. It initializes both GcpBatchService and KubernetesService and
      distributes tasks between them based on probabilities defined in job_frequency.py. This
      enables traffic splitting (e.g., 10% to K8s, 90% to Batch) for safe rollout.
    • job_frequency.py: Added logic to manage task scheduling frequency and split ratios.
    • Refactored core task logic to use the generic RemoteTask and RemoteTaskInterface
      abstractions.
  • Datastore & Configuration (src/clusterfuzz/_internal/datastore/):

    • data_types.py: Added FeatureFlag model to store configuration dynamically.
    • feature_flags.py: Added FeatureFlags enum/helper for type-safe access to flags (e.g.,
      K8S_PENDING_JOBS_LIMITER).
  • Batch & Legacy Refactoring (src/clusterfuzz/_internal/batch/):

    • service.py: Updated to align with the new RemoteTask interface.
    • Removed obsolete gcp.py and google_cloud_utils/batch.py utilities in favor of the new
      structure.
  • Infrastructure & CI:

    • .github/workflows/kubernetes-e2e-tests.yaml: New workflow for running E2E tests on a Kind
      cluster.
    • Pipfile / src/Pipfile: Added kubernetes client and updated Google Cloud dependencies.
  • Bot & Metrics:

    • src/python/bot/startup/run_bot.py: Updates to support K8s-based bot execution via the new
      gate.
    • src/clusterfuzz/_internal/metrics/: Enhanced logging and monitoring for remote tasks.

Evidences:

image Batch and Kata containers fuzzing hours, proving the Remote Gate, the Batch and Kubernetes services are working properly. The Feature Flag is used to set the job_frequency, then it proves the feature flag and its usage is working as well.

@javanlacerda javanlacerda changed the title Pr/dependencies Kubernetes Job service Jan 8, 2026
@javanlacerda javanlacerda force-pushed the pr/dependencies branch 5 times, most recently from 7edfb48 to b271b83 Compare January 10, 2026 22:46
@javanlacerda javanlacerda marked this pull request as ready for review January 10, 2026 22:50
@javanlacerda javanlacerda requested review from ViniciustCosta, decoNR, hunsche and jonathanmetzman and removed request for ViniciustCosta and hunsche January 10, 2026 22:50
Copy link
Collaborator

@jonathanmetzman jonathanmetzman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some very surface level comments.

@javanlacerda javanlacerda force-pushed the pr/dependencies branch 4 times, most recently from d5684e5 to 44737d3 Compare January 15, 2026 00:44
This commit introduces the Kubernetes job client and service, providing a mechanism to schedule tasks on Kubernetes clusters (including GKE and Kind), supporting both standard and Kata Containers.

Key Features & Changes:
- **Kubernetes Service**: Implemented `KubernetesService` in `clusterfuzz._internal.k8s.service` to manage job creation.
- **Kata Support**: Added specialized job creation for Kata Containers (`create_kata_container_job`) with required security context (`privileged`, `capabilities: ALL`), networking (`hostNetwork: True`), and environment variables (`HOST_UID`).
- **Dependency Management**: Added `kubernetes` and necessary Google Cloud dependencies (`google-api-python-client`, `google-cloud-storage`, `google-cloud-ndb`, etc.) to `Pipfile`.
- **E2E Testing**:
    - Created `tests.core.k8s.k8s_service_e2e_test` to verify job lifecycle on a local Kind cluster.
    - Updated `local/tests/kubernetes_e2e_test.bash` to provision the test environment.
    - Updated CI workflow (`.github/workflows/kubernetes-e2e-tests.yaml`) to install JDK 21 (required for Datastore emulator).
    - Tests now verify job "Running" status to avoid timeouts with long-running commands.
    - `KubernetesService` skips default credential loading when `K8S_E2E` is set to utilize the test-provided kubeconfig.
- **Unit Tests**: Added comprehensive unit tests in `tests.core.k8s.k8s_service_test` and `tests.core.kubernetes.kubernetes_test`, including mocking of `load_kube_config` and `_load_gke_credentials` to ensure robust testing without external dependencies.
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
@javanlacerda javanlacerda force-pushed the pr/dependencies branch 9 times, most recently from 78a7de2 to 0901b6d Compare January 16, 2026 13:00
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Comment on lines +18 to +25

pip install pipenv

# Install dependencies.
pipenv --python 3.11
pipenv install

class KubernetesJobClient(RemoteTaskInterface):
"""A remote task execution client for Kubernetes.
This class is a placeholder for a future implementation of a remote task
execution client that uses Kubernetes. It is not yet implemented.
"""
./local/install_deps.bash
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only intended to be used in CI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!


# If we get here the task succeeded in running. Acknowledge the message.
self._pubsub_message.ack()
if not self.do_not_ack:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its part of the job limiter for the Kubernetes service, we can probably use this for implement the job limiter for Batch as well, using the new feature they implemented for us. The rationale behind is if the task cannot be scheduled for Kubernetes because it already reached the limit of jobs, the message should not be acked, allowing the other adapter, such as Batch, to process the message.

@@ -0,0 +1,70 @@
# Copyright 2024 Google LLC
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Year

@@ -0,0 +1,14 @@
# Copyright 2025 Google LLC
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

year

logs.info(f'Scheduling {remote_task.command}, {remote_task.job_type}.')
config = configs[(remote_task.command, remote_task.job_type)]
job_specs[config].append(remote_task.input_download_url)
logs.info('Creating batch jobs.')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be k8s?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. fixing

namespace='default',
label_selector='app.kubernetes.io/name=clusterfuzz-kata-job',
field_selector='status.phase=Pending')
logs.info(f"Found {len(pods.items)} pending jobs.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single quotes.

service_account_name: str) -> dict:
"""Creates the body of a Kubernetes job."""

job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at OSS-Fuzz scale there is actually a high chance of a collision in a single day (>50%, https://en.wikipedia.org/wiki/Birthday_problem).
How bad is that? Why can't we use the full uuid?
If there's a length problem maybe we want to truncate the job name?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it seems like this is duplicated from create_kata_container_job. Let's try to share code.

"""Creates the body of a Kubernetes job."""

job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split(
'-', maxsplit=1)[0]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do maxsplit? It seems unnatural and unnecessary

Comment on lines 189 to 194
logs.error(f"Cluster {CLUSTER_NAME} not found in project {project}.")
print(f"DEBUG: Cluster {CLUSTER_NAME} not found in project {project}.")
return

except Exception as e:
logs.error(f"Failed to list clusters in {project}: {e}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single quotes.

from clusterfuzz._internal.datastore import ndb_utils
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.remote_task import types
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shadows a python builtin module. Can you rename it to remote_task_types.

separate batch job for each group. This allows tasks with similar
requirements to be processed together, which can improve efficiency.
"""
if feature_flags.FeatureFlags.K8S_PENDING_JOBS_LIMITER.enabled and \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is very hard to read. Can you break it up?

jobs = []
logs.info('Batching utask_mains.')
for config, input_urls in job_specs.items():
# TODO(javanlacerda): Batch multiple tasks into a single job.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that actually need for kata?


if not remote_tasks:
return {}
#TODO(javanlacerda): Create remote task config
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Space after #

Pipfile Outdated
Jinja2 = "==3.1.2"
oauth2client = "==4.1.3"
requests = "==2.21.0"
PyYAML = "==6.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using a different version here and in src/Pipfile?

Pipfile Outdated
google-cloud-datastore = "==2.16.1"
Jinja2 = "==3.1.2"
oauth2client = "==4.1.3"
requests = "==2.21.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we adding such old versions of requests and httplib2?

@@ -0,0 +1,61 @@
# Copyright 2026 Google LLC
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's difference between thsi and the next template?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might have been a good idea to consider knative instead of rebuilding batch.

emptyDir:
medium: Memory
sizeLimit: 1.9Gi
restartPolicy: Never
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I do restart for non fuzz jobs.

- name: UPDATE_WEB_TESTS
value: 'False'
restartPolicy: Never
volumes:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we mount dev/shm here?

@jonathanmetzman
Copy link
Collaborator

This is cool. I maybe would tried cloud run before kata because 1. It is probably less management? 2. It might be more performant because as far as I know doesn't use nested virt.

@jonathanmetzman
Copy link
Collaborator

Are we using preemptibles btw?

Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Signed-off-by: Javan Lacerda <javanlacerda@google.com>

from enum import Enum

from clusterfuzz._internal.batch.service import GcpBatchService
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually import the module, not the class, per: go/pystyle#imports

In this case, as all backends would expose a service module, I guess you could do from clusturefuzz...batch import service as batch_service

from clusterfuzz._internal.k8s.service import KubernetesService


class RemoteTaskAdapters(Enum):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!


@abc.abstractmethod
def create_utask_main_jobs(self, remote_tasks: list[RemoteTask]):
"""Creates a many remote tasks for uworker main tasks."""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Creates many remote jobs ..."

and returns a representation of the created job.
def __init__(self):
# Avoiding circular import
from clusterfuzz._internal.remote_task import remote_task_adapters
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the circular import avoided here? Is it the case where other module imports both this and the remote_task_adapters?

If that's the case, I think we can expect that other modules won't need to import remote_task_adapter as it should be an abstraction only used by the remote task gate, so maybe we don't need this local import.

raise unittest.SkipTest('K8S_E2E environment variable not set.')

cls.mock_batch_config = mock.Mock()
cls.mock_batch_config.get.return_value = 'test-project'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears this is overwritten on line 114 by cls.mock_batch_config.get.side_effect = get_batch_config.


# If we get here the task succeeded in running. Acknowledge the message.
self._pubsub_message.ack()
if not self.do_not_ack:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO readability would be improved by using ack instead of do_not_ack (go/tott/764).

tasks.TASK_LEASE_SECONDS)


WeightedSubconfig = collections.namedtuple('WeightedSubconfig',
Copy link
Contributor

@decoNR decoNR Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe this can be moved to the top of the file, below BatchWorkloadSpec.

Signed-off-by: Javan Lacerda <javanlacerda@google.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants