Commit 8eebad7

feat: MongoDB offline store (#6138)
* feat: Add MongoDB offline store (ibis-based PIT join, v1 alpha)
  - MongoDBSource: DataSource backed by a MongoDB collection; schema sampled via aggregation (default N=100)
  - MongoDBOfflineStoreConfig: connection_string + default database
  - MongoDBOfflineStore: delegates to the ibis PIT join engine via an in-memory memtable approach
  - SavedDatasetMongoDBStorage: persist training datasets to MongoDB
  - _build_data_source_reader/_build_data_source_writer closures capture config (connection_string, database) for MongoDB access

  Signed-off-by: Casey Clements <casey.clements@mongodb.com>

* refactor: improve MongoDB offline store code quality
  - Update copyright headers to 2026
  - Move mongodb_to_feast_value_type to feast/type_map.py, consistent with pg_type_to_feast_value_type and cb_columnar_type_to_feast_value_type
  - Add docstrings to MongoDBOptions.to_proto/from_proto, the MongoDBSource class, and get_table_column_names_and_types
  - Replace dead 'assert name' with cast(str, ...) for type-checker safety
  - Add an explanatory comment to the validate() stub
  - Remove module-level warnings.simplefilter('once', RuntimeWarning), which was a process-wide side effect; per-call warnings.warn is enough
  - Convert all assert isinstance(data_source, MongoDBSource) guards to ValueError with descriptive messages in both the public API methods and the reader/writer closures
  - Fix bug: add tz_aware=True to MongoClient in the writer closure, matching the reader, to ensure consistent timezone-aware datetime handling across read and write paths

* Started work on the full Mongo/MQL implementation. Kept MongoDBOfflineStoreIbis and MongoDBOfflineStoreNative.

* refactor: rename alpha to preview, clarify MQL pipeline comments

* Added unit tests for offline store retrieval; they require Docker and pymongo and are skipped when either is unavailable.

* Added a test of multiple feature views and compound join keys

* Initial implementation of the native single-collection offline store

* Added DriverInfo to MongoDB clients

* Optimized MQL; applied FeatureView-level TTL

* Filter TTL by relevant FVs only; cautiously reset df index; add created_at tie-breaker in sort

* Updated docstrings

* Lazy index creation via _get_client_and_ensure_indexes

* Add performance benchmarks comparing Ibis vs Native MongoDB offline stores

* Refactor Native get_historical_features: replace with fetch + pandas join
  - Eliminate the previous PIT join approach, which scaled poorly (O(n×m))
  - Use a single query to fetch all matching feature data
  - Batch entity_ids into chunks of 1000 for large queries
  - Flatten the features subdocument with pd.json_normalize
  - Apply pd.merge_asof for an efficient PIT join per FeatureView
  - Handle TTL filtering in pandas instead of MQL
  - Remove unused _ttl_to_ms and _build_ttl_gte_expr helpers

  Performance improvement:
  - Before: 10k rows in ~188s (53 rows/s)
  - After: 10k rows in ~7.4s (1,354 rows/s)
  - Now competitive with the Ibis implementation

* Refactor get_historical_features with chunked processing for large entity_df
  - Add CHUNK_SIZE (5000) for entity_df processing to bound memory usage
  - Extract a _run_single helper for processing each chunk
  - Add a _chunk_dataframe generator that yields DataFrame slices
  - Preserve original row ordering via a _row_idx column
  - Exclude internal columns (prefixed with _) from entity key serialization
  - Concat chunk results and restore ordering at the end

  This allows processing arbitrarily large entity_df frames while keeping memory bounded by 5000-row chunks.

* Optimize Native get_historical_features: reuse client, increase batch sizes
  - Reuse the MongoClient across chunks (was creating a new client per chunk)
  - Increase CHUNK_SIZE from 5,000 to 50,000 rows
  - Increase MONGO_BATCH_SIZE from 1,000 to 10,000 entity_ids
  - Pass the collection into _run_single instead of creating a client each time
  - Make index creation idempotent (check for an existing index)

  Results (100k rows): 21.7s → 5.2s (4.2× faster). Results (1M rows): 1664s (28 min) → 212s (3.5 min, 7.8× faster).

* Remove duplicate MongoDBOfflineStoreNative from mongodb.py

  The Native implementation now lives exclusively in mongodb_native.py with the single-collection schema. This removes the confusing duplicate that used the Ibis collection-per-FV schema.

* Consolidate mongodb_source.py into mongodb.py
  - Move MongoDBSource, MongoDBOptions, and SavedDatasetMongoDBStorage into mongodb.py
  - Move the _infer_python_type_str helper into mongodb.py
  - Update imports in tests and benchmarks
  - Remove mongodb_source.py

  This consolidates the collection-per-FV implementation into a single file, making the codebase easier to navigate.
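The fetch-then-pandas-join refactor above hinges on `pd.merge_asof` doing the point-in-time join. A minimal sketch of that join — column names and values are illustrative, not taken from the PR:

```python
import pandas as pd

# Entity dataframe: rows we want features for, as of each event_timestamp.
entity_df = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(
        ["2024-01-15 10:00", "2024-01-16 10:00", "2024-01-15 10:00"]
    ),
})

# Feature rows fetched from MongoDB (flattened from the features subdocument).
feature_df = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(
        ["2024-01-15 09:00", "2024-01-16 09:00", "2024-01-14 09:00"]
    ),
    "trips_today": [5, 7, 3],
})

# merge_asof requires both frames sorted by the time key.
result = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    feature_df.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver_id",
    direction="backward",  # latest feature row at or before each entity timestamp
)
```

Each entity row picks up the most recent feature row at or before its own timestamp, per driver.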
* Rename mongodb_offline_store to mongodb; adopt the One/Many naming convention
  - Rename module: mongodb_offline_store/ → mongodb/
  - Rename files: mongodb.py → mongodb_many.py, mongodb_native.py → mongodb_one.py
  - Class renames:
    - MongoDBSource → MongoDBSourceMany
    - MongoDBOptions → MongoDBOptionsMany
    - SavedDatasetMongoDBStorage → SavedDatasetMongoDBStorageMany
    - MongoDBOfflineStoreIbis → MongoDBOfflineStoreMany
    - MongoDBOfflineStoreIbisConfig → MongoDBOfflineStoreManyConfig
    - MongoDBSourceNative → MongoDBSourceOne
    - MongoDBOfflineStoreNative → MongoDBOfflineStoreOne
    - MongoDBOfflineStoreNativeConfig → MongoDBOfflineStoreOneConfig
    - MongoDBNativeRetrievalJob → MongoDBOneRetrievalJob

  The One/Many naming reflects the core architectural difference: One uses a single shared collection for all FeatureViews; Many uses one collection per FeatureView.

* Add README.md documenting the MongoDB offline store implementations

* Rename mongodb/ to mongodb_offline_store/; organize tests
  - Rename module: mongodb/ → mongodb_offline_store/ (follows naming convention)
  - Move tests to the mongodb_offline_store/ subdirectory:
    - test_mongodb_offline_retrieval.py → mongodb_offline_store/test_many.py
    - test_mongodb_offline_retrieval_native.py → mongodb_offline_store/test_one.py
    - benchmark_mongodb_offline_stores.py → mongodb_offline_store/benchmark.py
  - Update all imports to use the mongodb_offline_store path

* Update docstring in benchmark.py

* Update README to show the created_at tie-breaker in the Many schema

* Update README index recommendations for the Many implementation
  - Clarify that indexes should be on join keys + timestamp
  - Show an example for compound join keys
  - Note that Many does not auto-create indexes

* Add auto-create index to MongoDBOfflineStoreMany
  - Add an _ensure_index_many() function with a module-level cache
  - Call it during pull_latest_from_table_or_query (materialization)
  - Create the index on join_keys + timestamp + created_timestamp
  - Check for an existing index before creating
  - Update README to reflect the auto-create behavior

* Update benchmark.py to use the One/Many naming convention
  - Rename functions: _generate_ibis_data → _generate_many_data, etc.
  - Rename fixtures: ibis_config → many_config, native_config → one_config
  - Rename tests: test_scale_rows_ibis → test_scale_rows_many, etc.
  - Update all docstrings and print statements
  - Update the summary comparison output format

* Add a comprehensive module docstring to mongodb_many.py, documenting:
  - Collection structure (one per FeatureView)
  - Index creation (auto-created during materialization)
  - Document schema (flat, top-level features)
  - Point-in-time join strategy (Ibis memtables)
  - Performance characteristics and memory considerations
  - When to use it vs MongoDBOfflineStoreOne
  - A comparison table with the One implementation

* Add Feature Freshness and Schema Evolution docs to mongodb_many.py
  - Feature Freshness Semantics: document-level freshness, not per-feature
  - Schema Evolution ('Feature Creep'): flexible-schema implications
  - Notes: entity keys as native types, PIT correctness, TTL constraints

* Add MongoDB DataSourceCreators for the universal Feast tests
  - MongoDBManyDataSourceCreator: fully functional, passes universal tests. Creates one collection per FeatureView with a flat document schema.
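The lazy, idempotent index creation described above can be sketched with a module-level cache. `ensure_index` and its callback are hypothetical stand-ins for the real `_ensure_index_many` helper:

```python
# Module-level cache: one entry per (connection string, collection) per process.
_indexes_ensured: set[tuple[str, str]] = set()


def ensure_index(conn_str: str, collection: str, create_index) -> bool:
    """Create the compound index at most once per (connection, collection)."""
    key = (conn_str, collection)
    if key in _indexes_ensured:
        return False  # already ensured in this process; skip the server round-trip
    create_index()  # e.g. coll.create_index([join keys, timestamp, created_timestamp])
    _indexes_ensured.add(key)
    return True


calls = []
ensure_index("mongodb://localhost", "driver_stats", lambda: calls.append(1))
ensure_index("mongodb://localhost", "driver_stats", lambda: calls.append(1))
```

The second call is a no-op, so materialization can call it unconditionally.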
  - MongoDBOneDataSourceCreator: implementation exists but is NOT registered. The One schema requires knowing join keys vs features at data-creation time, but DataSourceCreator.create_data_source() doesn't receive entity definitions. See the TODO in mongodb.py for details on the required interface changes.

  Other changes:
  - Fix the data_source_class_type path in mongodb_one.py (mongodb_native → mongodb_one)
  - Improve datetime handling in mongodb_one.py for non-datetime columns
  - Add a 'mongodb' marker to pytest.ini
  - Register MongoDBManyDataSourceCreator in repo_configuration.py

* Add .secrets.baseline

* Addressed PR comment: join_keys = get_expected_join_keys(project, feature_views, registry)

* Add a test scenario that not using offline_utils.get_expected_join_keys would get wrong

* Tests revealed a possible name collision in pandas.merge_asof

* Add further (Large) benchmark tests

* Upgrades from Devin comments: class cache _index_initialized; move get_expected_join_keys outside _run_single

* Addressed PR comments

* Apply the lower bound via max(TTL) when all feature views in a chunk have one

* Add created_at to the compound index so that materialization is correct if erroneous data has been corrected. Remember: OfflineStores are append-only.

* Handle numpy scalars in _serialize_entity_key_from_row, as suggested

* Add persist and tests

* Remove accidentally included design notes

* Fix entity key serialization: per-FV join key types and numpy 2.0 compat

  _serialize_entity_key_from_row had two bugs:
  1. numpy 2.0 broke the scalar unwrap: np.int64 no longer inherits from Python int, so `isinstance(value, (int, float)) and hasattr(value, "item")` never fired. All int entity keys were silently stored as their string representation, causing every MongoDB lookup to miss. Fixed by checking only `hasattr(value, "item")`.
  2. INT32 vs INT64 mismatch: Python int always mapped to int64_val, but entities declared with value_type=INT32 are stored using int32_val (4 bytes vs 8 bytes). The serialized bytes never matched, returning NULL for all features. Fixed by passing the declared ValueType per join key (derived from FeatureView.entity_columns, no registry required) and using the correct proto field in a new `if declared_type is not None` branch.

  Also adds test_int32_entity_key to pin the INT32 round-trip contract, and updates _make_entity_id to accept an optional value_types map so tests can write documents with the correct proto field for non-INT64 entities.

* Add offline_write_batch to MongoDBOfflineStoreOne

  Implements the missing write path so that FeatureStore.write_to_offline_store() and the Arrow Flight offline server no longer raise NotImplementedError. Each row in the input pyarrow.Table is stored as one document in the single-collection schema (entity_id, feature_view, features, event_timestamp, created_at).
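Both serialization bugs above reduce to small, testable rules. This sketch is a simplification — the real code writes Feast's proto fields (int32_val/int64_val), while `encode_int_key` here is a hypothetical stand-in showing why the declared width matters:

```python
import numpy as np


def unwrap_scalar(value):
    # numpy 2.0 compat: np.int64 no longer subclasses Python int, so an
    # isinstance(value, (int, float)) guard misses numpy scalars entirely.
    # hasattr(value, "item") catches every numpy scalar regardless of version.
    if hasattr(value, "item"):
        return value.item()
    return value


def encode_int_key(value, declared_type="INT64"):
    # Simplified stand-in for the proto-field choice: an INT32 key must
    # serialize to 4 bytes and an INT64 key to 8, or the bytes written at
    # ingest never match the bytes queried at read time.
    width = 4 if declared_type == "INT32" else 8
    return int(unwrap_scalar(value)).to_bytes(width, "little", signed=True)
```

The second assertion below is exactly the lookup-miss scenario: the same logical key, encoded under two widths, yields different bytes.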
  Entity keys are serialized using the same type-aware _serialize_entity_key_from_row introduced in the previous commit, so bytes written here are guaranteed to match the bytes produced by get_historical_features. Documents are appended (not upserted): multiple observations at the same (entity, feature_view, event_timestamp) are permitted and handled by the pull_latest_from_table_or_query $first/created_at sort, which supports data corrections written with a later created_at.

  Adds test_offline_write_batch_round_trip, which verifies both the document structure in the collection and the full write→read round-trip through get_historical_features.

* mongodb_one: clarify pipeline sort rationale and avoid sparse-column expansion

  Replace four TODO annotations in MongoDBOfflineStoreOne with accurate explanations and, where appropriate, a code fix:
  - pull_latest_from_table_or_query sort stage: document why entity_id leads the sort key. The compound index (entity_id, feature_view, event_timestamp DESC, created_at DESC) can back the entire stage only when entity_id is first. Co-locating documents for the same entity also ensures the stage picks the correct (most-recent event_timestamp, latest created_at) document.
  - get_historical_features, features expansion: replace pd.json_normalize + pd.concat with a targeted per-feature extraction. json_normalize expands every key ever present in any document; with schema evolution that produces many sparse columns and wastes memory. The new approach calls dict.get() for each requested feature only, keeping the DataFrame narrow regardless of how many unrelated feature keys exist in the store.
  - get_historical_features, fv_df sort: document that merge_asof requires the right-hand frame to be sorted by its join key before the column rename to _fv_ts.
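The targeted per-feature extraction described above can be sketched in a few lines; the documents and feature names are illustrative:

```python
import pandas as pd

# Documents as fetched from MongoDB. Schema evolution means old documents can
# carry feature keys that nobody requests anymore (e.g. "legacy_metric").
docs = [
    {"features": {"trips_today": 5, "rating": 4.8, "legacy_metric": 1}},
    {"features": {"trips_today": 7}},  # newer doc missing "rating"
]
requested = ["trips_today", "rating"]

# Extract only the requested keys: the frame stays narrow no matter how many
# unrelated feature keys exist across the collection (unlike json_normalize,
# which would materialize a column for every key ever seen).
fv_df = pd.DataFrame(
    {feat: [d.get("features", {}).get(feat) for d in docs] for feat in requested}
)
```

Missing keys surface as NaN, which matches how absent features behave downstream.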
  - get_historical_features, fv_df_subset copy: document that .copy() is required because column selection on a DataFrame returns a view; calling rename() on a view raises SettingWithCopyWarning and can silently mutate the source frame.

  All 27 MongoDB unit tests continue to pass.

* Add mongodb_native.py: initial MQL-based offline store (pre-refactor snapshot)

  Ports the native MongoDB offline store from commit a1e3c93 onto the feature branch as a baseline before the Atlas-first refactor. Uses a temp-collection + $lookup strategy for get_historical_features. Known issues (documented in design/): union join-key serialization, a single entity_id across all FVs, a _row_idx/iloc mismatch, a missing created_at tiebreaker, and no offline_write_batch.

* Refactor mongodb_native: Atlas-first $documents + $lookup PIT join

  Replace the temp-collection + single-$lookup approach with a pure-MQL $documents + chained-$lookup pipeline that runs entirely on Atlas.

  Correctness fixes ported from mongodb_one.py:
  - _serialize_entity_key_from_row: add a join_key_types param for INT32 vs INT64 correctness; unwrap numpy scalars for numpy 2.0 compatibility
  - Per-FV join key derivation from fv.entity_columns — union keys produce bytes that never match stored documents when FVs have different entities
  - reset_index(drop=True) before building $documents; _row_idx is then a safe iloc index regardless of the caller's DataFrame index
  - created_at tiebreaker in the $lookup sort, for data corrections

  Architecture changes:
  - $documents injects the entity_df chunk as a virtual collection; no temp collection is created, so there is no orphan risk on crash
  - One $lookup per FV with let: {eid: $_entity_ids.<fv>, ts: $_ts}; each FV matches its own entity key bytes — solves the single-entity_id bug
  - TTL expressed inline per FV inside its $lookup subpipeline (no $switch)
  - $project {features: 1} inside the subpipeline plus a final $project dropping _entity_ids, to minimise data transferred from Atlas
  - allowDiskUse=True on all aggregate() calls
  - The 4-key compound index adds created_at for correction tiebreaking
  - Module-level _indexes_ensured set (vs a class-level bool) for correctness across multiple RepoConfigs in the same process

  Parity with mongodb_one.py:
  - offline_write_batch implemented
  - SavedDatasetMongoDBStorageNative + persist() implemented
  - The RetrievalJob carries its config

* Add unit tests for MongoDBOfflineStoreNative

  Mirrors test_one.py for the $documents + $lookup implementation. Includes test_heterogeneous_join_keys, which specifically exercises the per-FV entity key fix (driver_stats and customer_stats keyed by different join keys in the same entity_df).
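The $documents + chained $lookup shape described above can be sketched as a pipeline builder. The field names (`_entity_ids`, `_ts`) follow the commit message, but this builder is a hypothetical simplification that only constructs the pipeline document, one $lookup per feature view:

```python
def build_pit_pipeline(entity_docs, fv_name, collection="feature_history"):
    """Build (not execute) a $documents + $lookup PIT pipeline for one FV."""
    subpipeline = [
        {"$match": {"$expr": {"$and": [
            {"$eq": ["$entity_id", "$$eid"]},       # this FV's own key bytes
            {"$eq": ["$feature_view", fv_name]},
            {"$lte": ["$event_timestamp", "$$ts"]},  # point-in-time bound
        ]}}},
        # Latest event wins; created_at breaks ties so corrections take effect.
        {"$sort": {"event_timestamp": -1, "created_at": -1}},
        {"$limit": 1},
        {"$project": {"features": 1, "_id": 0}},  # minimise transferred data
    ]
    return [
        # Inject the entity_df chunk as a virtual collection: no temp
        # collection on the server, so nothing is orphaned on a crash.
        {"$documents": entity_docs},
        {"$lookup": {
            "from": collection,
            "let": {"eid": f"$_entity_ids.{fv_name}", "ts": "$_ts"},
            "pipeline": subpipeline,
            "as": fv_name,
        }},
    ]
```

Chaining one such $lookup per feature view lets each FV match its own serialized entity key, which is the fix for the single-entity_id bug.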
* Add a cross-implementation equivalence suite (test_cross.py)

  Seeds the same logical data in both storage schemas (single-collection for one/native, per-FV collection for many) and asserts that all three implementations return identical feature values for six scenarios: PIT join, TTL filtering, multi-FV join, overlapping feature names (full_feature_names=True), compound join keys, and extra label columns in entity_df. Also adds overlapping-feature-name coverage to the 'many' implementation, which was missing this case in its individual test suite.

* Add benchmark_sweep.py: a four-dimensional scaling suite across all three implementations

  Covers N (entity count), M (feature width), P (observation depth), and K (feature-view fan-out) as independent dimensions, with smoke/stress/OOM-guard tiers. A side-by-side table is printed after each parametrize run; results are appended to benchmark_results.csv. Replaces the one/many-only benchmark.py.

* Add the mongodb_agg offline store — $match + $sort + $group, O(log P) without $lookup/$expr

  mongodb_agg.py implements a fourth offline store that avoids the $expr + $lookup COLLSCAN limitation of mongodb_native. It uses a compound-index-backed $match → $sort → $group pipeline (scoring path) or $match + merge_asof (training path). K-collapse batches FVs sharing the same join key into a single round-trip.

  Confirmed properties: O(log P) (7% over 20× P on Atlas), ~2 MB memory at N=500/P=5; K-collapse reduces fan-out cost vs native (2.8× at K=3 vs native's 5×).

  Also:
  - benchmark_sweep.py: add agg as a fourth column; support a MONGODB_URI env var for running against an external cluster without a testcontainer
  - design/benchmark-sweep-findings.md: add an agg implementation section, K-sweep results including the bug-fixed K=3, and an updated summary table

* Vectorize the agg scoring path, add an upfront index build, ignore design/

  mongodb_agg.py:
  - Replace the O(N×M) apply(axis=1) scoring-path join with a vectorized merge + boolean mask — 4–6× faster at stress scale (N=50K: 209s → 28s)
  - Expand the features dict via pd.json_normalize instead of a per-feature apply
  - Both changes confirmed correct: K smoke tests pass, stress tier passes

  benchmark_sweep.py:
  - _reset_and_seed now creates the compound index synchronously after the bulk insert, so queries never race against a background index build
  - Added a _COMPOUND_IDX constant shared by the seeding step

  Stress-tier results (N=200K, M=100):
  - agg: 139s, 955 MB trace (fastest and lowest memory)
  - native: 171s, 3208 MB trace
  - one: 331s, 4044 MB trace
  - many: SKIPPED (13 GB projected)

  .gitignore: add a design/ exclusion

* Adds offline_write_batch

* Adds detail to the handling of K in benchmarks

* Adds missing typing

* Consolidate the MongoDB offline store to a single implementation

  Replace the four-implementation experiment (native, one, many, agg) with a single production-ready mongodb.py.
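The vectorized scoring-path join described above — one merge plus a boolean mask instead of a per-row apply — can be sketched with illustrative data:

```python
import pandas as pd

# Entity rows with their request timestamps.
entity = pd.DataFrame({
    "entity_id": ["a", "b"],
    "ts": pd.to_datetime(["2024-01-02", "2024-01-02"]),
})

# Latest fetched feature row per entity; entity "b"'s row post-dates its request.
features = pd.DataFrame({
    "entity_id": ["a", "b"],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-03"]),
    "rating": [4.8, 3.0],
})

# One vectorized merge instead of apply(axis=1) over every entity row...
merged = entity.merge(features, on="entity_id", how="left")

# ...then null out feature values that post-date the request timestamp.
future_mask = merged["event_timestamp"] > merged["ts"]
merged.loc[future_mask, "rating"] = None
```

Both the merge and the comparison operate on whole columns, which is where the 4–6× speedup at stress scale comes from.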
  Key changes:
  - Rename mongodb_agg.py → mongodb.py; rename all public classes to drop the Agg suffix (MongoDBOfflineStore, MongoDBSource, MongoDBOfflineStoreConfig)
  - Add a strict_pit flag to get_historical_features (False = real-time inference)
  - Fix INT32 entity key serialisation (was always writing int64_val)
  - Add an on_demand_feature_views property to MongoDBRetrievalJob
  - Expose _CHUNK_SIZE / _MONGO_BATCH_SIZE at module level for testability
  - Update persist() to accept SavedDatasetFileStorage natively
  - Replace test_one/many/native/cross + benchmarks with test_mongodb.py (17 tests, including strict_pit, chunk-boundary, and K-collapse coverage)
  - Wire MongoDBDataSourceCreator into repo_configuration; implement create_saved_dataset_destination via SavedDatasetFileStorage
  - Remove design/ from .gitignore

* Fixes the strict_pit_false unit test

* Fix MongoDB offline store: projection keying, TTL bounds, field mapping, and schema inference
  - Key get_historical_features internals by projection name (fv.projection.name_to_use()) instead of fv.name, fixing entity mapping for multi-projection feature views
  - Use min_ts - fv.ttl for the TTL lower bound instead of max_ts - fv.ttl, so documents needed for early entity rows in a chunk are not excluded
  - Replace pd.json_normalize with manual dict extraction to preserve complex types (Map/Struct/Array) and apply reverse field_mapping for source-column lookup
  - Add get_table_column_names_and_types to MongoDBSource for feast apply support
  - Use batch_source.feature_view_name in offline_write_batch so pushed data lands in the same collection partition as the initial ingest

* Fix MongoDB test DataSourceCreator: implement create_logged_features_destination

  Return a FileLoggingDestination instead of raising NotImplementedError, unblocking feature logging tests.
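The TTL-bound fix above is easy to see with a small worked example (values illustrative): the lower bound must come from the chunk's *minimum* request timestamp, or documents still needed by the earliest rows are filtered out server-side:

```python
from datetime import datetime, timedelta

ttl = timedelta(hours=2)
# Two entity rows in the same chunk with different request timestamps.
request_ts = [datetime(2024, 1, 1, 8, 0), datetime(2024, 1, 1, 12, 0)]

# Correct: min_ts - ttl keeps every document visible to the earliest row.
lower_bound = min(request_ts) - ttl   # 06:00

# Buggy variant: max_ts - ttl gives 10:00, which would exclude a 07:00
# document that the 08:00 request is still entitled to see.
buggy_bound = max(request_ts) - ttl   # 10:00
doc_ts = datetime(2024, 1, 1, 7, 0)
```

With the fix, the 07:00 document survives the bound and the per-row TTL check in pandas makes the final call.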
* Fix a pd.isna() ValueError on list/array features in offline_write_batch

  pd.isna() on list/array values returns a numpy array instead of a scalar bool, causing a 'truth value of an array is ambiguous' ValueError. Guard with try/except and an isinstance check to handle non-scalar types.

* Fix bool/int type inference order in get_table_column_names_and_types

  Check isinstance(v, bool) before isinstance(v, int), since bool is a subclass of int in Python. Without this, boolean features are always inferred as int64.

* Fix mongodb_to_feast_value_type to accept the type strings produced by get_table_column_names_and_types

  get_table_column_names_and_types produces 'int64', 'float64', 'string', 'list', and 'dict', but mongodb_to_feast_value_type only accepted 'int', 'float', and 'str'. Add aliases so schema inference works correctly for MongoDBSource feature views.

* Sort join keys in _serialize_entity_key_from_row for consistent entity_id bytes

  get_historical_features sorts join keys before serialization, but _serialize_entity_key_from_row (used by offline_write_batch) did not. For compound keys in non-alphabetical order, this produced different entity_id bytes, causing written features to be unmatched on read.

* Resolve the .secrets.baseline merge conflict with master

* Add mongodb to the CI extras so pymongo is installed in CI

  The mongodb optional dependency group was missing from the ci extras list in pyproject.toml. This meant pymongo was never installed in CI, causing all MongoDB offline store tests to be silently skipped.
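The inference-order fix above comes down to a single branch ordering. This helper is a hypothetical sketch mirroring the described behaviour, not the PR's actual function:

```python
def infer_type_str(v) -> str:
    """Map a sampled document value to a type string. Branch order matters."""
    # bool must be tested before int: isinstance(True, int) is True in Python
    # (bool subclasses int), so the int branch would otherwise claim every
    # boolean feature and infer it as int64.
    if isinstance(v, bool):
        return "bool"
    if isinstance(v, int):
        return "int64"
    if isinstance(v, float):
        return "float64"
    if isinstance(v, str):
        return "string"
    if isinstance(v, list):
        return "list"
    if isinstance(v, dict):
        return "dict"
    return "string"  # conservative fallback for unrecognised BSON types
```

These are also the type strings ('int64', 'float64', 'string', 'list', 'dict') that mongodb_to_feast_value_type was extended to accept.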
* Remove MongoDB from the universal test parametrization

  The MongoDB offline store's "One" (single-collection) design is incompatible with the universal test suite's DataSourceCreator contract, which does not provide join-key information at data creation time. Remove the AVAILABLE_OFFLINE_STORES registration so universal tests are not parametrized with MongoDB. The 17 dedicated unit tests in test_mongodb.py continue to run and validate the One design.

* Regenerate the pixi lockfile after the pymongo addition to the ci extras

* Fix the scoring_path heuristic: check entity uniqueness per-FV, not globally

  The scoring_path detection checked entity uniqueness against the union of all feature view join keys. When FVs have different join key sets (e.g. FV_A on user_id, FV_B on (user_id, device_id)), entity_df can be unique on the union but have duplicate entity_ids for FVs with fewer keys. The $group stage would then discard valid older documents, causing silent NULL results. Move the scoring_path decision inside the per-FV loop, checking uniqueness on each FV's serialized entity_id column; each FV now independently picks the scoring or training path based on its own key cardinality. Add test_mixed_join_key_cardinality covering the exact scenario.

* Fix offline_write_batch: use original join key names for entity serialization

  offline_write_batch used mapped (aliased) join key names when serializing entity keys, while get_historical_features uses original names. When a join_key_map is configured, the entity_id bytes would differ between write and read, causing all features to silently return NULL. Use ec.name (the original) instead of the mapped name so the bytes match.

* Fix pull_latest and pull_all to return join key columns

  pull_latest_from_table_or_query and pull_all_from_table_or_query returned only serialized entity_id bytes, event_timestamp, and feature columns — not the actual join key columns (e.g. driver_id) that the base-class contract requires. Add an _expand_entity_id_column helper that deserializes entity_id bytes back into individual join key columns using deserialize_entity_key; both methods now call it before returning the DataFrame. Add assertions to the existing pull_latest and pull_all tests to verify that join key columns are present in the output.

* Fix scoring_path: require homogeneous timestamps to prevent data loss

  The scoring path ($group $first) selects the latest doc per entity up to max_ts — the maximum request timestamp across ALL entities in the chunk. When entities have different request timestamps, $group may pick a doc that post-dates a specific entity's request time; the Python future_mask would null it, but the valid older doc was already discarded by $group. Add a second condition: the scoring path is used only when all entity request timestamps in the chunk are identical, which is the common real-time scoring case (all requests at 'now'). When timestamps differ, fall back to the training path (merge_asof), which handles per-row PIT correctness. Add test_heterogeneous_timestamps_fall_back_to_training_path.

* Fix the training path: sort fv_df by created_at to break event_timestamp ties

  The training path (merge_asof) sorted fv_df only by event_timestamp. When multiple docs for the same entity share the same event_timestamp but differ in created_at, merge_asof picks the last row in sorted order; without a created_at sort, that order depends on MongoDB's undefined document return order — making the result non-deterministic.
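The two scoring-path guards described above — per-FV key uniqueness and homogeneous request timestamps — can be sketched as one predicate. The helper is hypothetical; the real decision lives inside the per-FV loop:

```python
import pandas as pd


def can_use_scoring_path(entity_df: pd.DataFrame, fv_join_keys: list[str]) -> bool:
    # Guard 1: this FV's own key columns must be unique within the chunk,
    # or $group $first would discard valid older docs for duplicated entities.
    unique_per_fv = not entity_df.duplicated(subset=fv_join_keys).any()
    # Guard 2: all request timestamps must be identical (the real-time case),
    # or $group could pick a doc post-dating some entity's request time.
    homogeneous_ts = entity_df["event_timestamp"].nunique() == 1
    return bool(unique_per_fv and homogeneous_ts)


# The mixed-cardinality scenario from the fix: unique on (user_id, device_id),
# but duplicated on user_id alone.
entity_df = pd.DataFrame({
    "user_id": [1, 1],
    "device_id": ["a", "b"],
    "event_timestamp": pd.to_datetime(["2024-01-01 12:00"] * 2),
})
```

An FV keyed on (user_id, device_id) may take the scoring path here, while an FV keyed on user_id alone must fall back to the training path.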
  Sort by [event_timestamp, created_at] so merge_asof consistently picks the doc with the highest created_at among ties, matching the scoring path's behavior ($sort + $group $first). Add test_training_path_created_at_tiebreaker.

* Clean up stale docstrings: remove references to MongoDBOfflineStoreOne/Agg

  Remove leftover references to earlier naming iterations (MongoDBSourceOne, 'agg offline store', 'Improves on MongoDBOfflineStoreOne') from class and method docstrings.

* Added driver metadata to clients

* Update .secrets.baseline

* Remove preview warnings from the MongoDB offline store

  The offline store is GA. Remove the three RuntimeWarning calls in pull_latest_from_table_or_query, pull_all_from_table_or_query, and get_historical_features, along with the now-unused warnings import.

---------

Signed-off-by: Casey Clements <casey.clements@mongodb.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
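The created_at tie-break can be checked in isolation (values illustrative): after sorting by [event_timestamp, created_at], merge_asof's "last row wins among equal keys" rule deterministically selects the latest correction:

```python
import pandas as pd

# Two docs for the same entity and event_timestamp; the second is a later
# correction (higher created_at) of the first.
fv_df = pd.DataFrame({
    "driver_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2024-01-01 10:00"] * 2),
    "created_at": pd.to_datetime(["2024-01-01 10:01", "2024-01-01 10:05"]),
    "rating": [3.0, 4.5],
})

entity_df = pd.DataFrame({
    "driver_id": [1],
    "event_timestamp": pd.to_datetime(["2024-01-01 12:00"]),
})

result = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    # Sorting by created_at as well fixes the within-tie order, so merge_asof's
    # choice no longer depends on MongoDB's document return order.
    fv_df.sort_values(["event_timestamp", "created_at"]).drop(columns="created_at"),
    on="event_timestamp",
    by="driver_id",
)
```

With the sort, the correction (4.5) always wins; without it, either value could come back depending on fetch order.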
1 parent bd45c68 commit 8eebad7

26 files changed

Lines changed: 10571 additions & 6783 deletions

.secrets.baseline

Lines changed: 1 addition & 1 deletion
```diff
@@ -1539,5 +1539,5 @@
       }
     ]
   },
-  "generated_at": "2026-04-17T13:31:24Z"
+  "generated_at": "2026-04-30T13:56:37Z"
 }
```

pixi.lock

Lines changed: 3 additions & 3 deletions
Generated file; diff not rendered by default.

pyproject.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -153,7 +153,7 @@ test = [
 ]

 ci = [
-    "feast[test, aws, azure, cassandra, clickhouse, couchbase, delta, docling, duckdb, elasticsearch, faiss, gcp, ge, go, grpcio, hazelcast, hbase, ibis, image, k8s, mcp, milvus, mssql, mysql, openlineage, opentelemetry, oracle, spark, trino, postgres, pytorch, qdrant, rag, ray, redis, singlestore, snowflake, sqlite_vec]",
+    "feast[test, aws, azure, cassandra, clickhouse, couchbase, delta, docling, duckdb, elasticsearch, faiss, gcp, ge, go, grpcio, hazelcast, hbase, ibis, image, k8s, mcp, milvus, mongodb, mssql, mysql, openlineage, opentelemetry, oracle, spark, trino, postgres, pytorch, qdrant, rag, ray, redis, singlestore, snowflake, sqlite_vec]",
     "build",
     "virtualenv==20.23.0",
     "dbt-artifacts-parser",
```
Lines changed: 72 additions & 0 deletions
# MongoDB Offline Store

This offline store lets you train models and run batch scoring directly from MongoDB.
All feature views share a single collection (`feature_history`). Reads use
MongoDB aggregation pipelines backed by a compound index, so per-entity cost is
O(log n_observations) regardless of collection size, and K feature views that share an
entity key collapse into a single round-trip instead of K.
## Schema

All feature views share one collection (default: `feature_history`), discriminated by the `feature_view` field.

```javascript
// Collection: feature_history
{
  "entity_id": Binary("..."),        // Serialized entity key (bytes)
  "feature_view": "driver_stats",    // Discriminator
  "features": {                      // Nested subdocument
    "trips_today": 5,
    "rating": 4.8
  },
  "event_timestamp": ISODate("2024-01-15T10:00:00Z"),
  "created_at": ISODate("2024-01-15T10:00:01Z")
}
```
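As a concrete illustration, a document of this shape can be built as a plain dict before insertion. This is a sketch: the helper name and the placeholder entity-key bytes are illustrative, not part of the store's API (real keys come from Feast's entity-key serializer).

```python
from datetime import datetime, timezone

def make_feature_doc(entity_key: bytes, feature_view: str, features: dict,
                     event_ts: datetime, created_ts: datetime) -> dict:
    # One document per observation; feature values live in a nested subdocument.
    return {
        "entity_id": entity_key,        # serialized entity key (bytes)
        "feature_view": feature_view,   # discriminator field
        "features": features,
        "event_timestamp": event_ts,
        "created_at": created_ts,
    }

doc = make_feature_doc(
    b"driver_id\x00\x01",  # placeholder bytes, for illustration only
    "driver_stats",
    {"trips_today": 5, "rating": 4.8},
    datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 15, 10, 0, 1, tzinfo=timezone.utc),
)
```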
## Index

The store creates one compound index lazily on first use. This index supports every query issued.

```javascript
db.feature_history.createIndex({
  "entity_id": 1,
  "feature_view": 1,
  "event_timestamp": -1,
  "created_at": -1
})
```
## Configuration

```yaml
offline_store:
  type: feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBOfflineStore
  connection_string: mongodb://localhost:27017
  database: feast
  collection: feature_history  # optional, default: feature_history
```
## Key Features

**Query-collapse** — Feature views that share the same join key set are grouped into a single MongoDB aggregation round-trip instead of one per feature view. This reduces round-trips from K to the number of unique join-key signatures, often one.
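The collapse can be pictured as grouping by join-key signature. The descriptors below are hypothetical stand-ins for Feast feature-view objects, used only to show the grouping step:

```python
from collections import defaultdict

# Hypothetical feature-view descriptors; only the join-key tuple matters here.
feature_views = [
    {"name": "driver_stats",   "join_keys": ("driver_id",)},
    {"name": "driver_ratings", "join_keys": ("driver_id",)},
    {"name": "trip_stats",     "join_keys": ("driver_id", "trip_id")},
]

# Group feature views by join-key signature: one aggregation per group.
groups: dict = defaultdict(list)
for fv in feature_views:
    groups[fv["join_keys"]].append(fv["name"])
```

Here three feature views yield two signatures, hence two round-trips instead of three.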
**Scoring path** — When `entity_df` contains unique entity IDs, a `$match + $sort + $group` pipeline performs server-side deduplication, returning at most one document per `(entity_id, feature_view)`. The compound index makes per-entity cost O(log n_obs).
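A sketch of that dedup pipeline, with stage contents inferred from the description above (field names follow the schema; the function name is illustrative):

```python
def scoring_pipeline(entity_ids: list, feature_view: str) -> list:
    """Sketch of the dedup pipeline: one latest document per entity."""
    return [
        # Index-backed filter on (entity_id, feature_view).
        {"$match": {"entity_id": {"$in": entity_ids},
                    "feature_view": feature_view}},
        # Newest event first; ties broken by highest created_at.
        {"$sort": {"event_timestamp": -1, "created_at": -1}},
        # Server-side dedup: keep the first (latest) doc per entity.
        {"$group": {"_id": "$entity_id",
                    "features": {"$first": "$features"},
                    "event_timestamp": {"$first": "$event_timestamp"}}},
    ]

pipeline = scoring_pipeline([b"key-1", b"key-2"], "driver_stats")
# db.feature_history.aggregate(pipeline) would then yield at most one
# document per entity_id.
```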
**Training path** — When `entity_df` contains repeated entity IDs at different timestamps, the `$group` stage is omitted and `pandas.merge_asof` performs the per-row point-in-time joins, optimized in C.
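The training-path join can be sketched with toy frames (column names follow the schema; the data is illustrative). Per the commit message, the store also pre-sorts observations by [event_timestamp, created_at] so ties resolve to the highest created_at:

```python
import pandas as pd

# Entity rows: the (entity, timestamp) pairs we want features for.
entity_df = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(
        ["2024-01-15 10:00", "2024-01-16 10:00", "2024-01-15 10:00"], utc=True),
})

# Observations pulled from MongoDB for those entities.
obs = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(
        ["2024-01-14 09:00", "2024-01-16 09:00", "2024-01-15 09:00"], utc=True),
    "trips_today": [3, 5, 7],
})

# merge_asof requires both frames sorted by the merge key.
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    obs.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver_id",
    direction="backward",  # only observations at or before each request time
)
```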
**`strict_pit`** — `get_historical_features` accepts a `strict_pit` keyword argument (default `True`). With `strict_pit=True` (the default, safe for training), observations whose timestamp falls strictly after the entity row's request timestamp are excluded, so those features come back as `NULL`. Set `strict_pit=False` for real-time inference, where you always want the most recent observation.
## Writing Data

Use `offline_write_batch` (called automatically by `feast materialize`) to write feature observations:

```python
store.write_to_offline_store(feature_view_name, df)
```

Documents are appended; `pull_latest` and the scoring path select the highest `created_at` at read time.
## Memory Behaviour

The store filters by entity key in `$match` rather than loading the entire collection. Memory usage is bounded by the number of unique entity IDs × documents per entity, not by the total collection size.
Lines changed: 8 additions & 0 deletions

```python
import feast.version

try:
    from pymongo.driver_info import DriverInfo

    DRIVER_METADATA = DriverInfo(name="Feast", version=feast.version.get_version())
except ImportError:
    DRIVER_METADATA = None  # type: ignore[assignment]
```
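This metadata would typically be handed to the client via pymongo's `driver` option, so server logs identify Feast connections. A sketch under stated assumptions: the connection string and version string are placeholders, and the guard mirrors the module's own ImportError fallback.

```python
try:
    from pymongo import MongoClient
    from pymongo.driver_info import DriverInfo

    driver_metadata = DriverInfo(name="Feast", version="0.0.0")  # placeholder version
    # tz_aware=True keeps datetimes timezone-aware on both read and write
    # paths, matching the reader/writer closures in the commit message.
    client = MongoClient("mongodb://localhost:27017", tz_aware=True,
                         driver=driver_metadata)
    tz_aware = client.codec_options.tz_aware
except ImportError:
    client = None
    tz_aware = True  # nothing to check without pymongo installed
```

Note that `MongoClient` connects lazily, so constructing it does not require a reachable server.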
