4 changes: 2 additions & 2 deletions kafka/admin/client.py
@@ -256,8 +256,8 @@ def _refresh_controller_id(self, timeout_ms=30000):
             # use defaults for allow_auto_topic_creation / include_authorized_operations in v6+
             request = MetadataRequest[version]()
 
-            timeout_at = time.time() + timeout_ms / 1000
-            while time.time() < timeout_at:
+            timeout_at = time.monotonic() + timeout_ms / 1000
+            while time.monotonic() < timeout_at:
                 response = self.send_request(request)
                 controller_id = response.controller_id
                 if controller_id == -1:
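Note on the pattern above: computing a deadline once and then comparing the current clock against it in a loop is exactly where the wall clock is unsafe. time.time() tracks the system clock, which NTP corrections or a manual change can jump forward or backward, silently stretching or collapsing the timeout; time.monotonic() is guaranteed never to go backward. A minimal standalone sketch of the deadline idiom, with a hypothetical condition callable standing in for the MetadataRequest round-trip:

import time

def wait_until(condition, timeout_ms, poll_interval_s=0.05):
    """Poll `condition` until it returns truthy or the deadline passes."""
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        result = condition()
        if result:
            return result
        time.sleep(poll_interval_s)  # brief backoff between attempts
    return None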
4 changes: 2 additions & 2 deletions kafka/benchmarks/consumer_performance.py
@@ -52,14 +52,14 @@ def run(args):
     print('-> OK!')
     print()
 
-    start_time = time.time()
+    start_time = time.monotonic()
     records = 0
     for msg in consumer:
         records += 1
         if records >= args.num_records:
             break
 
-    end_time = time.time()
+    end_time = time.monotonic()
     timer_stop.set()
     timer.join()
     print('Consumed {0} records'.format(records))
4 changes: 2 additions & 2 deletions kafka/benchmarks/producer_performance.py
@@ -66,9 +66,9 @@ def _benchmark():
             raise ValueError(r)
         print("%d suceeded, %d failed" % (count_success, count_failure))
 
-    start_time = time.time()
+    start_time = time.monotonic()
     _benchmark()
-    end_time = time.time()
+    end_time = time.monotonic()
     timer_stop.set()
     timer.join()
     print('Execution time:', end_time - start_time, 'secs')
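For the two benchmark scripts the change is about measuring an elapsed duration rather than enforcing a deadline, but the reasoning is the same: a wall-clock jump mid-run would corrupt end_time - start_time. A sketch of the measurement idiom; time.perf_counter(), a monotonic clock with higher resolution, is also a reasonable choice for benchmarks:

import time

def timed(fn, *args, **kwargs):
    # Elapsed duration measured on a clock that cannot jump backward.
    start = time.monotonic()
    result = fn(*args, **kwargs)
    return result, time.monotonic() - start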
32 changes: 16 additions & 16 deletions kafka/client_async.py
@@ -304,7 +304,7 @@ def _conn_state_change(self, node_id, sock, conn):
                 self._selector.modify(sock, selectors.EVENT_WRITE, conn)
 
             if self.cluster.is_bootstrap(node_id):
-                self._last_bootstrap = time.time()
+                self._last_bootstrap = time.monotonic()
 
         elif conn.state is ConnectionStates.API_VERSIONS_SEND:
             try:
@@ -708,9 +708,9 @@ def _poll(self, timeout):
         # Send pending requests first, before polling for responses
         self._register_send_sockets()
 
-        start_select = time.time()
+        start_select = time.monotonic()
         ready = self._selector.select(timeout)
-        end_select = time.time()
+        end_select = time.monotonic()
         if self._sensors:
             self._sensors.select_time.record((end_select - start_select) * 1000000000)

@@ -782,7 +782,7 @@ def _poll(self, timeout):
                                   timeout_ms))
 
         if self._sensors:
-            self._sensors.io_time.record((time.time() - end_select) * 1000000000)
+            self._sensors.io_time.record((time.monotonic() - end_select) * 1000000000)
 
         self._maybe_close_oldest_connection()

@@ -1020,9 +1020,9 @@ def check_version(self, node_id=None, timeout=None, **kwargs):
         """
         timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
         with self._lock:
-            end = time.time() + timeout
-            while time.time() < end:
-                time_remaining = max(end - time.time(), 0)
+            end = time.monotonic() + timeout
+            while time.monotonic() < end:
+                time_remaining = max(end - time.monotonic(), 0)
                 if node_id is not None and self.connection_delay(node_id) > 0:
                     sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
                     if sleep_time > 0:
@@ -1043,8 +1043,8 @@ def check_version(self, node_id=None, timeout=None, **kwargs):
                     continue
                 conn = self._conns[try_node]
 
-                while conn.connecting() and time.time() < end:
-                    timeout_ms = min((end - time.time()) * 1000, 200)
+                while conn.connecting() and time.monotonic() < end:
+                    timeout_ms = min((end - time.monotonic()) * 1000, 200)
                     self.poll(timeout_ms=timeout_ms)
 
                 if conn._api_version is not None:
@@ -1130,7 +1130,7 @@ def _maybe_close_oldest_connection(self):
         expired_connection = self._idle_expiry_manager.poll_expired_connection()
         if expired_connection:
             conn_id, ts = expired_connection
-            idle_ms = (time.time() - ts) * 1000
+            idle_ms = (time.monotonic() - ts) * 1000
             log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms)
             self.close(node_id=conn_id)

@@ -1185,14 +1185,14 @@ def __init__(self, connections_max_idle_ms):
         else:
             self.connections_max_idle = float('inf')
         self.next_idle_close_check_time = None
-        self.update_next_idle_close_check_time(time.time())
+        self.update_next_idle_close_check_time(time.monotonic())
         self.lru_connections = collections.OrderedDict()
 
     def update(self, conn_id):
         # order should reflect last-update
         if conn_id in self.lru_connections:
             del self.lru_connections[conn_id]
-        self.lru_connections[conn_id] = time.time()
+        self.lru_connections[conn_id] = time.monotonic()
 
     def remove(self, conn_id):
         if conn_id in self.lru_connections:
@@ -1201,10 +1201,10 @@ def remove(self, conn_id):
     def is_expired(self, conn_id):
         if conn_id not in self.lru_connections:
             return None
-        return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle
+        return time.monotonic() >= self.lru_connections[conn_id] + self.connections_max_idle
 
     def next_check_ms(self):
-        now = time.time()
+        now = time.monotonic()
         if not self.lru_connections or self.next_idle_close_check_time == float('inf'):
             return float('inf')
         elif self.next_idle_close_check_time <= now:
@@ -1216,7 +1216,7 @@ def update_next_idle_close_check_time(self, ts):
         self.next_idle_close_check_time = ts + self.connections_max_idle
 
     def poll_expired_connection(self):
-        if time.time() < self.next_idle_close_check_time:
+        if time.monotonic() < self.next_idle_close_check_time:
             return None
 
         if not len(self.lru_connections):
@@ -1228,7 +1228,7 @@ def poll_expired_connection(self):
 
         self.update_next_idle_close_check_time(oldest_ts)
 
-        if time.time() >= oldest_ts + self.connections_max_idle:
+        if time.monotonic() >= oldest_ts + self.connections_max_idle:
             return (oldest_conn_id, oldest_ts)
         else:
             return None
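The IdleConnectionManager hunks show why this switch has to be made everywhere at once: the timestamps stored in lru_connections are raw clock readings, meaningful only when compared against the same clock that produced them, so a time.time() writer paired with a time.monotonic() reader would make every idle-expiry comparison meaningless. A simplified standalone sketch of the same OrderedDict-as-LRU idiom (not the library's actual class):

import collections
import time

class IdleTracker:
    def __init__(self, max_idle_s):
        self.max_idle_s = max_idle_s
        self.lru = collections.OrderedDict()  # conn_id -> last-active reading

    def touch(self, conn_id):
        # Delete-then-insert moves the entry to the end, so the
        # least recently active connection is always first.
        self.lru.pop(conn_id, None)
        self.lru[conn_id] = time.monotonic()

    def oldest_expired(self):
        if not self.lru:
            return None
        conn_id, ts = next(iter(self.lru.items()))
        if time.monotonic() >= ts + self.max_idle_s:
            return conn_id
        return None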
6 changes: 3 additions & 3 deletions kafka/cluster.py
@@ -169,7 +169,7 @@ def coordinator_for_group(self, group):
 
     def ttl(self):
         """Milliseconds until metadata should be refreshed"""
-        now = time.time() * 1000
+        now = time.monotonic() * 1000
         if self._need_update:
             ttl = 0
         else:
@@ -231,7 +231,7 @@ def failed_update(self, exception):
         self._future = None
         if f:
             f.failure(exception)
-        self._last_refresh_ms = time.time() * 1000
+        self._last_refresh_ms = time.monotonic() * 1000
 
     def update_metadata(self, metadata):
         """Update cluster state given a MetadataResponse.
@@ -335,7 +335,7 @@ def update_metadata(self, metadata):
         self._future = None
         self._need_update = False
 
-        now = time.time() * 1000
+        now = time.monotonic() * 1000
         self._last_refresh_ms = now
         self._last_successful_refresh_ms = now
28 changes: 14 additions & 14 deletions kafka/conn.py
@@ -326,7 +326,7 @@ def _next_afi_sockaddr(self):
     def connect_blocking(self, timeout=float('inf')):
         if self.connected():
             return True
-        timeout += time.time()
+        timeout += time.monotonic()
         # First attempt to perform dns lookup
         # note that the underlying interface, socket.getaddrinfo,
         # has no explicit timeout so we may exceed the user-specified timeout
@@ -335,7 +335,7 @@ def connect_blocking(self, timeout=float('inf')):
         # Loop once over all returned dns entries
         selector = None
         while self._gai:
-            while time.time() < timeout:
+            while time.monotonic() < timeout:
                 self.connect()
                 if self.connected():
                     if selector is not None:
@@ -359,7 +359,7 @@ def connect(self):
         """Attempt to connect and return ConnectionState"""
         if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
             self.state = ConnectionStates.CONNECTING
-            self.last_attempt = time.time()
+            self.last_attempt = time.monotonic()
             next_lookup = self._next_afi_sockaddr()
             if not next_lookup:
                 self.close(Errors.KafkaConnectionError('DNS failure'))
@@ -464,7 +464,7 @@ def connect(self):
                               ConnectionStates.DISCONNECTED):
             # Connection timed out
             request_timeout = self.config['request_timeout_ms'] / 1000.0
-            if time.time() > request_timeout + self.last_attempt:
+            if time.monotonic() > request_timeout + self.last_attempt:
                 log.error('%s: Connection attempt timed out', self)
                 self.close(Errors.KafkaConnectionError('timeout'))
                 return self.state
@@ -776,7 +776,7 @@ def _recv_sasl_authenticate(self):
         if version == 1:
             ((correlation_id, response),) = self._protocol.receive_bytes(data)
             (future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id)
-            latency_ms = (time.time() - timestamp) * 1000
+            latency_ms = (time.monotonic() - timestamp) * 1000
             if self._sensors:
                 self._sensors.request_time.record(latency_ms)
             log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
@@ -834,7 +834,7 @@ def throttle_delay(self):
         Return the number of milliseconds to wait until connection is no longer throttled.
         """
         if self._throttle_time is not None:
-            remaining_ms = (self._throttle_time - time.time()) * 1000
+            remaining_ms = (self._throttle_time - time.monotonic()) * 1000
             if remaining_ms > 0:
                 return remaining_ms
             else:
@@ -855,7 +855,7 @@ def connection_delay(self):
             elif self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]):
                 return 0
             else:
-                time_waited = time.time() - self.last_attempt
+                time_waited = time.monotonic() - self.last_attempt
                 return max(self._reconnect_backoff - time_waited, 0) * 1000
         else:
             # When connecting or connected, we should be able to delay
@@ -1006,7 +1006,7 @@ def _send(self, request, blocking=True, request_timeout_ms=None):
         log.debug('%s: Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request)
         if request.expect_response():
             assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
-            sent_time = time.time()
+            sent_time = time.monotonic()
             timeout_at = sent_time + (request_timeout_ms / 1000)
             self.in_flight_requests[correlation_id] = (future, sent_time, timeout_at)
         else:
@@ -1093,7 +1093,7 @@ def _maybe_throttle(self, response):
         # Client side throttling enabled in v2.0 brokers
         # prior to that throttling (if present) was managed broker-side
         if self.config['api_version'] is not None and self.config['api_version'] >= (2, 0):
-            throttle_time = time.time() + throttle_time_ms / 1000
+            throttle_time = time.monotonic() + throttle_time_ms / 1000
             self._throttle_time = max(throttle_time, self._throttle_time or 0)
         log.warning("%s: %s throttled by broker (%d ms)", self,
                     response.__class__.__name__, throttle_time_ms)
@@ -1129,7 +1129,7 @@ def recv(self):
         except KeyError:
             self.close(Errors.KafkaConnectionError('Received unrecognized correlation id'))
             return ()
-        latency_ms = (time.time() - timestamp) * 1000
+        latency_ms = (time.monotonic() - timestamp) * 1000
         if self._sensors:
             self._sensors.request_time.record(latency_ms)

@@ -1193,7 +1193,7 @@ def requests_timed_out(self):
         return self.next_ifr_request_timeout_ms() == 0
 
     def timed_out_ifrs(self):
-        now = time.time()
+        now = time.monotonic()
         ifrs = sorted(self.in_flight_requests.values(), reverse=True, key=lambda ifr: ifr[2])
         return list(filter(lambda ifr: ifr[2] <= now, ifrs))

@@ -1204,7 +1204,7 @@ def get_timeout(v):
                 return v[2]
             next_timeout = min(map(get_timeout,
                                    self.in_flight_requests.values()))
-            return max(0, (next_timeout - time.time()) * 1000)
+            return max(0, (next_timeout - time.monotonic()) * 1000)
         else:
             return float('inf')

@@ -1275,8 +1275,8 @@ def check_version(self, timeout=2, **kwargs):
 
         Raises: NodeNotReadyError on timeout
         """
-        timeout_at = time.time() + timeout
-        if not self.connect_blocking(timeout_at - time.time()):
+        timeout_at = time.monotonic() + timeout
+        if not self.connect_blocking(timeout_at - time.monotonic()):
             raise Errors.NodeNotReadyError()
         else:
             return self._api_version
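In conn.py, each in-flight request stores an absolute timeout_at alongside its send time, and next_ifr_request_timeout_ms() reduces those deadlines with min() to decide how long the client may poll. A reduced sketch of that bookkeeping, keeping only the timing fields from the (future, sent_time, timeout_at) layout the diff shows:

import time

in_flight = {}  # correlation_id -> (sent_time, timeout_at)

def track(correlation_id, request_timeout_ms):
    sent_time = time.monotonic()
    in_flight[correlation_id] = (sent_time, sent_time + request_timeout_ms / 1000)

def next_timeout_ms():
    # Milliseconds until the earliest in-flight request expires,
    # or infinity when nothing is outstanding.
    if not in_flight:
        return float('inf')
    next_deadline = min(timeout_at for _, timeout_at in in_flight.values())
    return max(0, (next_deadline - time.monotonic()) * 1000)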
10 changes: 5 additions & 5 deletions kafka/consumer/fetcher.py
@@ -147,7 +147,7 @@ def send_fetches(self):
             log.debug("Sending FetchRequest to node %s", node_id)
             self._nodes_with_pending_fetch_requests.add(node_id)
             future = self._client.send(node_id, request, wakeup=False)
-            future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time())
+            future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.monotonic())
             future.add_errback(self._handle_fetch_error, node_id)
             future.add_both(self._clear_pending_fetch_request, node_id)
             futures.append(future)
@@ -421,21 +421,21 @@ def _reset_offsets_async(self, timestamps):
             if not self._client.ready(node_id):
                 continue
             partitions = set(timestamps_and_epochs.keys())
-            expire_at = time.time() + self.config['request_timeout_ms'] / 1000
+            expire_at = time.monotonic() + self.config['request_timeout_ms'] / 1000
             self._subscriptions.set_reset_pending(partitions, expire_at)
 
             def on_success(timestamps_and_epochs, result):
                 fetched_offsets, partitions_to_retry = result
                 if partitions_to_retry:
-                    self._subscriptions.reset_failed(partitions_to_retry, time.time() + self.config['retry_backoff_ms'] / 1000)
+                    self._subscriptions.reset_failed(partitions_to_retry, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
                     self._client.cluster.request_update()
 
                 for partition, offset in fetched_offsets.items():
                     ts, _epoch = timestamps_and_epochs[partition]
                     self._reset_offset_if_needed(partition, ts, offset.offset)
 
             def on_failure(partitions, error):
-                self._subscriptions.reset_failed(partitions, time.time() + self.config['retry_backoff_ms'] / 1000)
+                self._subscriptions.reset_failed(partitions, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
                 self._client.cluster.request_update()
 
                 if not getattr(error, 'retriable', False):
@@ -778,7 +778,7 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
                 self._completed_fetches.append(completed_fetch)
 
         if self._sensors:
-            self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
+            self._sensors.fetch_latency.record((time.monotonic() - send_time) * 1000)
 
     def _handle_fetch_error(self, node_id, exception):
         level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR
6 changes: 3 additions & 3 deletions kafka/consumer/group.py
@@ -1176,7 +1176,7 @@ def _update_fetch_positions(self, timeout_ms=None):
         return not self._fetcher.reset_offsets_if_needed()
 
     def _message_generator_v2(self):
-        timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
+        timeout_ms = 1000 * max(0, self._consumer_timeout - time.monotonic())
         record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
         for tp, records in record_map.items():
             # Generators are stateful, and it is possible that the tp / records
@@ -1201,7 +1201,7 @@ def __next__(self):
         if self._closed:
             raise StopIteration('KafkaConsumer closed')
         self._set_consumer_timeout()
-        while time.time() < self._consumer_timeout:
+        while time.monotonic() < self._consumer_timeout:
             if not self._iterator:
                 self._iterator = self._message_generator_v2()
             try:
@@ -1213,5 +1213,5 @@ def _set_consumer_timeout(self):
     def _set_consumer_timeout(self):
         # consumer_timeout_ms can be used to stop iteration early
         if self.config['consumer_timeout_ms'] >= 0:
-            self._consumer_timeout = time.time() + (
+            self._consumer_timeout = time.monotonic() + (
                 self.config['consumer_timeout_ms'] / 1000.0)
2 changes: 1 addition & 1 deletion kafka/consumer/subscription_state.py
@@ -462,7 +462,7 @@ def reset(self, strategy):
         self.next_allowed_retry_time = None
 
     def is_reset_allowed(self):
-        return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.time()
+        return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.monotonic()
 
     @property
     def awaiting_reset(self):