Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2960,9 +2960,10 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
"2 or higher (supported in Cassandra 2.0 and higher). Consider "
"setting Cluster.protocol_version to 2 to support this operation.")
statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
batch_timestamp = query.timestamp if query.timestamp is not None else timestamp
message = BatchMessage(
query.batch_type, query._statements_and_parameters, cl,
serial_cl, timestamp, statement_keyspace)
serial_cl, batch_timestamp, statement_keyspace)
elif isinstance(query, GraphStatement):
# the statement_keyspace is not aplicable to GraphStatement
message = QueryMessage(query.query, cl, serial_cl, fetch_size,
Expand Down
9 changes: 7 additions & 2 deletions cassandra/cqlengine/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import threading

from cassandra.cluster import Cluster, _ConfigMode, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist, ConsistencyLevel
from cassandra.query import SimpleStatement, dict_factory
from cassandra.query import SimpleStatement, BatchStatement, dict_factory

from cassandra.cqlengine import CQLEngineException
from cassandra.cqlengine.statements import BaseCQLStatement
Expand Down Expand Up @@ -340,7 +340,12 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET, connect
if not conn.session:
raise CQLEngineException("It is required to setup() cqlengine before executing queries")

if isinstance(query, SimpleStatement):
if isinstance(query, BatchStatement):
log.debug(format_log_context('Executing BatchStatement with {} statements'.format(
len(query._statements_and_parameters)), connection=connection))
result = conn.session.execute(query, timeout=timeout)
return result
elif isinstance(query, SimpleStatement):
pass #
elif isinstance(query, BaseCQLStatement):
params = query.get_context()
Expand Down
60 changes: 33 additions & 27 deletions cassandra/cqlengine/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,36 +226,42 @@ def execute(self):
self._execute_callbacks()
return

batch_type = None if self.batch_type is CBatchType.LOGGED else self.batch_type
opener = 'BEGIN ' + (str(batch_type) + ' ' if batch_type else '') + ' BATCH'
if self.timestamp:

if isinstance(self.timestamp, int):
ts = self.timestamp
elif isinstance(self.timestamp, (datetime, timedelta)):
ts = self.timestamp
if isinstance(self.timestamp, timedelta):
ts += datetime.now() # Apply timedelta
ts = int(time.mktime(ts.timetuple()) * 1e+6 + ts.microsecond)
else:
raise ValueError("Batch expects a long, a timedelta, or a datetime")
# Map cqlengine batch_type to core BatchType
if self.batch_type is None or self.batch_type is CBatchType.LOGGED:
batch_type = CBatchType.LOGGED
elif self.batch_type == 'UNLOGGED' or self.batch_type is CBatchType.UNLOGGED:
batch_type = CBatchType.UNLOGGED
elif self.batch_type == 'COUNTER' or self.batch_type is CBatchType.COUNTER:
batch_type = CBatchType.COUNTER
else:
batch_type = CBatchType.LOGGED

opener += ' USING TIMESTAMP {0}'.format(ts)
# Calculate timestamp in microseconds if set
timestamp = None
if self.timestamp:
if isinstance(self.timestamp, timedelta):
ts = datetime.now() + self.timestamp
timestamp = int(time.mktime(ts.timetuple()) * 1e+6 + ts.microsecond)
elif isinstance(self.timestamp, datetime):
timestamp = int(time.mktime(self.timestamp.timetuple()) * 1e+6 + self.timestamp.microsecond)

# Create BatchStatement
batch = BatchStatement(
batch_type=batch_type,
consistency_level=self._consistency,
timestamp=timestamp
)

query_list = [opener]
parameters = {}
ctx_counter = 0
# Add each query as a SimpleStatement with parameters
for query in self.queries:
query.update_context_id(ctx_counter)
ctx = query.get_context()
ctx_counter += len(ctx)
query_list.append(' ' + str(query))
parameters.update(ctx)

query_list.append('APPLY BATCH;')

tmp = conn.execute('\n'.join(query_list), parameters, self._consistency, self._timeout, connection=self._connection)
check_applied(tmp)
query.update_context_id(0) # Reset context for each query
params = query.get_context()
stmt = SimpleStatement(str(query))
batch.add(stmt, params)

# Execute the batch
result = conn.execute(batch, timeout=self._timeout, connection=self._connection)
check_applied(result)

self.queries = []
self._execute_callbacks()
Expand Down
19 changes: 18 additions & 1 deletion cassandra/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,22 @@ class BatchStatement(Statement):
supported when using protocol version 3 or higher.
"""

timestamp = None
"""
The optional timestamp for all operations in the batch, in microseconds
since the UNIX epoch. If not set, the client timestamp generator or
server time will be used.

.. versionadded:: 3.29.2
"""

_statements_and_parameters = None
_session = None
_is_lwt = False

def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None,
consistency_level=None, serial_consistency_level=None,
session=None, custom_payload=None):
session=None, custom_payload=None, timestamp=None):
"""
`batch_type` specifies The :class:`.BatchType` for the batch operation.
Defaults to :attr:`.BatchType.LOGGED`.
Expand All @@ -781,6 +790,10 @@ def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None,
updated with any values found in their custom payloads. These are
only allowed when using protocol version 4 or higher.

`timestamp` is an optional timestamp for all operations in the batch,
in microseconds since the UNIX epoch. If set, this will override the
client timestamp generator.

Example usage:

.. code-block:: python
Expand Down Expand Up @@ -809,8 +822,12 @@ def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None,

.. versionchanged:: 2.6.0
Added `custom_payload` as a parameter

.. versionchanged:: 3.29.2
Added `timestamp` as a parameter
"""
self.batch_type = batch_type
self.timestamp = timestamp
self._statements_and_parameters = []
self._session = session
Statement.__init__(self, retry_policy=retry_policy, consistency_level=consistency_level,
Expand Down
Loading