@@ -221,12 +221,21 @@ public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType)
221221 // subscribe
222222 var subscribeMessage = $ "schema={ schema } |stype_in=parent|symbols={ databentoSymbol } ";
223223 Log . Trace ( $ "DatabentoRawClient.Subscribe(): Subscribing with message: { subscribeMessage } ") ;
224-
224+
225225 _writer . WriteLine ( subscribeMessage ) ;
226226
227227 _subscriptions . TryAdd ( symbol , ( resolution , tickType ) ) ;
228228 Log . Trace ( $ "DatabentoRawClient.Subscribe(): Subscribed to { symbol } ({ databentoSymbol } ) at { resolution } resolution for { tickType } ") ;
229229
230+ // If subscribing to quote ticks, also subscribe to trade ticks
231+ if ( tickType == TickType . Quote && resolution == Resolution . Tick )
232+ {
233+ var tradeSchema = GetSchema ( resolution , TickType . Trade ) ;
234+ var tradeSubscribeMessage = $ "schema={ tradeSchema } |stype_in=parent|symbols={ databentoSymbol } ";
235+ Log . Trace ( $ "DatabentoRawClient.Subscribe(): Also subscribing to trades with message: { tradeSubscribeMessage } ") ;
236+ _writer . WriteLine ( tradeSubscribeMessage ) ;
237+ }
238+
230239 return true ;
231240 }
232241 catch ( Exception ex )
@@ -354,7 +363,7 @@ private async Task ProcessSingleMessage(string message)
354363 if ( headerElement . TryGetProperty ( "rtype" , out var rtypeElement ) )
355364 {
356365 var rtype = rtypeElement . GetInt32 ( ) ;
357-
366+
358367 if ( rtype == 23 )
359368 {
360369 if ( root . TryGetProperty ( "msg" , out var msgElement ) )
@@ -372,15 +381,13 @@ private async Task ProcessSingleMessage(string message)
372381 {
373382 var instrumentId = instId . GetInt64 ( ) ;
374383 var outSymbolStr = outSymbol . GetString ( ) ;
375-
384+
376385 Log . Trace ( $ "DatabentoRawClient: Symbol mapping: { inSymbol . GetString ( ) } -> { outSymbolStr } (instrument_id: { instrumentId } )") ;
377-
386+
378387 // Find the subscription that matches this symbol
379388 foreach ( var kvp in _subscriptions )
380389 {
381390 var leanSymbol = kvp . Key ;
382- // Check if the DataBento symbol matches our subscription
383- // For ES19Z25, the outSymbol might be ESZ25 or similar
384391 if ( outSymbolStr != null )
385392 {
386393 _instrumentIdToSymbol [ instrumentId ] = leanSymbol ;
@@ -391,10 +398,22 @@ private async Task ProcessSingleMessage(string message)
391398 }
392399 return ;
393400 }
394- else if ( rtype == 0 || rtype == 1 || rtype == 160 )
401+ else if ( rtype == 1 )
402+ {
403+ // MBP-1 (Market By Price) - Quote ticks
404+ await HandleMBPMessage ( root , headerElement ) ;
405+ return ;
406+ }
407+ else if ( rtype == 0 || rtype == 32 )
408+ {
409+ // Trade messages - Trade ticks
410+ await HandleTradeTickMessage ( root , headerElement ) ;
411+ return ;
412+ }
413+ else if ( rtype == 32 || rtype == 33 || rtype == 34 || rtype == 35 )
395414 {
396- // Trade data or MBO
397- await HandleTradeMessage ( root , headerElement ) ;
415+ // OHLCV bar messages
416+ await HandleOHLCVMessage ( root , headerElement ) ;
398417 return ;
399418 }
400419 }
@@ -415,11 +434,171 @@ private async Task ProcessSingleMessage(string message)
415434 Log . Error ( $ "DatabentoRawClient.ProcessSingleMessage(): Error: { ex . Message } ") ;
416435 }
417436 }
437+
438+ /// <summary>
439+ /// Handles OHLCV messages and converts to LEAN TradeBar data
440+ /// </summary>
441+ private async Task HandleOHLCVMessage ( JsonElement root , JsonElement header )
442+ {
443+ await Task . CompletedTask ;
444+
445+ try
446+ {
447+ if ( ! header . TryGetProperty ( "ts_event" , out var tsElement ) ||
448+ ! header . TryGetProperty ( "instrument_id" , out var instIdElement ) )
449+ {
450+ return ;
451+ }
452+
453+ // Convert timestamp from nanoseconds to DateTime
454+ var timestampNs = long . Parse ( tsElement . GetString ( ) ! ) ;
455+ var unixEpoch = new DateTime ( 1970 , 1 , 1 , 0 , 0 , 0 , DateTimeKind . Utc ) ;
456+ var timestamp = unixEpoch . AddTicks ( timestampNs / 100 ) ;
457+
458+ var instrumentId = instIdElement . GetInt64 ( ) ;
459+
460+ if ( ! _instrumentIdToSymbol . TryGetValue ( instrumentId , out var matchedSymbol ) )
461+ {
462+ Log . Trace ( $ "DatabentoRawClient: No mapping for instrument_id { instrumentId } in OHLCV message.") ;
463+ return ;
464+ }
465+
466+ // Get the resolution for this symbol
467+ if ( ! _subscriptions . TryGetValue ( matchedSymbol , out var subscription ) )
468+ {
469+ return ;
470+ }
471+
472+ var resolution = subscription . Item1 ;
473+
474+ // Extract OHLCV data
475+ if ( root . TryGetProperty ( "open" , out var openElement ) &&
476+ root . TryGetProperty ( "high" , out var highElement ) &&
477+ root . TryGetProperty ( "low" , out var lowElement ) &&
478+ root . TryGetProperty ( "close" , out var closeElement ) &&
479+ root . TryGetProperty ( "volume" , out var volumeElement ) )
480+ {
481+ // Parse prices
482+ var openRaw = long . Parse ( openElement . GetString ( ) ! ) ;
483+ var highRaw = long . Parse ( highElement . GetString ( ) ! ) ;
484+ var lowRaw = long . Parse ( lowElement . GetString ( ) ! ) ;
485+ var closeRaw = long . Parse ( closeElement . GetString ( ) ! ) ;
486+ var volume = volumeElement . GetInt64 ( ) ;
487+
488+ var open = openRaw * PriceScaleFactor ;
489+ var high = highRaw * PriceScaleFactor ;
490+ var low = lowRaw * PriceScaleFactor ;
491+ var close = closeRaw * PriceScaleFactor ;
492+
493+ // Determine the period based on resolution
494+ TimeSpan period = resolution switch
495+ {
496+ Resolution . Second => TimeSpan . FromSeconds ( 1 ) ,
497+ Resolution . Minute => TimeSpan . FromMinutes ( 1 ) ,
498+ Resolution . Hour => TimeSpan . FromHours ( 1 ) ,
499+ Resolution . Daily => TimeSpan . FromDays ( 1 ) ,
500+ _ => TimeSpan . FromMinutes ( 1 )
501+ } ;
502+
503+ // Create TradeBar
504+ var tradeBar = new TradeBar (
505+ timestamp ,
506+ matchedSymbol ,
507+ open ,
508+ high ,
509+ low ,
510+ close ,
511+ volume ,
512+ period
513+ ) ;
514+
515+ Log . Trace ( $ "DatabentoRawClient: OHLCV bar: { matchedSymbol } O={ open } H={ high } L={ low } C={ close } V={ volume } at { timestamp } ") ;
516+ DataReceived ? . Invoke ( this , tradeBar ) ;
517+ }
518+ }
519+ catch ( Exception ex )
520+ {
521+ Log . Error ( $ "DatabentoRawClient.HandleOHLCVMessage(): Error: { ex . Message } ") ;
522+ }
523+ }
524+
525+ /// <summary>
526+ /// Handles MBP messages for quote ticks
527+ /// </summary>
528+ private async Task HandleMBPMessage ( JsonElement root , JsonElement header )
529+ {
530+ await Task . CompletedTask ;
531+
532+ try
533+ {
534+ if ( ! header . TryGetProperty ( "ts_event" , out var tsElement ) ||
535+ ! header . TryGetProperty ( "instrument_id" , out var instIdElement ) )
536+ {
537+ return ;
538+ }
539+
540+ // Convert timestamp from nanoseconds to DateTime
541+ var timestampNs = long . Parse ( tsElement . GetString ( ) ! ) ;
542+ var unixEpoch = new DateTime ( 1970 , 1 , 1 , 0 , 0 , 0 , DateTimeKind . Utc ) ;
543+ var timestamp = unixEpoch . AddTicks ( timestampNs / 100 ) ;
544+
545+ var instrumentId = instIdElement . GetInt64 ( ) ;
546+
547+ if ( ! _instrumentIdToSymbol . TryGetValue ( instrumentId , out var matchedSymbol ) )
548+ {
549+ Log . Trace ( $ "DatabentoRawClient: No mapping for instrument_id { instrumentId } in MBP message.") ;
550+ return ;
551+ }
552+
553+ // For MBP-1, bid/ask data is in the levels array at index 0
554+ if ( root . TryGetProperty ( "levels" , out var levelsElement ) &&
555+ levelsElement . GetArrayLength ( ) > 0 )
556+ {
557+ var level0 = levelsElement [ 0 ] ;
558+
559+ var quoteTick = new Tick
560+ {
561+ Symbol = matchedSymbol ,
562+ Time = timestamp ,
563+ TickType = TickType . Quote
564+ } ;
565+
566+ if ( level0 . TryGetProperty ( "ask_px" , out var askPxElement ) &&
567+ level0 . TryGetProperty ( "ask_sz" , out var askSzElement ) )
568+ {
569+ var askPriceRaw = long . Parse ( askPxElement . GetString ( ) ! ) ;
570+ quoteTick . AskPrice = askPriceRaw * PriceScaleFactor ;
571+ quoteTick . AskSize = askSzElement . GetInt32 ( ) ;
572+ }
573+
574+ if ( level0 . TryGetProperty ( "bid_px" , out var bidPxElement ) &&
575+ level0 . TryGetProperty ( "bid_sz" , out var bidSzElement ) )
576+ {
577+ var bidPriceRaw = long . Parse ( bidPxElement . GetString ( ) ! ) ;
578+ quoteTick . BidPrice = bidPriceRaw * PriceScaleFactor ;
579+ quoteTick . BidSize = bidSzElement . GetInt32 ( ) ;
580+ }
581+
582+ // Set the tick value to the mid price
583+ quoteTick . Value = ( quoteTick . BidPrice + quoteTick . AskPrice ) / 2 ;
584+
585+ // QuantConnect convention: Quote ticks should have zero Price and Quantity
586+ quoteTick . Quantity = 0 ;
587+
588+ Log . Trace ( $ "DatabentoRawClient: Quote tick: { matchedSymbol } Bid={ quoteTick . BidPrice } x{ quoteTick . BidSize } Ask={ quoteTick . AskPrice } x{ quoteTick . AskSize } ") ;
589+ DataReceived ? . Invoke ( this , quoteTick ) ;
590+ }
591+ }
592+ catch ( Exception ex )
593+ {
594+ Log . Error ( $ "DatabentoRawClient.HandleMBPMessage(): Error: { ex . Message } ") ;
595+ }
596+ }
418597
419598 /// <summary>
420- /// Handles trade messages and converts to LEAN Tick data
599+ /// Handles trade tick messages (actual executed transactions)
421600 /// </summary>
422- private async Task HandleTradeMessage ( JsonElement root , JsonElement header )
601+ private async Task HandleTradeTickMessage ( JsonElement root , JsonElement header )
423602 {
424603 await Task . CompletedTask ;
425604
@@ -440,74 +619,38 @@ private async Task HandleTradeMessage(JsonElement root, JsonElement header)
440619
441620 if ( ! _instrumentIdToSymbol . TryGetValue ( instrumentId , out var matchedSymbol ) )
442621 {
443- // Fallback for safety, though symbol mapping should be reliable
444- Log . Trace ( $ "DatabentoRawClient: No mapping for instrument_id { instrumentId } , attempting fallback.") ;
622+ Log . Trace ( $ "DatabentoRawClient: No mapping for instrument_id { instrumentId } in trade message.") ;
445623 return ;
446624 }
447625
448- // Extract trade data
449- if ( root . TryGetProperty ( "action" , out var actionElement ) &&
450- root . TryGetProperty ( "price" , out var priceElement ) &&
626+ if ( root . TryGetProperty ( "price" , out var priceElement ) &&
451627 root . TryGetProperty ( "size" , out var sizeElement ) )
452628 {
453- var action = actionElement . GetString ( ) ;
454629 var priceRaw = long . Parse ( priceElement . GetString ( ) ! ) ;
455630 var size = sizeElement . GetInt32 ( ) ;
456631 var price = priceRaw * PriceScaleFactor ;
457632
458- if ( action == "T" ) // Trade
459- {
460- var tradeTick = new Tick
461- {
462- Symbol = matchedSymbol ,
463- Time = timestamp ,
464- Value = price ,
465- Quantity = size ,
466- TickType = TickType . Trade
467- } ;
468- Log . Trace ( $ "DatabentoRawClient: Trade tick: { matchedSymbol } @ { price } x { size } at { timestamp } ") ;
469- DataReceived ? . Invoke ( this , tradeTick ) ;
470- }
471- else if ( action == "A" || action == "M" || action == "C" ) // Add, Modify, or Clear
633+ var tradeTick = new Tick
472634 {
473- if ( root . TryGetProperty ( "side" , out var sideElement ) )
474- {
475- var side = sideElement . GetString ( ) ;
476- var lastTick = _lastTicks . GetOrAdd ( matchedSymbol , symbol => new Tick { Symbol = symbol , TickType = TickType . Quote } ) ;
477-
478- lastTick . Time = timestamp ;
479-
480- if ( action == "C" ) // Clear
481- {
482- if ( side == "A" ) { lastTick . AskPrice = 0 ; lastTick . AskSize = 0 ; }
483- if ( side == "B" ) { lastTick . BidPrice = 0 ; lastTick . BidSize = 0 ; }
484- }
485- else // Add or Modify
486- {
487- if ( side == "A" ) // Ask
488- {
489- lastTick . AskPrice = price ;
490- lastTick . AskSize = size ;
491- }
492- else if ( side == "B" ) // Bid
493- {
494- lastTick . BidPrice = price ;
495- lastTick . BidSize = size ;
496- }
497- }
498-
499- // Create a clone to send to the aggregator
500- var quoteTick = ( Tick ) lastTick . Clone ( ) ;
501-
502- Log . Trace ( $ "DatabentoRawClient: Quote tick: { matchedSymbol } Bid={ quoteTick . BidPrice } x{ quoteTick . BidSize } Ask={ quoteTick . AskPrice } x{ quoteTick . AskSize } ") ;
503- DataReceived ? . Invoke ( this , quoteTick ) ;
504- }
505- }
635+ Symbol = matchedSymbol ,
636+ Time = timestamp ,
637+ Value = price ,
638+ Quantity = size ,
639+ TickType = TickType . Trade ,
640+ // Trade ticks should have zero bid/ask values
641+ BidPrice = 0 ,
642+ BidSize = 0 ,
643+ AskPrice = 0 ,
644+ AskSize = 0
645+ } ;
646+
647+ Log . Trace ( $ "DatabentoRawClient: Trade tick: { matchedSymbol } Price={ price } Quantity={ size } ") ;
648+ DataReceived ? . Invoke ( this , tradeTick ) ;
506649 }
507650 }
508651 catch ( Exception ex )
509652 {
510- Log . Error ( $ "DatabentoRawClient.HandleTradeMessage (): Error: { ex . Message } ") ;
653+ Log . Error ( $ "DatabentoRawClient.HandleTradeTickMessage (): Error: { ex . Message } ") ;
511654 }
512655 }
513656
0 commit comments