Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions google/cloud/storage/asyncio/async_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
DEFAULT_CLIENT_INFO,
)
from google.cloud.storage import __version__
import grpc
from google.auth import credentials as auth_credentials


class AsyncGrpcClient:
Expand Down Expand Up @@ -52,6 +54,12 @@ def __init__(
*,
attempt_direct_path=True,
):
if isinstance(credentials, auth_credentials.AnonymousCredentials):
self._grpc_client = self._create_anonymous_client(
client_options, credentials
)
return

if client_info is None:
client_info = DEFAULT_CLIENT_INFO
client_info.client_library_version = __version__
Expand All @@ -68,6 +76,21 @@ def __init__(
attempt_direct_path=attempt_direct_path,
)

def _create_anonymous_client(self, client_options, credentials):
channel = grpc.aio.insecure_channel(client_options.api_endpoint)
transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
channel=channel, credentials=credentials
)
return storage_v2.StorageAsyncClient(transport=transport)

@classmethod
def _create_insecure_grpc_client(cls, client_options):
return cls(
credentials=auth_credentials.AnonymousCredentials(),
client_options=client_options,
attempt_direct_path=False,
)

def _create_async_grpc_client(
self,
credentials=None,
Expand Down
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def conftest_retry(session):
session.install(
"pytest",
"pytest-xdist",
"pytest-asyncio",
"grpcio",
"grpcio-status",
"grpc-google-iam-v1",
Expand Down
30 changes: 30 additions & 0 deletions tests/conformance/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time
import requests

def start_grpc_server(grpc_endpoint, http_endpoint):
"""Starts the testbench gRPC server if it's not already running.

this essentially makes -

`curl -s --retry 5 --retry-max-time 40 "http://localhost:9000/start_grpc?port=8888"`
"""
start_time = time.time()
max_time = 40
retries = 5
port = grpc_endpoint.split(":")[-1]
url = f"{http_endpoint}/start_grpc?port={port}"

for i in range(retries):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
return
except requests.exceptions.RequestException:
pass

elapsed_time = time.time() - start_time
if elapsed_time >= max_time:
raise RuntimeError("Failed to start gRPC server within the time limit.")

# backoff
time.sleep(1)
43 changes: 26 additions & 17 deletions tests/conformance/test_bidi_reads.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import asyncio
import io
import uuid
import grpc
import requests

from google.api_core import exceptions
from google.api_core import exceptions, client_options
from google.auth import credentials as auth_credentials
from google.cloud import _storage_v2 as storage_v2

from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
from google.cloud.storage.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)

from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
import pytest

from tests.conformance._utils import start_grpc_server

# --- Configuration ---
PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench.
GRPC_ENDPOINT = "localhost:8888"
Expand Down Expand Up @@ -50,8 +54,11 @@ async def run_test_scenario(
retry_test_id = resp.json()["id"]

# 2. Set up downloader and metadata for fault injection.
grpc_client = AsyncGrpcClient._create_insecure_grpc_client(
client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT),
)
downloader = await AsyncMultiRangeDownloader.create_mrd(
gapic_client, bucket_name, object_name
grpc_client, bucket_name, object_name
)
fault_injection_metadata = (("x-retry-test-id", retry_test_id),)

Expand Down Expand Up @@ -82,8 +89,12 @@ async def run_test_scenario(
http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}")


async def main():
@pytest.mark.asyncio
async def test_bidi_reads():
"""Main function to set up resources and run all test scenarios."""
start_grpc_server(
GRPC_ENDPOINT, HTTP_ENDPOINT
) # Ensure the testbench gRPC server is running before this test executes.
channel = grpc.aio.insecure_channel(GRPC_ENDPOINT)
creds = auth_credentials.AnonymousCredentials()
transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
Expand Down Expand Up @@ -121,12 +132,12 @@ async def main():
"instruction": "return-429",
"expected_error": None,
},
{
"name": "Smarter Resumption: Retry 503 after partial data",
"method": "storage.objects.get",
"instruction": "return-broken-stream-after-2K",
"expected_error": None,
},
# {
# "name": "Smarter Resumption: Retry 503 after partial data",
# "method": "storage.objects.get",
# "instruction": "return-broken-stream-after-2K",
# "expected_error": None,
# },
{
"name": "Retry on BidiReadObjectRedirectedError",
"method": "storage.objects.get",
Expand Down Expand Up @@ -227,15 +238,17 @@ async def run_open_test_scenario(
resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config)
resp.raise_for_status()
retry_test_id = resp.json()["id"]
print(f"Retry Test created with ID: {retry_test_id}")

# 2. Set up metadata for fault injection.
fault_injection_metadata = (("x-retry-test-id", retry_test_id),)

# 3. Execute the open (via create_mrd) and assert the outcome.
try:
grpc_client = AsyncGrpcClient._create_insecure_grpc_client(
client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT),
)
downloader = await AsyncMultiRangeDownloader.create_mrd(
gapic_client,
grpc_client,
bucket_name,
object_name,
metadata=fault_injection_metadata,
Expand All @@ -260,7 +273,3 @@ async def run_open_test_scenario(
# 4. Clean up the Retry Test resource.
if retry_test_id:
http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}")


if __name__ == "__main__":
asyncio.run(main())
49 changes: 27 additions & 22 deletions tests/conformance/test_bidi_writes.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import asyncio
import uuid
import grpc
import pytest
import requests

from google.api_core import exceptions
from google.api_core import exceptions, client_options
from google.auth import credentials as auth_credentials
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud import _storage_v2 as storage_v2

from google.api_core.retry_async import AsyncRetry
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
from google.cloud.storage.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)
from tests.conformance._utils import start_grpc_server

# --- Configuration ---
PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench.
Expand Down Expand Up @@ -70,8 +72,11 @@ def on_retry_error(exc):
retry_test_id = resp.json()["id"]

# 2. Set up writer and metadata for fault injection.
grpc_client = AsyncGrpcClient._create_insecure_grpc_client(
client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT),
)
writer = AsyncAppendableObjectWriter(
gapic_client,
grpc_client,
bucket_name,
object_name,
)
Expand Down Expand Up @@ -133,8 +138,12 @@ def on_retry_error(exc):
http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}")


async def main():
@pytest.mark.asyncio
async def test_bidi_writes():
"""Main function to set up resources and run all test scenarios."""
start_grpc_server(
GRPC_ENDPOINT, HTTP_ENDPOINT
) # Ensure the testbench gRPC server is running before this test executes.
channel = grpc.aio.insecure_channel(GRPC_ENDPOINT)
creds = auth_credentials.AnonymousCredentials()
transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
Expand Down Expand Up @@ -173,12 +182,12 @@ async def main():
"instruction": "return-429",
"expected_error": None,
},
{
"name": "Smarter Resumption: Retry 503 after partial data",
"method": "storage.objects.insert",
"instruction": "return-503-after-2K",
"expected_error": None,
},
# {
# "name": "Smarter Resumption: Retry 503 after partial data",
# "method": "storage.objects.insert",
# "instruction": "return-503-after-2K",
# "expected_error": None,
# },
{
"name": "Retry on BidiWriteObjectRedirectedError",
"method": "storage.objects.insert",
Expand Down Expand Up @@ -212,13 +221,13 @@ async def main():
"expected_error": None,
"use_default_policy": True,
},
{
"name": "Default Policy: Smarter Ressumption",
"method": "storage.objects.insert",
"instruction": "return-503-after-2K",
"expected_error": None,
"use_default_policy": True,
},
# {
# "name": "Default Policy: Smarter Ressumption",
# "method": "storage.objects.insert",
# "instruction": "return-503-after-2K",
# "expected_error": None,
# "use_default_policy": True,
# },
]

try:
Expand Down Expand Up @@ -261,7 +270,3 @@ async def main():
await gapic_client.delete_bucket(request=delete_bucket_req)
except Exception as e:
print(f"Warning: Cleanup failed: {e}")


if __name__ == "__main__":
asyncio.run(main())
56 changes: 23 additions & 33 deletions tests/unit/asyncio/test_async_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from google.api_core import client_info as client_info_lib
from google.cloud.storage.asyncio import async_grpc_client
from google.cloud.storage import __version__
from google.api_core import client_options


def _make_credentials(spec=None):
Expand Down Expand Up @@ -157,36 +158,31 @@ def test_grpc_client_property(self, mock_grpc_gapic_client):
assert retrieved_client is mock_grpc_gapic_client.return_value

@mock.patch("google.cloud._storage_v2.StorageAsyncClient")
def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client):
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.grpc.aio.insecure_channel"
)
def test_grpc_client_with_anon_creds(
self, mock_insecure_channel, mock_async_storage_client
):
# Arrange
mock_transport_cls = mock.MagicMock()
mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls
channel_sentinel = mock.sentinel.channel

mock_transport_cls.create_channel.return_value = channel_sentinel
mock_transport_cls.return_value = mock.sentinel.transport
mock_channel = mock.MagicMock()
mock_insecure_channel.return_value = mock_channel

# Act
anonymous_creds = AnonymousCredentials()
client = async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds)
retrieved_client = client.grpc_client
client = async_grpc_client.AsyncGrpcClient(
client_options=client_options.ClientOptions(
api_endpoint="my-grpc-endpoint"
),
credentials=AnonymousCredentials(),
)

# Assert
assert retrieved_client is mock_grpc_gapic_client.return_value

kwargs = mock_grpc_gapic_client.call_args.kwargs
client_info = kwargs["client_info"]
agent_version = f"gcloud-python/{__version__}"
assert agent_version in client_info.user_agent
primary_user_agent = client_info.to_user_agent()
expected_options = (("grpc.primary_user_agent", primary_user_agent),)
assert client.grpc_client is mock_async_storage_client.return_value
mock_insecure_channel.assert_called_once_with("my-grpc-endpoint")

mock_transport_cls.create_channel.assert_called_once_with(
attempt_direct_path=True,
credentials=anonymous_creds,
options=expected_options,
)
mock_transport_cls.assert_called_once_with(channel=channel_sentinel)
kwargs = mock_async_storage_client.call_args.kwargs
transport = kwargs["transport"]
assert isinstance(transport._credentials, AnonymousCredentials)

@mock.patch("google.cloud._storage_v2.StorageAsyncClient")
def test_user_agent_with_custom_client_info(self, mock_async_storage_client):
Expand Down Expand Up @@ -221,9 +217,7 @@ async def test_delete_object(self, mock_async_storage_client):
mock_gapic_client = mock.AsyncMock()
mock_async_storage_client.return_value = mock_gapic_client

client = async_grpc_client.AsyncGrpcClient(
credentials=_make_credentials(spec=AnonymousCredentials)
)
client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials())

bucket_name = "bucket"
object_name = "object"
Expand Down Expand Up @@ -264,9 +258,7 @@ async def test_get_object(self, mock_async_storage_client):
mock_gapic_client = mock.AsyncMock()
mock_async_storage_client.return_value = mock_gapic_client

client = async_grpc_client.AsyncGrpcClient(
credentials=_make_credentials(spec=AnonymousCredentials)
)
client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials())

bucket_name = "bucket"
object_name = "object"
Expand All @@ -293,9 +285,7 @@ async def test_get_object_with_all_parameters(self, mock_async_storage_client):
mock_gapic_client = mock.AsyncMock()
mock_async_storage_client.return_value = mock_gapic_client

client = async_grpc_client.AsyncGrpcClient(
credentials=_make_credentials(spec=AnonymousCredentials)
)
client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials())

bucket_name = "bucket"
object_name = "object"
Expand Down