@@ -81,10 +81,12 @@ cdef class ThinConnImpl(BaseConnImpl):
8181 elif stmt._cursor_id != 0 :
8282 self ._add_cursor_to_close(stmt)
8383
84- cdef object _connect_with_description(self , Description description):
84+ cdef object _connect_with_description(self , Description description,
85+ ConnectParamsImpl params,
86+ bint final_desc):
8587 cdef:
86- double timeout = description.tcp_connect_timeout
87- bint load_balance = description.load_balance, raise_exc
88+ bint load_balance = description.load_balance
89+ bint raise_exc = False
8890 list address_lists = description.address_lists
8991 uint32_t i, j, k, num_addresses, idx1, idx2
9092 uint32_t num_attempts = description.retry_count + 1
@@ -116,13 +118,19 @@ cdef class ThinConnImpl(BaseConnImpl):
116118 else :
117119 idx2 = k
118120 address = address_list.addresses[idx2]
119- raise_exc = i == num_attempts - 1
120- sock = self ._get_socket(address, timeout, raise_exc)
121- if sock is None :
121+ if final_desc:
122+ raise_exc = i == num_attempts - 1
123+ redirect_params = self ._connect_with_address(address,
124+ description,
125+ params,
126+ raise_exc)
127+ if redirect_params is not None :
128+ return redirect_params
129+ if self ._protocol._in_connect:
122130 continue
123131 address_list.lru_index = (idx1 + 1 ) % num_addresses
124132 description.lru_index = (idx2 + 1 ) % num_lists
125- return (sock, address)
133+ return
126134 time.sleep(description.retry_delay)
127135
128136 cdef ConnectParamsImpl _connect_with_params(self ,
@@ -137,36 +145,23 @@ cdef class ThinConnImpl(BaseConnImpl):
137145 list descriptions = description_list.descriptions
138146 ssize_t i, idx, num_descriptions = len (descriptions)
139147 Description description
140- Address address
141- tuple ret_tuple
148+ bint final_desc = False
142149 for i in range (num_descriptions):
150+ if i == num_descriptions - 1 :
151+ final_desc = True
143152 if description_list.load_balance:
144153 idx = (i + description_list.lru_index) % num_descriptions
145154 else :
146155 idx = i
147156 description = descriptions[idx]
148- ret_tuple = self ._connect_with_description(description)
149- if ret_tuple is not None :
150- sock, address = ret_tuple
157+ redirect_params = self ._connect_with_description(description,
158+ params,
159+ final_desc)
160+ if redirect_params is not None \
161+ or not self ._protocol._in_connect:
151162 description_list.lru_index = (idx + 1 ) % num_descriptions
152163 break
153- if description.expire_time > 0 :
154- sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
155- if hasattr (socket, " TCP_KEEPIDLE" ) \
156- and hasattr (socket, " TCP_KEEPINTVL" ) \
157- and hasattr (socket, " TCP_KEEPCNT" ):
158- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
159- description.expire_time * 60 )
160- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 6 )
161- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 10 )
162- sock.settimeout(None )
163- if address.protocol == " tcps" :
164- sock = get_ssl_socket(sock, params, description, address)
165- self ._drcp_enabled = description.server_type == " pooled"
166- if self ._cclass is None :
167- self ._cclass = description.cclass
168- self ._protocol = Protocol(sock)
169- return self ._protocol._connect(self , params, description, address)
164+ return redirect_params
170165
171166 cdef Message _create_message(self , type typ):
172167 """
@@ -182,36 +177,71 @@ cdef class ThinConnImpl(BaseConnImpl):
182177 self ._pool = None
183178 self ._protocol._force_close()
184179
185- cdef object _get_socket(self , object address,
186- double tcp_connect_timeout,
187- bint raise_exception):
180+ cdef object _connect_with_address(self , Address address,
181+ Description description,
182+ ConnectParamsImpl params,
183+ bint raise_exception):
188184 """
189- Get a socket on which to communicate using the provided parameters. If
190- a proxy is configured, a connection to the proxy is established and the
191- target host and port is forwarded to the proxy before the socket is
192- returned.
185+ Creates a socket on which to communicate using the provided parameters.
186+ If a proxy is configured, a connection to the proxy is established and
187+ the target host and port is forwarded to the proxy. The socket is used
188+ to establish a connection with the database. If a redirect is
189+ required, the redirect parameters are returned.
193190 """
194- cdef bint use_proxy = (address.https_proxy is not None )
191+ cdef:
192+ bint use_proxy = (address.https_proxy is not None )
193+ double timeout = description.tcp_connect_timeout
195194 if use_proxy:
196195 connect_info = (address.https_proxy, address.https_proxy_port)
197196 else :
198197 connect_info = (address.host, address.port)
199198 try :
200- sock = socket.create_connection(connect_info,
201- tcp_connect_timeout)
202- except (socket.gaierror, ConnectionRefusedError):
199+ sock = socket.create_connection(connect_info, timeout)
200+ if use_proxy:
201+ data = f" CONNECT {address.host}:{address.port} HTTP/1.0\r \n \r \n "
202+ sock.send(data.encode())
203+ reply = sock.recv(1024 )
204+ match = re.search(' HTTP/1.[01]\\ s+(\\ d+)\\ s+' , reply.decode())
205+ if match is None or match.groups()[0 ] != ' 200' :
206+ errors._raise_err(errors.ERR_PROXY_FAILURE,
207+ response = reply.decode())
208+ if description.expire_time > 0 :
209+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
210+ if hasattr (socket, " TCP_KEEPIDLE" ) \
211+ and hasattr (socket, " TCP_KEEPINTVL" ) \
212+ and hasattr (socket, " TCP_KEEPCNT" ):
213+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
214+ description.expire_time * 60 )
215+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
216+ 6 )
217+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
218+ 10 )
219+ sock.settimeout(None )
220+ if address.protocol == " tcps" :
221+ sock = get_ssl_socket(sock, params, description, address)
222+ self ._drcp_enabled = description.server_type == " pooled"
223+ if self ._cclass is None :
224+ self ._cclass = description.cclass
225+ self ._protocol = Protocol(sock)
226+ redirect_params = self ._protocol._connect_phase_one(self , params,
227+ description,
228+ address)
229+ if redirect_params is not None :
230+ return redirect_params
231+ except exceptions.ConnectionError:
203232 if raise_exception:
204233 raise
205- return None
206- if use_proxy:
207- data = f" CONNECT {address.host}:{address.port} HTTP/1.0\r \n \r \n "
208- sock.send(data.encode())
209- reply = sock.recv(1024 )
210- match = re.search(' HTTP/1.[01]\\ s+(\\ d+)\\ s+' , reply.decode())
211- if match is None or match.groups()[0 ] != ' 200' :
212- errors._raise_err(errors.ERR_PROXY_FAILURE,
213- response = reply.decode())
214- return sock
234+ return
235+ except (socket.gaierror, ConnectionRefusedError) as e:
236+ if raise_exception:
237+ errors._raise_err(errors.ERR_CONNECTION_FAILED, cause = e,
238+ exception = str (e))
239+ return
240+ except Exception as e:
241+ errors._raise_err(errors.ERR_CONNECTION_FAILED, cause = e,
242+ exception = str (e))
243+ return
244+ self ._protocol._connect_phase_two(self , description, params)
215245
216246 cdef Statement _get_statement(self , str sql, bint cache_statement):
217247 """
0 commit comments