Skip to content

Commit 4150782

Browse files
Readme update, contextual log traces in for system health, not debugging. Changed all async to sync, only using Task to connect and get messages sync.
1 parent ff2ef63 commit 4150782

4 files changed

Lines changed: 126 additions & 115 deletions

File tree

QuantConnect.DataBento/DataBentoDataProvider.cs

Lines changed: 82 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,18 @@
1515
*/
1616

1717
using System;
18+
using System.Linq;
1819
using NodaTime;
1920
using QuantConnect.Data;
21+
using QuantConnect.Data.Market;
2022
using QuantConnect.Util;
2123
using QuantConnect.Interfaces;
2224
using System.Collections.Generic;
2325
using QuantConnect.Configuration;
2426
using QuantConnect.Logging;
2527
using QuantConnect.Packets;
2628
using QuantConnect.Securities;
27-
using System.Threading.Tasks;
28-
using QuantConnect.Data.Market;
29+
using System.Collections.Concurrent;
2930

3031
namespace QuantConnect.Lean.DataSource.DataBento
3132
{
@@ -38,18 +39,16 @@ public class DataBentoProvider : IDataQueueHandler
3839
Config.Get("data-aggregator", "QuantConnect.Lean.Engine.DataFeeds.AggregationManager"), forceTypeNameOnExisting: false);
3940
private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!;
4041
private readonly List<SubscriptionDataConfig> _activeSubscriptionConfigs = new();
41-
private readonly System.Collections.Concurrent.ConcurrentDictionary<Symbol, SubscriptionDataConfig> _subscriptionConfigs = new();
42+
private readonly ConcurrentDictionary<Symbol, SubscriptionDataConfig> _subscriptionConfigs = new();
4243
private DatabentoRawClient _client = null!;
4344
private readonly string _apiKey;
4445
private readonly DataBentoDataDownloader _dataDownloader;
45-
private bool _unsupportedSecurityTypeMessageLogged;
46-
private bool _unsupportedDataTypeMessageLogged;
4746
private bool _potentialUnsupportedResolutionMessageLogged;
4847
private bool _sessionStarted = false;
4948
private readonly object _sessionLock = new object();
5049
private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
51-
private readonly System.Collections.Concurrent.ConcurrentDictionary<Symbol, DateTimeZone> _symbolExchangeTimeZones = new();
52-
50+
private readonly ConcurrentDictionary<Symbol, DateTimeZone> _symbolExchangeTimeZones = new();
51+
5352
/// <summary>
5453
/// Returns true if we're currently connected to the Data Provider
5554
/// </summary>
@@ -86,69 +85,83 @@ public DataBentoProvider(string apiKey)
8685
/// </summary>
8786
private void Initialize()
8887
{
88+
Log.Trace("DataBentoProvider.Initialize(): Starting initialization");
8989
_subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager();
9090
_subscriptionManager.SubscribeImpl = (symbols, tickType) =>
9191
{
92+
Log.Trace($"DataBentoProvider.SubscribeImpl(): Received subscription request for {symbols.Count()} symbols, TickType={tickType}");
9293
foreach (var symbol in symbols)
9394
{
95+
Log.Trace($"DataBentoProvider.SubscribeImpl(): Processing symbol {symbol}");
9496
if (!_subscriptionConfigs.TryGetValue(symbol, out var config))
9597
{
96-
continue;
98+
Log.Error($"DataBentoProvider.SubscribeImpl(): No subscription config found for {symbol}");
99+
return false;
97100
}
98101
if (_client?.IsConnected != true)
99102
{
100-
continue;
103+
Log.Error($"DataBentoProvider.SubscribeImpl(): Client is not connected. Cannot subscribe to {symbol}");
104+
return false;
101105
}
102-
Task.Run(async () =>
106+
107+
var resolution = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution;
108+
if (!_client.Subscribe(config.Symbol, resolution, config.TickType))
103109
{
104-
// If the requested resolution is higher than tick, we subscribe to ticks and let the aggregator handle it.
105-
var resolutionToSubscribe = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution;
106-
var success = _client.Subscribe(config.Symbol, resolutionToSubscribe, config.TickType);
107-
if (!success)
108-
{
109-
Log.Error($"DataBentoProvider.SubscribeImpl(): Failed to subscribe to live data for {config.Symbol}");
110-
return;
111-
}
110+
Log.Error($"Failed to subscribe to {config.Symbol}");
111+
return false;
112+
}
112113

113-
// Start session once after first successful subscription
114-
lock (_sessionLock)
115-
{
116-
if (!_sessionStarted)
117-
{
118-
_sessionStarted = _client.StartSession();
119-
}
120-
}
121-
});
114+
lock (_sessionLock)
115+
{
116+
if (!_sessionStarted)
117+
_sessionStarted = _client.StartSession();
118+
}
122119
}
120+
123121
return true;
124122
};
125123

126124
_subscriptionManager.UnsubscribeImpl = (symbols, tickType) =>
127125
{
128126
foreach (var symbol in symbols)
129127
{
130-
if (_client?.IsConnected == true)
128+
Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Processing symbol {symbol}");
129+
if (_client?.IsConnected != true)
131130
{
132-
Task.Run(() =>
133-
{
134-
_client.Unsubscribe(symbol);
135-
});
131+
throw new InvalidOperationException($"DataBentoProvider.UnsubscribeImpl(): Client is not connected. Cannot unsubscribe from {symbol}");
136132
}
133+
134+
_client.Unsubscribe(symbol);
137135
}
138136

139137
return true;
140138
};
141139

142140
// Initialize the live client
141+
Log.Trace("DataBentoProvider.Initialize(): Creating DatabentoRawClient");
143142
_client = new DatabentoRawClient(_apiKey);
144143
_client.DataReceived += OnDataReceived;
145144
_client.ConnectionStatusChanged += OnConnectionStatusChanged;
146145

147146
// Connect to live gateway
148-
Task.Run(async () =>
147+
Log.Trace("DataBentoProvider.Initialize(): Attempting connection to DataBento live gateway");
148+
Task.Run(() =>
149149
{
150-
var connected = await _client.ConnectAsync();
150+
var connected = _client.Connect();
151+
Log.Trace($"DataBentoProvider.Initialize(): Connect() returned {connected}");
152+
153+
if (connected)
154+
{
155+
Log.Trace("DataBentoProvider.Initialize(): Successfully connected to DataBento live gateway");
156+
}
157+
else
158+
{
159+
Log.Error("DataBentoProvider.Initialize(): Failed to connect to DataBento live gateway");
160+
}
151161
});
162+
163+
164+
Log.Trace("DataBentoProvider.Initialize(): Initialization complete");
152165
}
153166

154167
/// <summary>
@@ -159,7 +172,10 @@ private void Initialize()
159172
/// <returns>The new enumerator for this subscription request</returns>
160173
public IEnumerator<BaseData>? Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
161174
{
162-
if (!CanSubscribe(dataConfig)){
175+
Log.Trace($"DataBentoProvider.Subscribe(): Received subscription request for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}");
176+
if (!CanSubscribe(dataConfig))
177+
{
178+
Log.Error($"DataBentoProvider.Subscribe(): Cannot subscribe to {dataConfig.Symbol} with Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}");
163179
return null;
164180
}
165181

@@ -177,11 +193,13 @@ private void Initialize()
177193
/// <param name="dataConfig">Subscription config to be removed</param>
178194
public void Unsubscribe(SubscriptionDataConfig dataConfig)
179195
{
196+
Log.Trace($"DataBentoProvider.Unsubscribe(): Received unsubscription request for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}");
180197
_subscriptionConfigs.TryRemove(dataConfig.Symbol, out _);
181198
_subscriptionManager.Unsubscribe(dataConfig);
182199
var toRemove = _activeSubscriptionConfigs.FirstOrDefault(c => c.Symbol == dataConfig.Symbol && c.TickType == dataConfig.TickType);
183200
if (toRemove != null)
184201
{
202+
Log.Trace($"DataBentoProvider.Unsubscribe(): Removing active subscription for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}");
185203
_activeSubscriptionConfigs.Remove(toRemove);
186204
}
187205
_dataAggregator.Remove(dataConfig);
@@ -214,8 +232,10 @@ public void Dispose()
214232
/// <returns>An enumerable of BaseData points</returns>
215233
public IEnumerable<BaseData>? GetHistory(Data.HistoryRequest request)
216234
{
235+
Log.Trace($"DataBentoProvider.GetHistory(): Received history request for {request.Symbol}, Resolution={request.Resolution}, TickType={request.TickType}");
217236
if (!CanSubscribe(request.Symbol))
218237
{
238+
Log.Error($"DataBentoProvider.GetHistory(): Cannot provide history for {request.Symbol} with Resolution={request.Resolution}, TickType={request.TickType}");
219239
return null;
220240
}
221241

@@ -278,11 +298,7 @@ private bool IsSupported(SecurityType securityType, Type dataType, TickType tick
278298
// Check supported security types
279299
if (!IsSecurityTypeSupported(securityType))
280300
{
281-
if (!_unsupportedSecurityTypeMessageLogged)
282-
{
283-
_unsupportedSecurityTypeMessageLogged = true;
284-
}
285-
return false;
301+
throw new NotSupportedException($"Unsupported security type: {securityType}");
286302
}
287303

288304
// Check supported data types
@@ -291,18 +307,17 @@ private bool IsSupported(SecurityType securityType, Type dataType, TickType tick
291307
dataType != typeof(Tick) &&
292308
dataType != typeof(OpenInterest))
293309
{
294-
if (!_unsupportedDataTypeMessageLogged)
295-
{
296-
_unsupportedDataTypeMessageLogged = true;
297-
}
298-
return false;
310+
throw new NotSupportedException($"Unsupported data type: {dataType}");
299311
}
300312

301313
// Warn about potential limitations for tick data
302314
// I'm mimicing polygon implementation with this
303315
if (!_potentialUnsupportedResolutionMessageLogged)
304316
{
305317
_potentialUnsupportedResolutionMessageLogged = true;
318+
Log.Trace("DataBentoDataProvider.IsSupported(): " +
319+
$"Subscription for {securityType}-{dataType}-{tickType}-{resolution} will be attempted. " +
320+
$"An Advanced DataBento subscription plan is required to stream tick data.");
306321
}
307322

308323
return true;
@@ -337,12 +352,18 @@ private void OnDataReceived(object? sender, BaseData data)
337352
{
338353
tick.Time = GetTickTime(tick.Symbol, tick.Time);
339354
_dataAggregator.Update(tick);
355+
356+
Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " +
357+
$"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}");
340358
}
341359
else if (data is TradeBar tradeBar)
342360
{
343361
tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time);
344362
tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime);
345363
_dataAggregator.Update(tradeBar);
364+
365+
Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " +
366+
$"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}");
346367
}
347368
else
348369
{
@@ -361,34 +382,34 @@ private void OnDataReceived(object? sender, BaseData data)
361382
/// </summary>
362383
private void OnConnectionStatusChanged(object? sender, bool isConnected)
363384
{
385+
Log.Trace($"DataBentoProvider.OnConnectionStatusChanged(): Connection status changed to: {isConnected}");
386+
364387
if (isConnected)
365388
{
366389
// Reset session flag on reconnection
367390
lock (_sessionLock)
368391
{
369392
_sessionStarted = false;
370393
}
371-
372-
// Resubscribe to all active subscriptions
373-
Task.Run(() =>
394+
395+
// Resubscribe to all active subscriptions
396+
foreach (var config in _activeSubscriptionConfigs)
374397
{
375-
foreach (var config in _activeSubscriptionConfigs)
376-
{
377-
_client.Subscribe(config.Symbol, config.Resolution, config.TickType);
378-
}
379-
380-
// Start session after resubscribing
381-
if (_activeSubscriptionConfigs.Any())
398+
_client.Subscribe(config.Symbol, config.Resolution, config.TickType);
399+
}
400+
401+
// Start session after resubscribing
402+
if (_activeSubscriptionConfigs.Any())
403+
{
404+
lock (_sessionLock)
382405
{
383-
lock (_sessionLock)
406+
if (!_sessionStarted)
384407
{
385-
if (!_sessionStarted)
386-
{
387-
_sessionStarted = _client.StartSession();
388-
}
408+
Log.Trace("DataBentoProvider.OnConnectionStatusChanged(): Starting session after reconnection");
409+
_sessionStarted = _client.StartSession();
389410
}
390411
}
391-
});
412+
}
392413
}
393414
}
394415
}

QuantConnect.DataBento/DataBentoHistoryProivder.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
using System.Collections.Generic;
2828
using QuantConnect.Configuration;
2929
using QuantConnect.Securities;
30-
using System.Threading.Tasks;
3130
using QuantConnect.Data.Consolidators;
3231

3332
namespace QuantConnect.Lean.DataSource.DataBento

0 commit comments

Comments
 (0)