diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 05d979b4eb9c..40739c876a49 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -80,7 +80,7 @@ from bigframes import version from bigframes.core import blocks, utils from bigframes.core.logging import log_adapter -from bigframes.session import bigquery_session, bq_caching_executor, executor +from bigframes.session import bigquery_session, executor, proxy_executor # Avoid circular imports. if typing.TYPE_CHECKING: @@ -316,7 +316,7 @@ def __init__( if not self._strictly_ordered: labels["bigframes-mode"] = "unordered" - self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor( + self._executor: executor.Executor = proxy_executor.DualCompilerProxyExecutor( bqclient=self._clients_provider.bqclient, bqstoragereadclient=self._clients_provider.bqstoragereadclient, loader=self._loader, diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index e8d01c8e960b..dd09997a8b39 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -17,15 +17,12 @@ import concurrent.futures import math import threading -import uuid -import warnings from typing import Literal, Mapping, Optional, Sequence, Tuple import google.api_core.exceptions import google.cloud.bigquery.job as bq_job import google.cloud.bigquery.table as bq_table import google.cloud.bigquery_storage_v1 -import google.cloud.exceptions from google.cloud import bigquery import bigframes @@ -84,10 +81,14 @@ def __init__( enable_polars_execution: bool = False, publisher: bigframes.core.events.Publisher, labels: Mapping[str, str] = {}, + compiler_name: Literal["ibis", "sqlglot"] = "ibis", + cache: Optional[execution_cache.ExecutionCache] = None, ): self.bqclient = bqclient self.storage_manager = storage_manager - self.cache: execution_cache.ExecutionCache = execution_cache.ExecutionCache() + self.cache: execution_cache.ExecutionCache = ( + cache or execution_cache.ExecutionCache() + ) self.metrics = metrics self.loader = loader self.bqstoragereadclient = bqstoragereadclient @@ -111,6 +112,7 @@ def __init__( polars_executor.PolarsExecutor(), ) self._upload_lock = threading.Lock() + self._compiler_name = compiler_name def to_sql( self, @@ -127,7 +129,10 @@ def to_sql( else array_value.node ) node = self._substitute_large_local_sources(node) - compiled = self._compile(node, ordered=ordered) + compiled = compile.compile_sql( + compile.CompileRequest(node, sort_rows=ordered), + compiler_name=self._compiler_name, + ) return compiled.sql def execute( @@ -158,7 +163,11 @@ def execute( "Ordering and peeking not supported for gbq export" ) # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml - result = self._export_gbq(array_value, execution_spec.destination_spec) + result = self._export_gbq( + array_value, + execution_spec.destination_spec, + extra_labels=dict(execution_spec.labels), + ) self._publisher.publish( bigframes.core.events.ExecutionFinished( result=result, @@ -174,6 +183,7 @@ def execute( if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec) else None, must_create_table=not execution_spec.promise_under_10gb, + extra_labels=dict(execution_spec.labels), ) # post steps: export if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): @@ -233,7 +243,10 @@ def _maybe_find_existing_table( return None def _export_gbq( - self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec + self, + array_value: bigframes.core.ArrayValue, + spec: ex_spec.TableOutputSpec, + extra_labels: Mapping[str, str] = {}, ) -> executor.ExecuteResult: """ Export the ArrayValue to an existing BigQuery table. @@ -243,55 +256,48 @@ def _export_gbq( # validate destination table existing_table = self._maybe_find_existing_table(spec) - def run_with_compiler(compiler_name, compiler_id=None): - compiled = self._compile(plan, ordered=False, compiler_name=compiler_name) - sql = compiled.sql + compiled = compile.compile_sql( + compile.CompileRequest(plan, sort_rows=False), + compiler_name=self._compiler_name, + ) + sql = compiled.sql - if (existing_table is not None) and _is_schema_match( - existing_table.schema, array_value.schema - ): - # b/409086472: Uses DML for table appends and replacements to avoid - # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: - # https://cloud.google.com/bigquery/quotas#standard_tables - job_config = bigquery.QueryJobConfig() - - ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) - if spec.if_exists == "append": - sql = sg_sql.to_sql( - sg_sql.insert(ir.expr.as_select_all(), spec.table) - ) - else: # for "replace" - assert spec.if_exists == "replace" - sql = sg_sql.to_sql( - sg_sql.replace(ir.expr.as_select_all(), spec.table) - ) - else: - dispositions = { - "fail": bigquery.WriteDisposition.WRITE_EMPTY, - "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, - "append": bigquery.WriteDisposition.WRITE_APPEND, - } - job_config = bigquery.QueryJobConfig( - write_disposition=dispositions[spec.if_exists], - destination=spec.table, - clustering_fields=spec.cluster_cols if spec.cluster_cols else None, - ) + if (existing_table is not None) and _is_schema_match( + existing_table.schema, array_value.schema + ): + # b/409086472: Uses DML for table appends and replacements to avoid + # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: + # https://cloud.google.com/bigquery/quotas#standard_tables + job_config = bigquery.QueryJobConfig() - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - job_config.labels["bigframes-compiler"] = ( - f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name - ) - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - iterator, job = self._run_execute_query( - sql=sql, - job_config=job_config, - session=array_value.session, + ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) + if spec.if_exists == "append": + sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) + else: # for "replace" + assert spec.if_exists == "replace" + sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) + else: + dispositions = { + "fail": bigquery.WriteDisposition.WRITE_EMPTY, + "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, + "append": bigquery.WriteDisposition.WRITE_APPEND, + } + job_config = bigquery.QueryJobConfig( + write_disposition=dispositions[spec.if_exists], + destination=spec.table, + clustering_fields=spec.cluster_cols if spec.cluster_cols else None, ) - return iterator, job - iterator, job = self._compile_with_fallback(run_with_compiler) + # Attach data type usage to the job labels + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs + # TODO(swast): plumb through the api_name of the user-facing api that + # caused this query. + iterator, job = self._run_execute_query( + sql=sql, + job_config=job_config, + session=array_value.session, + extra_labels=extra_labels, + ) has_special_dtype_col = any( t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE) @@ -359,6 +365,7 @@ def _run_execute_query( job_config: Optional[bq_job.QueryJobConfig] = None, query_with_job: bool = True, session=None, + extra_labels: Mapping[str, str] = {}, ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts BigQuery query job and waits for results. @@ -369,8 +376,9 @@ def _run_execute_query( bigframes.options.compute.maximum_bytes_billed ) - if self._labels: + if self._labels or extra_labels: job_config.labels.update(self._labels) + job_config.labels.update(extra_labels) try: # Trick the type checker into thinking we got a literal. @@ -420,43 +428,6 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): self.prepare_plan(array_value.node) ) - def _compile( - self, - node: nodes.BigFrameNode, - *, - ordered: bool = False, - peek: Optional[int] = None, - materialize_all_order_keys: bool = False, - compiler_name: Literal["sqlglot", "ibis"] = "sqlglot", - ) -> compile.CompileResult: - return compile.compile_sql( - compile.CompileRequest( - node, - sort_rows=ordered, - peek_count=peek, - materialize_all_order_keys=materialize_all_order_keys, - ), - compiler_name=compiler_name, - ) - - def _compile_with_fallback(self, run_fn): - compiler_option = bigframes.options.experiments.sql_compiler - if compiler_option == "legacy": - return run_fn("ibis") - elif compiler_option == "experimental": - return run_fn("sqlglot") - else: # stable - compiler_id = f"{uuid.uuid1().hex[:12]}" - try: - return run_fn("sqlglot", compiler_id=compiler_id) - except google.cloud.exceptions.BadRequest as e: - msg = bfe.format_message( - f"Compiler ID {compiler_id}: BadRequest on sqlglot. " - f"Falling back to ibis. Details: {e.message}" - ) - warnings.warn(msg, category=UserWarning) - return run_fn("ibis", compiler_id=compiler_id) - def prepare_plan( self, plan: nodes.BigFrameNode, @@ -547,8 +518,9 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool: max_complexity=QUERY_COMPLEXITY_LIMIT, cache=self.cache, # Heuristic: subtree_compleixty * (copies of subtree)^2 - heuristic=lambda complexity, count: math.log(complexity) - + 2 * math.log(count), + heuristic=lambda complexity, count: ( + math.log(complexity) + 2 * math.log(count) + ), ) if selection is None: # No good subtrees to cache, just return original tree @@ -621,6 +593,7 @@ def _execute_plan_gbq( peek: Optional[int] = None, cache_spec: Optional[ex_spec.CacheSpec] = None, must_create_table: bool = True, + extra_labels: Mapping[str, str] = {}, ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" # TODO(swast): plumb through the api_name of the user-facing api that @@ -651,43 +624,36 @@ def _execute_plan_gbq( ] cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] - def run_with_compiler(compiler_name, compiler_id=None): - compiled = self._compile( + compiled = compile.compile_sql( + compile.CompileRequest( plan, - ordered=ordered, - peek=peek, + sort_rows=ordered, + peek_count=peek, materialize_all_order_keys=(cache_spec is not None), - compiler_name=compiler_name, - ) - # might have more columns than og schema, for hidden ordering columns - compiled_schema = compiled.sql_schema - - destination_table: Optional[bigquery.TableReference] = None + ), + compiler_name=self._compiler_name, + ) + # might have more columns than og schema, for hidden ordering columns + compiled_schema = compiled.sql_schema - job_config = bigquery.QueryJobConfig() - if create_table: - destination_table = self.storage_manager.create_temp_table( - compiled_schema, cluster_cols - ) - job_config.destination = destination_table + destination_table: Optional[bigquery.TableReference] = None - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - job_config.labels["bigframes-compiler"] = ( - f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name + job_config = bigquery.QueryJobConfig() + if create_table: + destination_table = self.storage_manager.create_temp_table( + compiled_schema, cluster_cols ) - iterator, query_job = self._run_execute_query( - sql=compiled.sql, - job_config=job_config, - query_with_job=(destination_table is not None), - session=plan.session, - ) - return iterator, query_job, compiled - - iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler) - - # might have more columns than og schema, for hidden ordering columns - compiled_schema = compiled.sql_schema + job_config.destination = destination_table + + # Attach data type usage to the job labels + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs + iterator, query_job = self._run_execute_query( + sql=compiled.sql, + job_config=job_config, + query_with_job=(destination_table is not None), + session=plan.session, + extra_labels=extra_labels, + ) # we could actually cache even when caching is not explicitly requested, but being conservative for now result_bq_data = None diff --git a/packages/bigframes/bigframes/session/execution_spec.py b/packages/bigframes/bigframes/session/execution_spec.py index c9431dbd1168..7ba4666a3616 100644 --- a/packages/bigframes/bigframes/session/execution_spec.py +++ b/packages/bigframes/bigframes/session/execution_spec.py @@ -15,7 +15,7 @@ from __future__ import annotations import dataclasses -from typing import Literal, Optional, Union +from typing import Literal, Mapping, Optional, Union from google.cloud import bigquery @@ -30,6 +30,11 @@ class ExecutionSpec: # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur promise_under_10gb: bool = False + labels: tuple[tuple[str, str], ...] = () + + def add_labels(self, labels: Mapping[str, str]) -> ExecutionSpec: + return dataclasses.replace(self, labels=self.labels + tuple(labels.items())) + # This one is temporary, in future, caching will not be done through immediate execution, but will label nodes # that will be cached only when a super-tree is executed diff --git a/packages/bigframes/bigframes/session/proxy_executor.py b/packages/bigframes/bigframes/session/proxy_executor.py new file mode 100644 index 000000000000..4b175683d82d --- /dev/null +++ b/packages/bigframes/bigframes/session/proxy_executor.py @@ -0,0 +1,168 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import typing +import uuid +import warnings +from typing import Mapping, Optional + +import google.cloud.bigquery as bigquery +import google.cloud.exceptions + +import bigframes.core +from bigframes import exceptions as bfe +from bigframes.session import ( + bq_caching_executor, + execution_cache, + execution_spec, + executor, + loader, + metrics, + temporary_storage, +) + +_COMPILER_LABEL_KEY = "bigframes-compiler" + + +class DualCompilerProxyExecutor(executor.Executor): + """ + Used to rollout new compiler implementation. + """ + + def __init__( + self, + bqclient: bigquery.Client, + storage_manager: temporary_storage.TemporaryStorageManager, + bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient, + loader: loader.GbqDataLoader, + *, + metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + enable_polars_execution: bool = False, + publisher: bigframes.core.events.Publisher, + labels: Mapping[str, str] = {}, + ): + shared_cache = execution_cache.ExecutionCache() + self._ibis_executor = bq_caching_executor.BigQueryCachingExecutor( + bqclient, + storage_manager, + bqstoragereadclient, + loader, + metrics=metrics, + enable_polars_execution=enable_polars_execution, + publisher=publisher, + labels=labels, + cache=shared_cache, + compiler_name="ibis", + ) + self._sqlglot_executor = bq_caching_executor.BigQueryCachingExecutor( + bqclient, + storage_manager, + bqstoragereadclient, + loader, + metrics=metrics, + enable_polars_execution=enable_polars_execution, + publisher=publisher, + labels=labels, + cache=shared_cache, + compiler_name="sqlglot", + ) + + def to_sql( + self, + array_value: bigframes.core.ArrayValue, + offset_column: Optional[str] = None, + ordered: bool = False, + enable_cache: bool = True, + ) -> str: + """ + Convert an ArrayValue to a sql query that will yield its value. + """ + return self._ibis_executor.to_sql( + array_value, + offset_column=offset_column, + ordered=ordered, + enable_cache=enable_cache, + ) + + def execute( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: execution_spec.ExecutionSpec, + ) -> executor.ExecuteResult: + compiler_option = bigframes.options.experiments.sql_compiler + if compiler_option == "legacy": + return self._ibis_executor.execute(array_value, execution_spec) + elif compiler_option == "experimental": + return self._sqlglot_executor.execute( + array_value, execution_spec.add_labels({_COMPILER_LABEL_KEY: "sqlglot"}) + ) + else: # stable + correlation_id = f"{uuid.uuid1().hex[:12]}" + try: + return self._sqlglot_executor.execute( + array_value, + execution_spec.add_labels( + {_COMPILER_LABEL_KEY: f"sqlglot-{correlation_id}"} + ), + ) + except google.cloud.exceptions.BadRequest as e: + msg = bfe.format_message( + f"Compiler ID {correlation_id}: BadRequest on sqlglot. " + f"Falling back to ibis. Details: {e.message}" + ) + warnings.warn(msg, category=UserWarning) + return self._ibis_executor.execute( + array_value, + execution_spec.add_labels( + {_COMPILER_LABEL_KEY: f"ibis-{correlation_id}"} + ), + ) + + def dry_run( + self, array_value: bigframes.core.ArrayValue, ordered: bool = True + ) -> bigquery.QueryJob: + """ + Dry run executing the ArrayValue. + + Does not actually execute the data but will get stats and indicate any invalid query errors. + """ + return self._ibis_executor.dry_run(array_value, ordered=ordered) + + def cached( + self, + array_value: bigframes.core.ArrayValue, + *, + config: executor.CacheConfig, + ) -> None: + compiler_option = bigframes.options.experiments.sql_compiler + if compiler_option == "legacy": + return self._ibis_executor.cached(array_value, config=config) + elif compiler_option == "experimental": + return self._sqlglot_executor.cached(array_value, config=config) + else: # stable + correlation_id = f"{uuid.uuid1().hex[:12]}" + try: + return self._sqlglot_executor.cached(array_value, config=config) + except google.cloud.exceptions.BadRequest as e: + msg = bfe.format_message( + f"Compiler ID {correlation_id}: BadRequest on sqlglot. " + f"Falling back to ibis. Details: {e.message}" + ) + warnings.warn(msg, category=UserWarning) + return self._ibis_executor.cached( + array_value, + config=config, + ) diff --git a/packages/bigframes/tests/unit/session/test_bq_caching_executor.py b/packages/bigframes/tests/unit/session/test_bq_caching_executor.py deleted file mode 100644 index 9fc40840d8a7..000000000000 --- a/packages/bigframes/tests/unit/session/test_bq_caching_executor.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import mock - -import google.cloud.bigquery as bigquery -import google.cloud.exceptions -import pyarrow as pa -import pytest - -import bigframes -import bigframes.core.nodes as nodes -import bigframes.core.schema as schemata -from bigframes.session.bq_caching_executor import BigQueryCachingExecutor - - -@pytest.fixture -def mock_executor(): - bqclient = mock.create_autospec(bigquery.Client) - bqclient.project = "test-project" - storage_manager = mock.Mock() - bqstoragereadclient = mock.Mock() - loader = mock.Mock() - publisher = mock.Mock() - return BigQueryCachingExecutor( - bqclient, storage_manager, bqstoragereadclient, loader, publisher=publisher - ) - - -def test_compiler_with_fallback_legacy(mock_executor): - run_fn = mock.Mock() - with bigframes.option_context("experiments.sql_compiler", "legacy"): - mock_executor._compile_with_fallback(run_fn) - run_fn.assert_called_once_with("ibis") - - -def test_compiler_with_fallback_experimental(mock_executor): - run_fn = mock.Mock() - with bigframes.option_context("experiments.sql_compiler", "experimental"): - mock_executor._compile_with_fallback(run_fn) - run_fn.assert_called_once_with("sqlglot") - - -def test_compiler_with_fallback_stable_success(mock_executor): - run_fn = mock.Mock() - with bigframes.option_context("experiments.sql_compiler", "stable"): - mock_executor._compile_with_fallback(run_fn) - run_fn.assert_called_once_with("sqlglot", compiler_id=mock.ANY) - - -def test_compiler_execute_plan_gbq_fallback_labels(mock_executor): - plan = mock.create_autospec(nodes.BigFrameNode) - plan.schema = schemata.ArraySchema(tuple()) - plan.session = None - - # Mock prepare_plan - mock_executor.prepare_plan = mock.Mock(return_value=plan) - - # Mock _compile - from bigframes.core.compile.configs import CompileResult - - fake_compiled = CompileResult( - sql="SELECT 1", sql_schema=[], row_order=None, encoded_type_refs="fake_refs" - ) - mock_executor._compile = mock.Mock(return_value=fake_compiled) - - # Mock _run_execute_query to fail first time, then succeed - mock_iterator = mock.Mock() - mock_iterator.total_rows = 0 - mock_iterator.to_arrow.return_value = pa.Table.from_arrays([], names=[]) - mock_query_job = mock.Mock(spec=bigquery.QueryJob) - mock_query_job.destination = None - - error = google.cloud.exceptions.BadRequest("failed") - error.job = mock.Mock(spec=bigquery.QueryJob) # type: ignore - error.job.job_id = "failed_job_id" # type: ignore - - mock_executor._run_execute_query = mock.Mock( - side_effect=[error, (mock_iterator, mock_query_job)] - ) - - with ( - bigframes.option_context("experiments.sql_compiler", "stable"), - pytest.warns(UserWarning, match="Falling back to ibis"), - ): - mock_executor._execute_plan_gbq(plan, ordered=False, must_create_table=False) - - # Verify labels for both calls - assert mock_executor._run_execute_query.call_count == 2 - - call_1_kwargs = mock_executor._run_execute_query.call_args_list[0][1] - call_2_kwargs = mock_executor._run_execute_query.call_args_list[1][1] - - label_1 = call_1_kwargs["job_config"].labels["bigframes-compiler"] - label_2 = call_2_kwargs["job_config"].labels["bigframes-compiler"] - - assert label_1.startswith("sqlglot-") - assert label_2.startswith("ibis-") - # Both should have the same compiler_id suffix - assert label_1.split("-")[1] == label_2.split("-")[1] diff --git a/packages/bigframes/tests/unit/session/test_proxy_executor.py b/packages/bigframes/tests/unit/session/test_proxy_executor.py new file mode 100644 index 000000000000..382bc0a23058 --- /dev/null +++ b/packages/bigframes/tests/unit/session/test_proxy_executor.py @@ -0,0 +1,187 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +import google.cloud.bigquery as bigquery +import google.cloud.exceptions +import pyarrow as pa +import pytest + +import bigframes +import bigframes.core.nodes as nodes +import bigframes.core.schema as schemata +from bigframes.session.proxy_executor import DualCompilerProxyExecutor + + +@pytest.fixture +def mock_executor(): + bqclient = mock.create_autospec(bigquery.Client) + bqclient.project = "test-project" + storage_manager = mock.Mock() + bqstoragereadclient = mock.Mock() + loader = mock.Mock() + publisher = mock.Mock() + return DualCompilerProxyExecutor( + bqclient, storage_manager, bqstoragereadclient, loader, publisher=publisher + ) + + +def test_execute_legacy_routes_to_ibis(mock_executor, monkeypatch): + array_value = mock.Mock(spec=bigframes.core.ArrayValue) + execution_spec = mock.Mock(spec=bigframes.session.execution_spec.ExecutionSpec) + + mock_executor._ibis_executor = mock.Mock() + mock_executor._sqlglot_executor = mock.Mock() + + monkeypatch.setattr(bigframes.options.experiments, "sql_compiler", "legacy") + mock_executor.execute(array_value, execution_spec) + + mock_executor._ibis_executor.execute.assert_called_once_with( + array_value, execution_spec + ) + mock_executor._sqlglot_executor.execute.assert_not_called() + + +def test_execute_experimental_routes_to_sqlglot(mock_executor, monkeypatch): + array_value = mock.Mock(spec=bigframes.core.ArrayValue) + execution_spec = mock.Mock(spec=bigframes.session.execution_spec.ExecutionSpec) + execution_spec.add_labels.return_value = execution_spec + + mock_executor._ibis_executor = mock.Mock() + mock_executor._sqlglot_executor = mock.Mock() + + monkeypatch.setattr(bigframes.options.experiments, "sql_compiler", "experimental") + mock_executor.execute(array_value, execution_spec) + + execution_spec.add_labels.assert_called_once_with({"bigframes-compiler": "sqlglot"}) + mock_executor._sqlglot_executor.execute.assert_called_once_with( + array_value, execution_spec + ) + mock_executor._ibis_executor.execute.assert_not_called() + + +def test_execute_stable_routes_to_sqlglot_success(mock_executor, monkeypatch): + array_value = mock.Mock(spec=bigframes.core.ArrayValue) + execution_spec = mock.Mock(spec=bigframes.session.execution_spec.ExecutionSpec) + execution_spec.add_labels.return_value = execution_spec + + mock_executor._ibis_executor = mock.Mock() + mock_executor._sqlglot_executor = mock.Mock() + + monkeypatch.setattr(bigframes.options.experiments, "sql_compiler", "stable") + with mock.patch("uuid.uuid1") as mock_uuid: + mock_uuid.return_value.hex = "1234567890123456" + mock_executor.execute(array_value, execution_spec) + + execution_spec.add_labels.assert_called_once_with( + {"bigframes-compiler": "sqlglot-123456789012"} + ) + mock_executor._sqlglot_executor.execute.assert_called_once_with( + array_value, execution_spec + ) + mock_executor._ibis_executor.execute.assert_not_called() + + +def test_execute_stable_routes_to_sqlglot_fallback_to_ibis(mock_executor, monkeypatch): + array_value = mock.Mock(spec=bigframes.core.ArrayValue) + execution_spec = mock.Mock(spec=bigframes.session.execution_spec.ExecutionSpec) + + spec_sqlglot = mock.Mock(spec=bigframes.session.execution_spec.ExecutionSpec) + spec_ibis = mock.Mock(spec=bigframes.session.execution_spec.ExecutionSpec) + execution_spec.add_labels.side_effect = [spec_sqlglot, spec_ibis] + + mock_executor._ibis_executor = mock.Mock() + mock_executor._sqlglot_executor = mock.Mock() + + mock_executor._sqlglot_executor.execute.side_effect = ( + google.cloud.exceptions.BadRequest("test error") + ) + + monkeypatch.setattr(bigframes.options.experiments, "sql_compiler", "stable") + with mock.patch("uuid.uuid1") as mock_uuid: + mock_uuid.return_value.hex = "1234567890123456" + with pytest.warns(UserWarning, match="BadRequest on sqlglot"): + mock_executor.execute(array_value, execution_spec) + + execution_spec.add_labels.assert_has_calls( + [ + mock.call({"bigframes-compiler": "sqlglot-123456789012"}), + mock.call({"bigframes-compiler": "ibis-123456789012"}), + ] + ) + + mock_executor._sqlglot_executor.execute.assert_called_once_with( + array_value, spec_sqlglot + ) + mock_executor._ibis_executor.execute.assert_called_once_with(array_value, spec_ibis) + + +def test_cached_legacy_routes_to_ibis(mock_executor, monkeypatch): + array_value = mock.Mock(spec=bigframes.core.ArrayValue) + config = mock.Mock() + + mock_executor._ibis_executor = mock.Mock() + mock_executor._sqlglot_executor = mock.Mock() + + monkeypatch.setattr(bigframes.options.experiments, "sql_compiler", "legacy") + mock_executor.cached(array_value, config=config) + + mock_executor._ibis_executor.cached.assert_called_once_with( + array_value, config=config + ) + mock_executor._sqlglot_executor.cached.assert_not_called() + + +def test_cached_experimental_routes_to_sqlglot(mock_executor, monkeypatch): + array_value = mock.Mock(spec=bigframes.core.ArrayValue) + config = mock.Mock() + + mock_executor._ibis_executor = mock.Mock() + mock_executor._sqlglot_executor = mock.Mock() + + monkeypatch.setattr(bigframes.options.experiments, "sql_compiler", "experimental") + mock_executor.cached(array_value, config=config) + + mock_executor._sqlglot_executor.cached.assert_called_once_with( + array_value, config=config + ) + mock_executor._ibis_executor.cached.assert_not_called() + + +def test_cached_stable_routes_to_sqlglot_fallback_to_ibis(mock_executor, monkeypatch): + array_value = mock.Mock(spec=bigframes.core.ArrayValue) + config = mock.Mock() + + mock_executor._ibis_executor = mock.Mock() + mock_executor._sqlglot_executor = mock.Mock() + + mock_executor._sqlglot_executor.cached.side_effect = ( + google.cloud.exceptions.BadRequest("test error") + ) + + monkeypatch.setattr(bigframes.options.experiments, "sql_compiler", "stable") + with mock.patch("uuid.uuid1") as mock_uuid: + mock_uuid.return_value.hex = "1234567890123456" + with pytest.warns( + UserWarning, match="Compiler ID 123456789012: BadRequest on sqlglot" + ): + mock_executor.cached(array_value, config=config) + + mock_executor._sqlglot_executor.cached.assert_called_once_with( + array_value, config=config + ) + mock_executor._ibis_executor.cached.assert_called_once_with( + array_value, config=config + )