@@ -53,7 +53,7 @@ func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineS
5353 ctx := context .Background ()
5454 cfg , err := awsConfig .LoadDefaultConfig (ctx )
5555 if err != nil {
56- panic ( err )
56+ return nil , err
5757 }
5858 store .client = dynamodb .NewFromConfig (cfg )
5959
@@ -237,24 +237,64 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
237237
238238 // process response from dynamodb
239239 for j := 0 ; j < batchSize ; j ++ {
240- entityId := Responses [j ]["entity_id" ].(* dtypes.AttributeValueMemberS ).Value
241- timestampString := Responses [j ]["event_ts" ].(* dtypes.AttributeValueMemberS ).Value
240+ entityIdAttr , ok := Responses [j ]["entity_id" ]
241+ if ! ok || entityIdAttr == nil {
242+ continue
243+ }
244+ entityIdMember , ok := entityIdAttr .(* dtypes.AttributeValueMemberS )
245+ if ! ok {
246+ return fmt .Errorf ("unexpected DynamoDB attribute type for 'entity_id' in table %s" , tableName )
247+ }
248+ entityId := entityIdMember .Value
249+
250+ tsAttr , ok := Responses [j ]["event_ts" ]
251+ if ! ok || tsAttr == nil {
252+ continue
253+ }
254+ tsMember , ok := tsAttr .(* dtypes.AttributeValueMemberS )
255+ if ! ok {
256+ return fmt .Errorf ("unexpected DynamoDB attribute type for 'event_ts' in table %s" , tableName )
257+ }
258+ timestampString := tsMember .Value
259+
242260 t , err := time .Parse ("2006-01-02 15:04:05-07:00" , timestampString )
243261 if err != nil {
244262 return err
245263 }
246264 timeStamp := timestamppb .New (t )
247265
248- featureValues := Responses [j ]["values" ].(* dtypes.AttributeValueMemberM ).Value
266+ rawValues , ok := Responses [j ]["values" ]
267+ if ! ok || rawValues == nil {
268+ continue
269+ }
270+ valuesMap , ok := rawValues .(* dtypes.AttributeValueMemberM )
271+ if ! ok {
272+ return fmt .Errorf ("unexpected DynamoDB attribute type for 'values' in table %s" , tableName )
273+ }
274+ featureValues := valuesMap .Value
249275 entityIndex := entityIndexMap [entityId ]
250276
251277 for _ , featureName := range featureNames {
252- featureValue := featureValues [featureName ].(* dtypes.AttributeValueMemberB ).Value
278+ featureIndex := featureNamesIndex [featureName ]
279+ rawVal , exists := featureValues [featureName ]
280+ if ! exists || rawVal == nil {
281+ mu .Lock ()
282+ results [entityIndex ][featureIndex ] = FeatureData {
283+ Reference : serving.FeatureReferenceV2 {FeatureViewName : featureViewName , FeatureName : featureName },
284+ Timestamp : timestamppb.Timestamp {Seconds : timeStamp .Seconds , Nanos : timeStamp .Nanos },
285+ Value : types.Value {Val : & types.Value_NullVal {NullVal : types .Null_NULL }},
286+ }
287+ mu .Unlock ()
288+ continue
289+ }
290+ memberB , ok := rawVal .(* dtypes.AttributeValueMemberB )
291+ if ! ok {
292+ return fmt .Errorf ("unexpected DynamoDB attribute type for feature %q in view %q" , featureName , featureViewName )
293+ }
253294 var value types.Value
254- if err := proto .Unmarshal (featureValue , & value ); err != nil {
295+ if err := proto .Unmarshal (memberB . Value , & value ); err != nil {
255296 return err
256297 }
257- featureIndex := featureNamesIndex [featureName ]
258298
259299 mu .Lock ()
260300 results [entityIndex ][featureIndex ] = FeatureData {Reference : serving.FeatureReferenceV2 {FeatureViewName : featureViewName , FeatureName : featureName },
0 commit comments