Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 56 additions & 7 deletions etcd3/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import inspect
import threading
import time

import grpc
import grpc._channel
Expand All @@ -24,6 +25,42 @@
}


# this is used for gRPC proxy compatibility so that we do not
# mark as finished writing until we've received a response
# For more details see: https://github.com/davissp14/etcdv3-ruby/pull/117
class BlockingRequest:
def __init__(self, request):
self.request = request
self.proceed = False
self.blocked = False
self.returned = False

def read_done(self):
self.proceed = True

def is_blocked(self):
return self.blocked

def __iter__(self):
return self

def __next__(self):
if not self.returned:
self.returned = True
return self.request
self.blocked = True
try:
raise StopIteration
except StopIteration:
raise StopIteration
finally:
while not self.proceed:
time.sleep(0.001)
self.blocked = False

next = __next__ # Python 2 compatibility


def _translate_exception(exc):
code = exc.code()
exception = _EXCEPTIONS_BY_CODE.get(code)
Expand Down Expand Up @@ -898,13 +935,25 @@ def revoke_lease(self, lease_id):

@_handle_errors
def refresh_lease(self, lease_id):
keep_alive_request = etcdrpc.LeaseKeepAliveRequest(ID=lease_id)
request_stream = [keep_alive_request]
for response in self.leasestub.LeaseKeepAlive(
iter(request_stream),
self.timeout,
credentials=self.call_credentials,
metadata=self.metadata):
request_stream = BlockingRequest(
etcdrpc.LeaseKeepAliveRequest(ID=lease_id))
responses = []
try:
for response in self.leasestub.LeaseKeepAlive(
request_stream,
self.timeout,
credentials=self.call_credentials,
metadata=self.metadata):
responses.append(response)
break
except BaseException:
raise
finally:
request_stream.read_done()
while request_stream.is_blocked():
time.sleep(0.001)

for response in responses:
yield response

@_handle_errors
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def load_reqs(filename):

setup(
name='etcd3',
version='0.10.0',
version='0.10.1',
description="Python client for the etcd3 API",
long_description=readme + '\n\n' + history,
author="Louis Taylor",
Expand Down
10 changes: 5 additions & 5 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
PyYAML==5.1
Sphinx==1.8.2
bumpversion==0.5.3
coverage==4.5.1
coverage==4.5.4
flake8-import-order==0.18
flake8==3.6.0
flake8==3.7.8
grpcio-tools>=1.2.0
hypothesis==3.82.1
hypothesis==4.36.2
pip==18.1
pytest==4.0.1
wheel==0.31.1
pycodestyle==2.4.0
pycodestyle==2.5.0
tox==3.5.3
flake8-docstrings==1.3.0
flake8-docstrings==1.4.0
mock==2.0.0
pifpaf>=0.27.1