2323using QuantConnect . Configuration ;
2424using QuantConnect . Logging ;
2525using QuantConnect . Packets ;
26+ using QuantConnect . Securities ;
2627using System . Threading . Tasks ;
2728using QuantConnect . Data . Market ;
2829
@@ -67,7 +68,12 @@ public class DataBentoProvider : IDataQueueHandler
6768 private bool _unsupportedDataTypeMessageLogged ;
6869 private bool _potentialUnsupportedResolutionMessageLogged ;
6970
71+ private bool _sessionStarted = false ;
72+ private readonly object _sessionLock = new object ( ) ;
7073
74+ private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase . FromDataFolder ( ) ;
75+ private readonly System . Collections . Concurrent . ConcurrentDictionary < Symbol , DateTimeZone > _symbolExchangeTimeZones = new ( ) ;
76+
7177 /// <summary>
7278 /// Returns true if we're currently connected to the Data Provider
7379 /// </summary>
@@ -124,12 +130,30 @@ private void Initialize()
124130 if ( _client ? . IsConnected == true )
125131 {
126132 Log . Trace ( $ "DataBentoProvider.SubscribeImpl(): Client is connected, attempting async subscribe for { symbol } ") ;
127- Task . Run ( ( ) =>
133+ Task . Run ( async ( ) =>
128134 {
129135 var success = _client . Subscribe ( config . Symbol , config . Resolution , config . TickType ) ;
130136 if ( success )
131137 {
132138 Log . Trace ( $ "DataBentoProvider.SubscribeImpl(): Successfully subscribed to { config . Symbol } ") ;
139+
140+ // Start session after first successful subscription
141+ lock ( _sessionLock )
142+ {
143+ if ( ! _sessionStarted )
144+ {
145+ Log . Trace ( "DataBentoProvider.SubscribeImpl(): Starting DataBento session to receive data" ) ;
146+ _sessionStarted = _client . StartSession ( ) ;
147+ if ( _sessionStarted )
148+ {
149+ Log . Trace ( "DataBentoProvider.SubscribeImpl(): Session started successfully - data should begin flowing" ) ;
150+ }
151+ else
152+ {
153+ Log . Error ( "DataBentoProvider.SubscribeImpl(): Failed to start session" ) ;
154+ }
155+ }
156+ }
133157 }
134158 else
135159 {
@@ -366,19 +390,58 @@ private bool IsSupported(SecurityType securityType, Type dataType, TickType tick
366390
367391 return true ;
368392 }
393+
394+ /// <summary>
395+ /// Converts the given UTC time into the symbol security exchange time zone
396+ /// </summary>
397+ private DateTime GetTickTime ( Symbol symbol , DateTime utcTime )
398+ {
399+ var exchangeTimeZone = _symbolExchangeTimeZones . GetOrAdd ( symbol , sym =>
400+ {
401+ if ( _marketHoursDatabase . TryGetEntry ( sym . ID . Market , sym , sym . SecurityType , out var entry ) )
402+ {
403+ return entry . ExchangeHours . TimeZone ;
404+ }
405+ // Futures default to Chicago
406+ return TimeZones . Chicago ;
407+ } ) ;
408+
409+ return utcTime . ConvertFromUtc ( exchangeTimeZone ) ;
410+ }
411+
369412 // <summary>
370413 /// Handles data received from the live client
371414 /// </summary>
372415 private void OnDataReceived ( object ? sender , BaseData data )
373416 {
374- Log . Trace ( $ "DataBentoProvider.OnDataReceived(): Received data: { data } ") ;
375417 try
376418 {
377- _dataAggregator . Update ( data ) ;
419+ if ( data is Tick tick )
420+ {
421+ tick . Time = GetTickTime ( tick . Symbol , tick . Time ) ;
422+ _dataAggregator . Update ( tick ) ;
423+
424+ Log . Trace ( $ "DataBentoProvider.OnDataReceived(): Updated tick - Symbol: { tick . Symbol } , " +
425+ $ "TickType: { tick . TickType } , Price: { tick . Value } , Quantity: { tick . Quantity } ") ;
426+ }
427+ else if ( data is TradeBar tradeBar )
428+ {
429+ tradeBar . Time = GetTickTime ( tradeBar . Symbol , tradeBar . Time ) ;
430+ tradeBar . EndTime = GetTickTime ( tradeBar . Symbol , tradeBar . EndTime ) ;
431+ _dataAggregator . Update ( tradeBar ) ;
432+
433+ Log . Trace ( $ "DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: { tradeBar . Symbol } , " +
434+ $ "O:{ tradeBar . Open } H:{ tradeBar . High } L:{ tradeBar . Low } C:{ tradeBar . Close } V:{ tradeBar . Volume } ") ;
435+ }
436+ else
437+ {
438+ data . Time = GetTickTime ( data . Symbol , data . Time ) ;
439+ _dataAggregator . Update ( data ) ;
440+ }
378441 }
379442 catch ( Exception ex )
380443 {
381- Log . Error ( $ "DataBentoProvider.OnDataReceived(): Error updating data aggregator: { ex . Message } ") ;
444+ Log . Error ( $ "DataBentoProvider.OnDataReceived(): Error updating data aggregator: { ex . Message } \n { ex . StackTrace } ") ;
382445 }
383446 }
384447
@@ -391,13 +454,32 @@ private void OnConnectionStatusChanged(object? sender, bool isConnected)
391454
392455 if ( isConnected )
393456 {
457+ // Reset session flag on reconnection
458+ lock ( _sessionLock )
459+ {
460+ _sessionStarted = false ;
461+ }
462+
394463 // Resubscribe to all active subscriptions
395464 Task . Run ( ( ) =>
396465 {
397466 foreach ( var config in _activeSubscriptionConfigs )
398467 {
399468 _client . Subscribe ( config . Symbol , config . Resolution , config . TickType ) ;
400469 }
470+
471+ // Start session after resubscribing
472+ if ( _activeSubscriptionConfigs . Any ( ) )
473+ {
474+ lock ( _sessionLock )
475+ {
476+ if ( ! _sessionStarted )
477+ {
478+ Log . Trace ( "DataBentoProvider.OnConnectionStatusChanged(): Starting session after reconnection" ) ;
479+ _sessionStarted = _client . StartSession ( ) ;
480+ }
481+ }
482+ }
401483 } ) ;
402484 }
403485 }
0 commit comments