Skip to content

Commit 373c6bb

Browse files
SK-2285 insert with requests session
1 parent 450db9d commit 373c6bb

1 file changed

Lines changed: 28 additions & 24 deletions

File tree

skyflow/vault/_client.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import types
66
import requests
77
import asyncio
8+
from requests.packages.urllib3.util.retry import Retry
9+
from requests.adapters import HTTPAdapter
810
from skyflow.vault._insert import getInsertRequestBody, processResponse, convertResponse
911
from skyflow.vault._update import sendUpdateRequests, createUpdateResponseBody
1012
from skyflow.vault._config import Configuration, ConnectionConfig, DeleteOptions, DetokenizeOptions, GetOptions, InsertOptions, UpdateOptions, QueryOptions
@@ -36,6 +38,16 @@ def __init__(self, config: Configuration):
3638
raise SkyflowError(SkyflowErrorCodes.INVALID_INPUT, SkyflowErrorMessages.TOKEN_PROVIDER_ERROR.value % (
3739
str(type(config.tokenProvider))), interface=interface)
3840

41+
retry_strategy = Retry(
42+
total=3,
43+
backoff_factor=0.5,
44+
status_forcelist=[500, 502, 503, 504],
45+
)
46+
47+
self.session = requests.Session()
48+
adapter = HTTPAdapter(pool_connections=1, pool_maxsize=20, pool_block=True, max_retries=retry_strategy)
49+
self.session.mount("https://", adapter)
50+
3951
self.vaultID = config.vaultID
4052
self.vaultURL = config.vaultURL.rstrip('/')
4153
self.tokenProvider = config.tokenProvider
@@ -55,30 +67,22 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()):
5567
"Authorization": "Bearer " + self.storedToken,
5668
"sky-metadata": json.dumps(getMetrics())
5769
}
58-
max_retries = 3
59-
# Use for-loop for retry logic, avoid code repetition
60-
for attempt in range(max_retries+1):
61-
try:
62-
# If jsonBody is a dict, use json=, else use data=
63-
response = requests.post(requestURL, data=jsonBody, headers=headers)
64-
processedResponse = processResponse(response)
65-
result, partial = convertResponse(records, processedResponse, options)
66-
if partial:
67-
log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface)
68-
raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, result, interface=interface)
69-
if 'records' not in result:
70-
log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface)
71-
raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, result, interface=interface)
72-
log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface)
73-
return result
74-
except Exception as err:
75-
if attempt < max_retries:
76-
continue
77-
else:
78-
if isinstance(err, SkyflowError):
79-
raise err
80-
else:
81-
raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface)
70+
# response = requests.post(requestURL, data=jsonBody, headers=headers)
71+
response = self.session.post(
72+
requestURL,
73+
data=jsonBody,
74+
headers=headers,
75+
)
76+
processedResponse = processResponse(response)
77+
print(">>> processedResponse", processedResponse)
78+
result, partial = convertResponse(records, processedResponse, options)
79+
if partial:
80+
log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface)
81+
elif 'records' not in result:
82+
log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface)
83+
else:
84+
log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface)
85+
return result
8286

8387
def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()):
8488
interface = InterfaceName.DETOKENIZE.value

0 commit comments

Comments
 (0)