Skip to content

Commit b4401db

Browse files
authored
Refactor ApiClient to use HttpClient (#9114)
* Refactor ApiClient to use HttpClient * Minor fix * Refactor and cleanup in ApiConnection * Add normalized extension methods for string and stream json serialization * Minor fixes * Address peer review * Minor fixes * Minor fixes and peer review * Race condition fix * Minor changes * Cleanup * Minor fixes * Minor fix
1 parent bc646c9 commit b4401db

File tree

5 files changed

+308
-36
lines changed

5 files changed

+308
-36
lines changed

Algorithm.Python/ScheduledQueuingAlgorithm.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,18 @@ def initialize(self) -> None:
2020
self.set_start_date(2020, 9, 1)
2121
self.set_end_date(2020, 9, 2)
2222
self.set_cash(100000)
23-
23+
2424
self.__number_of_symbols = 2000
2525
self.__number_of_symbols_fine = 1000
2626
self.set_universe_selection(FineFundamentalUniverseSelectionModel(self.coarse_selection_function, self.fine_selection_function, None))
27-
27+
2828
self.set_portfolio_construction(EqualWeightingPortfolioConstructionModel())
29-
29+
3030
self.set_execution(ImmediateExecutionModel())
31-
31+
3232
self._queue = Queue()
3333
self._dequeue_size = 100
34-
34+
3535
self.add_equity("SPY", Resolution.MINUTE)
3636
self.schedule.on(self.date_rules.every_day("SPY"), self.time_rules.at(0, 0), self.fill_queue)
3737
self.schedule.on(self.date_rules.every_day("SPY"), self.time_rules.every(timedelta(minutes=60)), self.take_from_queue)
@@ -40,22 +40,22 @@ def coarse_selection_function(self, coarse: list[CoarseFundamental]) -> list[Sym
4040
has_fundamentals = [security for security in coarse if security.has_fundamental_data]
4141
sorted_by_dollar_volume = sorted(has_fundamentals, key=lambda x: x.dollar_volume, reverse=True)
4242
return [ x.symbol for x in sorted_by_dollar_volume[:self.__number_of_symbols] ]
43-
43+
4444
def fine_selection_function(self, fine: list[FineFundamental]) -> list[Symbol]:
4545
sorted_by_pe_ratio = sorted(fine, key=lambda x: x.valuation_ratios.pe_ratio, reverse=True)
4646
return [ x.symbol for x in sorted_by_pe_ratio[:self.__number_of_symbols_fine] ]
47-
47+
4848
def fill_queue(self) -> None:
49-
securities = [security for security in self.active_securities.values if security.fundamentals]
50-
49+
securities = [security for security in self.active_securities.values() if security.fundamentals]
50+
5151
# Fill queue with symbols sorted by PE ratio (decreasing order)
5252
self._queue.queue.clear()
5353
sorted_by_pe_ratio = sorted(securities, key=lambda x: x.fundamentals.valuation_ratios.pe_ratio, reverse=True)
5454
for security in sorted_by_pe_ratio:
5555
self._queue.put(security.symbol)
56-
56+
5757
def take_from_queue(self) -> None:
5858
symbols = [self._queue.get() for _ in range(min(self._dequeue_size, self._queue.qsize()))]
5959
self.history(symbols, 10, Resolution.DAILY)
60-
60+
6161
self.log(f"Symbols at {self.time}: {[str(symbol) for symbol in symbols]}")

Algorithm/QCAlgorithm.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2484,7 +2484,7 @@ public Option AddOptionContract(Symbol symbol, Resolution? resolution = null, bo
24842484
/// </summary>
24852485
/// <param name="ticker">The currency pair</param>
24862486
/// <param name="resolution">The <see cref="Resolution"/> of market data, Tick, Second, Minute, Hour, or Daily. Default is <see cref="Resolution.Minute"/></param>
2487-
/// <param name="market">The foreign exchange trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets"> in <see cref="AddSecurity{T}"/></param>
2487+
/// <param name="market">The foreign exchange trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets" /> in <see cref="AddSecurity{T}"/></param>
24882488
/// <param name="fillForward">If true, returns the last available data even if none in that timeslice. Default is <value>true</value></param>
24892489
/// <param name="leverage">The requested leverage for this forex security. Default is set by <see cref="SecurityInitializer"/></param>
24902490
/// <returns>The new <see cref="Forex"/> security</returns>
@@ -2499,7 +2499,7 @@ public Forex AddForex(string ticker, Resolution? resolution = null, string marke
24992499
/// </summary>
25002500
/// <param name="ticker">The CFD ticker symbol</param>
25012501
/// <param name="resolution">The <see cref="Resolution"/> of market data, Tick, Second, Minute, Hour, or Daily. Default is <see cref="Resolution.Minute"/></param>
2502-
/// <param name="market">The cfd trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets"> in <see cref="AddSecurity{T}"/></param>
2502+
/// <param name="market">The cfd trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets"/> in <see cref="AddSecurity{T}"/></param>
25032503
/// <param name="fillForward">If true, returns the last available data even if none in that timeslice. Default is <value>true</value></param>
25042504
/// <param name="leverage">The requested leverage for this CFD. Default is set by <see cref="SecurityInitializer"/></param>
25052505
/// <returns>The new <see cref="Cfd"/> security</returns>
@@ -2530,7 +2530,7 @@ public Index AddIndex(string ticker, Resolution? resolution = null, string marke
25302530
/// </summary>
25312531
/// <param name="ticker">The crypto ticker symbol/param>
25322532
/// <param name="resolution">The <see cref="Resolution"/> of market data, Tick, Second, Minute, Hour, or Daily. Default is <see cref="Resolution.Minute"/></param>
2533-
/// <param name="market">The The crypto trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets"> in <see cref="AddSecurity{T}"/></param>
2533+
/// <param name="market">The The crypto trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets"/> in <see cref="AddSecurity{T}"/></param>
25342534
/// <param name="fillForward">If true, returns the last available data even if none in that timeslice. Default is <value>true</value></param>
25352535
/// <param name="leverage">The requested leverage for this crypto. Default is set by <see cref="SecurityInitializer"/></param>
25362536
/// <returns>The new <see cref="Crypto"/> security</returns>
@@ -2545,7 +2545,7 @@ public Crypto AddCrypto(string ticker, Resolution? resolution = null, string mar
25452545
/// </summary>
25462546
/// <param name="ticker">The crypto future ticker symbol</param>
25472547
/// <param name="resolution">The <see cref="Resolution"/> of market data, Tick, Second, Minute, Hour, or Daily. Default is <see cref="Resolution.Minute"/></param>
2548-
/// <param name="market">The The crypto future trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets"> in <see cref="AddSecurity{T}"/></param>
2548+
/// <param name="market">The The crypto future trading market, <seealso cref="Market"/>. Default value is null and looked up using <see cref="IBrokerageModel.DefaultMarkets"/> in <see cref="AddSecurity{T}"/></param>
25492549
/// <param name="fillForward">If true, returns the last available data even if none in that timeslice. Default is <value>true</value></param>
25502550
/// <param name="leverage">The requested leverage for this crypto future. Default is set by <see cref="SecurityInitializer"/></param>
25512551
/// <returns>The new <see cref="CryptoFuture"/> security</returns>

Api/ApiConnection.cs

Lines changed: 174 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515

1616
using System;
1717
using RestSharp;
18-
using Newtonsoft.Json;
19-
using QuantConnect.Orders;
2018
using QuantConnect.Logging;
2119
using System.Threading.Tasks;
22-
using RestSharp.Authenticators;
20+
using System.Net.Http;
21+
using System.Net.Http.Headers;
22+
using System.Text;
23+
using System.Collections.Generic;
24+
using QuantConnect.Util;
25+
using System.IO;
26+
using System.Threading;
2327

2428
namespace QuantConnect.Api
2529
{
@@ -28,11 +32,15 @@ namespace QuantConnect.Api
2832
/// </summary>
2933
public class ApiConnection
3034
{
31-
private readonly static JsonSerializerSettings _jsonSettings = new() { Converters = { new LiveAlgorithmResultsJsonConverter(), new OrderJsonConverter() } };
35+
/// <summary>
36+
/// Authorized client to use for requests.
37+
/// </summary>
38+
private HttpClient _httpClient;
3239

3340
/// <summary>
3441
/// Authorized client to use for requests.
3542
/// </summary>
43+
[Obsolete("RestSharp is deprecated and will be removed in a future release. Please use the SetClient method or the request methods that take an HttpRequestMessage")]
3644
public RestClient Client { get; set; }
3745

3846
// Authorization Credentials
@@ -46,11 +54,14 @@ public class ApiConnection
4654
/// </summary>
4755
/// <param name="userId">User Id number from QuantConnect.com account. Found at www.quantconnect.com/account </param>
4856
/// <param name="token">Access token for the QuantConnect account. Found at www.quantconnect.com/account </param>
49-
public ApiConnection(int userId, string token)
57+
/// <param name="baseUrl">The client's base address</param>
58+
/// <param name="defaultHeaders">Default headers for the client</param>
59+
/// <param name="timeout">The client timeout in seconds</param>
60+
public ApiConnection(int userId, string token, string baseUrl = null, Dictionary<string, string> defaultHeaders = null, int timeout = 0)
5061
{
5162
_token = token;
5263
_userId = userId.ToStringInvariant();
53-
Client = new RestClient(Globals.Api);
64+
SetClient(!string.IsNullOrEmpty(baseUrl) ? baseUrl : Globals.Api, defaultHeaders, timeout);
5465
}
5566

5667
/// <summary>
@@ -60,13 +71,40 @@ public bool Connected
6071
{
6172
get
6273
{
63-
var request = new RestRequest("authenticate", Method.GET);
64-
AuthenticationResponse response;
65-
if (TryRequest(request, out response))
74+
using var request = new HttpRequestMessage(HttpMethod.Get, "authenticate");
75+
return TryRequest(request, out AuthenticationResponse response) && response.Success;
76+
}
77+
}
78+
79+
/// <summary>
80+
/// Overrides the current client
81+
/// </summary>
82+
/// <param name="baseUrl">The client's base address</param>
83+
/// <param name="defaultHeaders">Default headers for the client</param>
84+
/// <param name="timeout">The client timeout in seconds</param>
85+
public void SetClient(string baseUrl, Dictionary<string, string> defaultHeaders = null, int timeout = 0)
86+
{
87+
if (_httpClient != null)
88+
{
89+
_httpClient.DisposeSafely();
90+
}
91+
92+
_httpClient = new HttpClient() { BaseAddress = new Uri($"{baseUrl.TrimEnd('/')}/") };
93+
Client = new RestClient(baseUrl);
94+
95+
if (defaultHeaders != null)
96+
{
97+
foreach (var header in defaultHeaders)
6698
{
67-
return response.Success;
99+
_httpClient.DefaultRequestHeaders.Add(header.Key, header.Value);
68100
}
69-
return false;
101+
Client.AddDefaultHeaders(defaultHeaders);
102+
}
103+
104+
if (timeout > 0)
105+
{
106+
_httpClient.Timeout = TimeSpan.FromSeconds(timeout);
107+
Client.Timeout = timeout * 1000;
70108
}
71109
}
72110

@@ -77,6 +115,7 @@ public bool Connected
77115
/// <param name="request"></param>
78116
/// <param name="result">Result object from the </param>
79117
/// <returns>T typed object response</returns>
118+
[Obsolete("RestSharp is deprecated and will be removed in a future release. Please use the TryRequest(HttpRequestMessage)")]
80119
public bool TryRequest<T>(RestRequest request, out T result)
81120
where T : RestResponse
82121
{
@@ -90,7 +129,24 @@ public bool TryRequest<T>(RestRequest request, out T result)
90129
/// </summary>
91130
/// <typeparam name="T"></typeparam>
92131
/// <param name="request"></param>
132+
/// <param name="result">Result object from the </param>
133+
/// <param name="timeout">Timeout for the request</param>
93134
/// <returns>T typed object response</returns>
135+
public bool TryRequest<T>(HttpRequestMessage request, out T result, TimeSpan? timeout = null)
136+
where T : RestResponse
137+
{
138+
var resultTuple = TryRequestAsync<T>(request).SynchronouslyAwaitTaskResult();
139+
result = resultTuple.Item2;
140+
return resultTuple.Item1;
141+
}
142+
143+
/// <summary>
144+
/// Place a secure request and get back an object of type T.
145+
/// </summary>
146+
/// <typeparam name="T"></typeparam>
147+
/// <param name="request"></param>
148+
/// <returns>T typed object response</returns>
149+
[Obsolete("RestSharp is deprecated and will be removed in a future release. Please use the TryRequestAsync(HttpRequestMessage)")]
94150
public async Task<Tuple<bool, T>> TryRequestAsync<T>(RestRequest request)
95151
where T : RestResponse
96152
{
@@ -116,7 +172,7 @@ public async Task<Tuple<bool, T>> TryRequestAsync<T>(RestRequest request)
116172
}
117173

118174
responseContent = restsharpResponse.Content;
119-
result = JsonConvert.DeserializeObject<T>(responseContent, _jsonSettings);
175+
result = responseContent.DeserializeJson<T>();
120176

121177
if (result == null || !result.Success)
122178
{
@@ -133,36 +189,133 @@ public async Task<Tuple<bool, T>> TryRequestAsync<T>(RestRequest request)
133189
return new Tuple<bool, T>(true, result);
134190
}
135191

192+
/// <summary>
193+
/// Place a secure request and get back an object of type T.
194+
/// </summary>
195+
/// <typeparam name="T"></typeparam>
196+
/// <param name="request"></param>
197+
/// <param name="timeout">Timeout for the request</param>
198+
/// <returns>T typed object response</returns>
199+
public async Task<Tuple<bool, T>> TryRequestAsync<T>(HttpRequestMessage request, TimeSpan? timeout = null)
200+
where T : RestResponse
201+
{
202+
HttpResponseMessage response = null;
203+
Stream responseContentStream = null;
204+
T result = null;
205+
try
206+
{
207+
if (request.RequestUri.OriginalString.StartsWith('/'))
208+
{
209+
request.RequestUri = new Uri(request.RequestUri.ToString().TrimStart('/'), UriKind.Relative);
210+
}
211+
212+
SetAuthenticator(request);
213+
214+
// Execute the authenticated REST API Call
215+
if (timeout.HasValue)
216+
{
217+
using var cancellationTokenSource = new CancellationTokenSource(timeout.Value);
218+
response = await _httpClient.SendAsync(request, cancellationTokenSource.Token).ConfigureAwait(false);
219+
responseContentStream = await response.Content.ReadAsStreamAsync(cancellationTokenSource.Token).ConfigureAwait(false);
220+
}
221+
else
222+
{
223+
response = await _httpClient.SendAsync(request).ConfigureAwait(false);
224+
responseContentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
225+
}
226+
227+
result = responseContentStream.DeserializeJson<T>(leaveOpen: true);
228+
229+
if (!response.IsSuccessStatusCode)
230+
{
231+
Log.Error($"ApiConnect.TryRequest({request.RequestUri}): HTTP Error: {(int)response.StatusCode} {response.ReasonPhrase}. " +
232+
$"Content: {GetRawResponseContent(responseContentStream)}");
233+
}
234+
if (result == null || !result.Success)
235+
{
236+
if (Log.DebuggingEnabled)
237+
{
238+
Log.Debug($"ApiConnection.TryRequest({request.RequestUri}): Raw response: '{GetRawResponseContent(responseContentStream)}'");
239+
}
240+
return new Tuple<bool, T>(false, result);
241+
}
242+
}
243+
catch (Exception err)
244+
{
245+
Log.Error($"ApiConnection.TryRequest({request.RequestUri}): Error: {err.Message}, Response content: {GetRawResponseContent(responseContentStream)}");
246+
return new Tuple<bool, T>(false, null);
247+
}
248+
finally
249+
{
250+
response?.DisposeSafely();
251+
responseContentStream?.DisposeSafely();
252+
}
253+
254+
return new Tuple<bool, T>(true, result);
255+
}
256+
257+
private static string GetRawResponseContent(Stream stream)
258+
{
259+
if (stream == null)
260+
{
261+
return string.Empty;
262+
}
263+
264+
try
265+
{
266+
stream.Position = 0;
267+
using var reader = new StreamReader(stream, leaveOpen: true);
268+
return reader.ReadToEnd();
269+
}
270+
catch (Exception)
271+
{
272+
return string.Empty;
273+
}
274+
}
275+
136276
private void SetAuthenticator(RestRequest request)
137277
{
138-
var newTimeStamp = (int)Time.TimeStamp();
278+
var base64EncodedAuthenticationString = GetAuthenticatorHeader(out var timeStamp);
279+
request.AddHeader("Authorization", $"Basic {base64EncodedAuthenticationString}");
280+
request.AddHeader("Timestamp", timeStamp);
281+
}
139282

283+
private void SetAuthenticator(HttpRequestMessage request)
284+
{
285+
var base64EncodedAuthenticationString = GetAuthenticatorHeader(out var timeStamp);
286+
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", base64EncodedAuthenticationString);
287+
request.Headers.Add("Timestamp", timeStamp);
288+
}
289+
290+
private string GetAuthenticatorHeader(out string timeStamp)
291+
{
292+
var newTimeStamp = (int)Time.TimeStamp();
140293
var currentAuth = _authenticator;
141294
if (currentAuth == null || newTimeStamp - currentAuth.TimeStamp > 7000)
142295
{
143296
// Generate the hash each request
144297
// Add the UTC timestamp to the request header.
145298
// Timestamps older than 7200 seconds will not work.
146299
var hash = Api.CreateSecureHash(newTimeStamp, _token);
147-
var authenticator = new HttpBasicAuthenticator(_userId, hash);
148-
_authenticator = currentAuth = new LeanAuthenticator(authenticator, newTimeStamp);
149-
150-
Client.Authenticator = currentAuth.Authenticator;
300+
var authenticationString = $"{_userId}:{hash}";
301+
var base64EncodedAuthenticationString = Convert.ToBase64String(Encoding.UTF8.GetBytes(authenticationString));
302+
_authenticator = currentAuth = new LeanAuthenticator(newTimeStamp, base64EncodedAuthenticationString);
151303
}
152304

153-
request.AddHeader("Timestamp", currentAuth.TimeStampStr);
305+
timeStamp = currentAuth.TimeStampStr;
306+
return currentAuth.Base64EncodedAuthenticationString;
154307
}
155308

156309
private class LeanAuthenticator
157310
{
158311
public int TimeStamp { get; }
159312
public string TimeStampStr { get; }
160-
public HttpBasicAuthenticator Authenticator { get; }
161-
public LeanAuthenticator(HttpBasicAuthenticator authenticator, int timeStamp)
313+
public string Base64EncodedAuthenticationString { get; }
314+
public LeanAuthenticator(int timeStamp, string base64EncodedAuthenticationString)
162315
{
163316
TimeStamp = timeStamp;
164-
Authenticator = authenticator;
165317
TimeStampStr = timeStamp.ToStringInvariant();
318+
Base64EncodedAuthenticationString = base64EncodedAuthenticationString;
166319
}
167320
}
168321
}

0 commit comments

Comments
 (0)