Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 35 additions & 104 deletions pymongo/asynchronous/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import copy
import datetime
import logging
from collections.abc import MutableMapping
from itertools import islice
from typing import (
Expand All @@ -45,6 +44,11 @@
_raise_bulk_write_error,
_Run,
)
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.common import (
validate_is_document_type,
validate_ok_for_replace,
Expand All @@ -57,7 +61,6 @@
OperationFailure,
)
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import (
_DELETE,
_INSERT,
Expand Down Expand Up @@ -252,44 +255,15 @@ async def write_command(
) -> dict[str, Any]:
"""A proxy for SocketInfo.write_command that handles event publishing."""
cmd[bwc.field] = docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
bwc._start(cmd, request_id, docs)
try:
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
Expand All @@ -299,24 +273,17 @@ async def write_command(
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)

if bwc.publish:
bwc._fail(request_id, failure, duration)
Expand All @@ -337,22 +304,7 @@ async def unack_write(
client: AsyncMongoClient[Any],
) -> Optional[Mapping[str, Any]]:
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
cmd = bwc._start(cmd, request_id, docs)
try:
Expand All @@ -363,23 +315,9 @@ async def unack_write(
else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration)
except Exception as exc:
Expand All @@ -390,24 +328,17 @@ async def unack_write(
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
assert bwc.start_time is not None
bwc._fail(request_id, failure, duration)
Expand Down
139 changes: 35 additions & 104 deletions pymongo/asynchronous/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import copy
import datetime
import logging
from collections.abc import MutableMapping
from itertools import islice
from typing import (
Expand Down Expand Up @@ -48,6 +47,11 @@
_merge_command,
_throw_client_bulk_write_exception,
)
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.common import (
validate_is_document_type,
validate_ok_for_replace,
Expand All @@ -63,7 +67,6 @@
WaitQueueTimeoutError,
)
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import (
_ClientBulkWriteContext,
_convert_client_bulk_exception,
Expand Down Expand Up @@ -239,44 +242,15 @@ async def write_command(
"""A proxy for AsyncConnection.write_command that handles event publishing."""
cmd["ops"] = op_docs
cmd["nsInfo"] = ns_docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
bwc._start(cmd, request_id, op_docs, ns_docs)
try:
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
# Process the response from the server.
Expand All @@ -287,24 +261,17 @@ async def write_command(
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)

if bwc.publish:
bwc._fail(request_id, failure, duration)
Expand All @@ -328,22 +295,7 @@ async def unack_write(
client: AsyncMongoClient[Any],
) -> Optional[Mapping[str, Any]]:
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
cmd = bwc._start(cmd, request_id, op_docs, ns_docs)
try:
Expand All @@ -354,23 +306,9 @@ async def unack_write(
else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration)
except Exception as exc:
Expand All @@ -381,24 +319,17 @@ async def unack_write(
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
assert bwc.start_time is not None
bwc._fail(request_id, failure, duration)
Expand Down
Loading
Loading