Kubernetes Job service #5113
7edfb48 to
b271b83
Compare
jonathanmetzman
left a comment
Left some very surface-level comments.
d5684e5 to
44737d3
Compare
This commit introduces the Kubernetes job client and service, providing a mechanism to schedule tasks on Kubernetes clusters (including GKE and Kind), supporting both standard and Kata Containers.
Key Features & Changes:
- **Kubernetes Service**: Implemented `KubernetesService` in `clusterfuzz._internal.k8s.service` to manage job creation.
- **Kata Support**: Added specialized job creation for Kata Containers (`create_kata_container_job`) with required security context (`privileged`, `capabilities: ALL`), networking (`hostNetwork: True`), and environment variables (`HOST_UID`).
- **Dependency Management**: Added `kubernetes` and necessary Google Cloud dependencies (`google-api-python-client`, `google-cloud-storage`, `google-cloud-ndb`, etc.) to `Pipfile`.
- **E2E Testing**:
  - Created `tests.core.k8s.k8s_service_e2e_test` to verify job lifecycle on a local Kind cluster.
  - Updated `local/tests/kubernetes_e2e_test.bash` to provision the test environment.
  - Updated CI workflow (`.github/workflows/kubernetes-e2e-tests.yaml`) to install JDK 21 (required for Datastore emulator).
  - Tests now verify job "Running" status to avoid timeouts with long-running commands.
  - `KubernetesService` skips default credential loading when `K8S_E2E` is set to utilize the test-provided kubeconfig.
- **Unit Tests**: Added comprehensive unit tests in `tests.core.k8s.k8s_service_test` and `tests.core.kubernetes.kubernetes_test`, including mocking of `load_kube_config` and `_load_gke_credentials` to ensure robust testing without external dependencies.
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
78a7de2 to
0901b6d
Compare
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
0901b6d to
3692699
Compare
b771b50 to
ae2e936
Compare
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
    pip install pipenv

    # Install dependencies.
    pipenv --python 3.11
    pipenv install

    class KubernetesJobClient(RemoteTaskInterface):
    """A remote task execution client for Kubernetes.
    This class is a placeholder for a future implementation of a remote task
    execution client that uses Kubernetes. It is not yet implemented.
    """
    ./local/install_deps.bash
This is only intended to be used in CI?
Yes!
    # If we get here the task succeeded in running. Acknowledge the message.
    self._pubsub_message.ack()
    if not self.do_not_ack:
What is this change?
It's part of the job limiter for the Kubernetes service. We can probably use it to implement the job limiter for Batch as well, using the new feature they implemented for us. The rationale is that if a task cannot be scheduled on Kubernetes because the job limit has already been reached, the message should not be acked, which allows another adapter, such as Batch, to process it.
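The rationale in this reply can be illustrated with a minimal sketch. Only the `do_not_ack` flag and the `ack()` call come from the diff under review; `FakeMessage`, `TaskSketch`, `schedule_or_defer`, and the `pending_limit` parameter are hypothetical names used for illustration, not the PR's actual code.

```python
from dataclasses import dataclass


class FakeMessage:
  """Stand-in for a Pub/Sub message; records whether ack() was called."""

  def __init__(self):
    self.acked = False

  def ack(self):
    self.acked = True


@dataclass
class TaskSketch:
  """Hypothetical task wrapper mirroring the `do_not_ack` flag in the diff."""
  message: FakeMessage
  do_not_ack: bool = False

  def finish(self):
    # Acknowledge only when the task was actually scheduled.
    if not self.do_not_ack:
      self.message.ack()


def schedule_or_defer(task, pending_jobs, pending_limit):
  """Returns True if the task may be scheduled, False if it is deferred."""
  if pending_jobs >= pending_limit:
    # Leave the message unacked so another adapter (e.g. Batch) can take it.
    task.do_not_ack = True
    return False
  return True
```

With this shape, a deferred task never reaches `ack()`, so the message stays available for redelivery to whichever adapter picks it up next.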
    @@ -0,0 +1,70 @@
    # Copyright 2024 Google LLC
Year
    @@ -0,0 +1,14 @@
    # Copyright 2025 Google LLC
year
    logs.info(f'Scheduling {remote_task.command}, {remote_task.job_type}.')
    config = configs[(remote_task.command, remote_task.job_type)]
    job_specs[config].append(remote_task.input_download_url)
    logs.info('Creating batch jobs.')
Shouldn't this be k8s?
Yes, fixing.
    namespace='default',
    label_selector='app.kubernetes.io/name=clusterfuzz-kata-job',
    field_selector='status.phase=Pending')
    logs.info(f"Found {len(pods.items)} pending jobs.")
single quotes.
    service_account_name: str) -> dict:
    """Creates the body of a Kubernetes job."""

    job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split(
I think at OSS-Fuzz scale there is actually a high chance of a collision in a single day (>50%, https://en.wikipedia.org/wiki/Birthday_problem).
How bad is that? Why can't we use the full uuid?
If there's a length problem maybe we want to truncate the job name?
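To put rough numbers on the collision concern: the first segment of a UUID4 string is 8 hex characters (32 bits), and the birthday approximation says about 77,000 names drawn from a 32-bit space already give close to a 50% chance of at least one collision. The sketch below computes that bound and shows one way to keep the full UUID while truncating the job-type prefix instead. `collision_probability` and `make_job_name` are hypothetical helpers, and the 63-character limit is an assumption based on the DNS-label length Kubernetes applies to many names.

```python
import math
import uuid


def collision_probability(num_jobs, bits):
  """Birthday approximation of P(at least one collision) among num_jobs IDs."""
  space = 2.0**bits
  return 1.0 - math.exp(-num_jobs * (num_jobs - 1) / (2.0 * space))


def make_job_name(job_type, max_len=63):
  """Hypothetical helper: keep the full UUID, truncate the prefix to fit."""
  suffix = uuid.uuid4().hex  # 32 hex characters.
  prefix = job_type.replace('_', '-').lower()
  # Reserve room for the suffix and the joining hyphen.
  prefix = prefix[:max_len - len(suffix) - 1].rstrip('-')
  return f'{prefix}-{suffix}'


if __name__ == '__main__':
  for jobs in (10_000, 77_163, 1_000_000):
    print(jobs, collision_probability(jobs, 32))
  print(make_job_name('libfuzzer_asan_example'))
```

Since the job name also carries the job type, only jobs of the same type can collide, so the per-type job count is the relevant input.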
Also it seems like this is duplicated from create_kata_container_job. Let's try to share code.
    """Creates the body of a Kubernetes job."""

    job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split(
    '-', maxsplit=1)[0]
Why use maxsplit? It seems unnatural and unnecessary.
    logs.error(f"Cluster {CLUSTER_NAME} not found in project {project}.")
    print(f"DEBUG: Cluster {CLUSTER_NAME} not found in project {project}.")
    return

    except Exception as e:
    logs.error(f"Failed to list clusters in {project}: {e}")
Single quotes.
    from clusterfuzz._internal.datastore import ndb_utils
    from clusterfuzz._internal.google_cloud_utils import credentials
    from clusterfuzz._internal.metrics import logs
    from clusterfuzz._internal.remote_task import types
This shadows Python's standard-library `types` module. Can you rename it to `remote_task_types`?
    separate batch job for each group. This allows tasks with similar
    requirements to be processed together, which can improve efficiency.
    """
    if feature_flags.FeatureFlags.K8S_PENDING_JOBS_LIMITER.enabled and \
This condition is very hard to read. Can you break it up?
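One way to break up a condition like this is to name each clause and return early. The excerpt only shows the first half of the condition, so the second clause below (a pending-jobs capacity check) is an assumption, and `should_skip_scheduling` with its parameters is a hypothetical helper rather than code from the PR.

```python
def should_skip_scheduling(limiter_enabled, pending_jobs, max_pending_jobs):
  """Returns True when new Kubernetes jobs should not be created."""
  if not limiter_enabled:
    # The limiter feature flag is off, so never skip.
    return False
  # Assumed second clause: the cluster already holds enough pending jobs.
  at_capacity = pending_jobs >= max_pending_jobs
  return at_capacity
```

The call site then reads as a single named check instead of a backslash-continued boolean expression.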
    jobs = []
    logs.info('Batching utask_mains.')
    for config, input_urls in job_specs.items():
    # TODO(javanlacerda): Batch multiple tasks into a single job.
Is that actually needed for Kata?
    if not remote_tasks:
    return {}
    #TODO(javanlacerda): Create remote task config
nit: Space after #
Pipfile
Outdated
    Jinja2 = "==3.1.2"
    oauth2client = "==4.1.3"
    requests = "==2.21.0"
    PyYAML = "==6.0"
Why are we using a different version here and in src/Pipfile?
Pipfile
Outdated
    google-cloud-datastore = "==2.16.1"
    Jinja2 = "==3.1.2"
    oauth2client = "==4.1.3"
    requests = "==2.21.0"
Why are we adding such old versions of requests and httplib2?
    @@ -0,0 +1,61 @@
    # Copyright 2026 Google LLC
What's the difference between this and the next template?
It might have been a good idea to consider Knative instead of rebuilding Batch.
    emptyDir:
    medium: Memory
    sizeLimit: 1.9Gi
    restartPolicy: Never
FYI, I do restart for non-fuzz jobs.
    - name: UPDATE_WEB_TESTS
    value: 'False'
    restartPolicy: Never
    volumes:
Why don't we mount /dev/shm here?
This is cool. I might have tried Cloud Run before Kata because (1) it is probably less to manage, and (2) it might be more performant since, as far as I know, it doesn't use nested virtualization.
Are we using preemptibles, btw?
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
eb601c2 to
092e27f
Compare
8adc287 to
9b3a503
Compare
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
6040f7d to
941c7d1
Compare
    from enum import Enum

    from clusterfuzz._internal.batch.service import GcpBatchService
We usually import the module, not the class, per go/pystyle#imports.
In this case, as all backends would expose a service module, I guess you could do from clusterfuzz...batch import service as batch_service
    from clusterfuzz._internal.k8s.service import KubernetesService


    class RemoteTaskAdapters(Enum):
Nice!
    @abc.abstractmethod
    def create_utask_main_jobs(self, remote_tasks: list[RemoteTask]):
    """Creates a many remote tasks for uworker main tasks."""
nit: "Creates many remote jobs ..."
    and returns a representation of the created job.
    def __init__(self):
    # Avoiding circular import
    from clusterfuzz._internal.remote_task import remote_task_adapters
What is the circular import being avoided here? Is it the case where another module imports both this and remote_task_adapters?
If so, I think we can expect that other modules won't need to import remote_task_adapters, as it should be an abstraction used only by the remote task gate, so maybe we don't need this local import.
    raise unittest.SkipTest('K8S_E2E environment variable not set.')

    cls.mock_batch_config = mock.Mock()
    cls.mock_batch_config.get.return_value = 'test-project'
It appears this is overwritten on line 114 by cls.mock_batch_config.get.side_effect = get_batch_config.
    # If we get here the task succeeded in running. Acknowledge the message.
    self._pubsub_message.ack()
    if not self.do_not_ack:
IMO readability would be improved by using ack instead of do_not_ack (go/tott/764).
    tasks.TASK_LEASE_SECONDS)


    WeightedSubconfig = collections.namedtuple('WeightedSubconfig',
nit: Maybe this can be moved to the top of the file, below BatchWorkloadSpec.
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
This PR introduces full support for scheduling and managing fuzzing tasks on Kubernetes clusters, specifically targeting GKE. It implements a new KubernetesService to handle batch job creation, supports Kata Containers for isolation, and includes robust testing and configuration mechanisms.
Key Features:
- [...] Jobs. It supports both standard and Kata Container runtimes, automatic Service Account creation with Workload Identity, and intelligent job limiting to prevent cluster overload.
- [...] routes tasks between the legacy GCP Batch service and the new Kubernetes service based on configurable probabilities, allowing for a gradual, controlled migration.
- [...] behaviors like job concurrency limits.
Detailed Changes by Module:
- Kubernetes Integration (src/clusterfuzz/_internal/k8s/): [...] monitoring, limiting). Includes GKE credential loading, Kata Container spec generation, and Service Account provisioning. [...] k8s_service_e2e_test.py (integration on Kind).
- Remote Task Management (src/clusterfuzz/_internal/remote_task/): [...] RemoteTaskInterface. It initializes both GcpBatchService and KubernetesService and distributes tasks between them based on probabilities defined in job_frequency.py. This enables traffic splitting (e.g., 10% to K8s, 90% to Batch) for safe rollout. [...] abstractions.
- Datastore & Configuration (src/clusterfuzz/_internal/datastore/): [...] K8S_PENDING_JOBS_LIMITER).
- Batch & Legacy Refactoring (src/clusterfuzz/_internal/batch/): [...] structure.
- Infrastructure & CI: [...] cluster.
- Bot & Metrics: [...] gate.
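The traffic splitting described under Remote Task Management can be sketched as a weighted random choice. This is an illustration only: `choose_adapter` and the adapter names in the example weights are hypothetical, and the real probabilities live in job_frequency.py, whose structure is not shown here.

```python
import random


def choose_adapter(weights, rng=random):
  """Picks an adapter name with probability proportional to its weight."""
  names = list(weights)
  return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


if __name__ == '__main__':
  # Example split from the description: 10% to K8s, 90% to Batch.
  split = {'kubernetes': 0.1, 'gcp_batch': 0.9}
  rng = random.Random(0)
  picks = [choose_adapter(split, rng) for _ in range(10_000)]
  print(picks.count('kubernetes') / len(picks))
```

Passing a seeded `random.Random` makes the routing reproducible in tests, while the default uses the module-level generator.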
Evidences: