Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions pathwaysutils/experimental/shared_pathways_service/gke_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""GKE utils for deploying and managing the Pathways proxy."""

import logging
import re
import socket
import subprocess
import time
Expand All @@ -14,20 +15,39 @@
# Python API for kubectl calls.


def _validate_k8s_name(name: str) -> None:
"""Validates that the name is a valid Kubernetes resource name.

Args:
name: The name to validate.

Raises:
ValueError: If the name is invalid.
"""
if not re.match(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", name):
raise ValueError(
f"Invalid Kubernetes resource name: '{name}'. "
"Must consist of lower case alphanumeric characters or '-', and must "
"start and end with an alphanumeric character."
)


def fetch_cluster_credentials(
*, cluster_name: str, project_id: str, location: str
) -> None:
"""Fetches credentials for the GKE cluster."""
_validate_k8s_name(cluster_name)
_logger.info("Fetching credentials for '%s'.", cluster_name)
get_credentials_command = [
"gcloud",
"container",
"clusters",
"get-credentials",
cluster_name,
f"--location={location}",
f"--project={project_id}",
"--dns-endpoint"
"--dns-endpoint",
"--",
cluster_name,
]
try:
subprocess.run(
Expand Down Expand Up @@ -87,6 +107,7 @@ def get_pod_from_job(job_name: str) -> str:
RuntimeError: If the pod is missing or the pod name is not in the expected
format.
"""
_validate_k8s_name(job_name)
get_pod_command = [
"kubectl",
"get",
Expand Down Expand Up @@ -140,12 +161,14 @@ def check_pod_ready(pod_name: str, timeout: int = 30) -> str:
Raises:
RuntimeError: If the pod fails to become ready within the timeout.
"""
_validate_k8s_name(pod_name)
wait_command = [
"kubectl",
"wait",
"--for=condition=Ready",
f"pod/{pod_name}",
f"--timeout={timeout}s",
"--",
f"pod/{pod_name}",
]
try:
subprocess.run(wait_command, check=True, capture_output=True, text=True)
Expand Down Expand Up @@ -245,12 +268,14 @@ def enable_port_forwarding(
server_port,
)

_validate_k8s_name(pod_name)
port_forward_command = [
"kubectl",
"port-forward",
"--address",
"localhost",
pod_name,
"--",
f"pod/{pod_name}",
f"{port_available}:{server_port}",
]
try:
Expand Down Expand Up @@ -311,7 +336,8 @@ def stream_pod_logs(pod_name: str) -> subprocess.Popen[str]:
Raises:
Exception: If the log streaming fails.
"""
command = ["kubectl", "logs", "-f", pod_name]
_validate_k8s_name(pod_name)
command = ["kubectl", "logs", "-f", "--", f"pod/{pod_name}"]
try:
return subprocess.Popen(
command,
Expand All @@ -331,13 +357,15 @@ def delete_gke_job(job_name: str) -> None:
Args:
job_name: The name of the job.
"""
_validate_k8s_name(job_name)
_logger.info("Deleting job: %s", job_name)
delete_job_command = [
"kubectl",
"delete",
"job",
job_name,
"--ignore-not-found",
"--",
job_name,
]
try:
result = subprocess.run(
Expand Down
Loading