Changes from 1 commit
4 changes: 2 additions & 2 deletions packages/bigframes/bigframes/session/__init__.py
@@ -79,7 +79,7 @@
 from bigframes import version
 from bigframes.core import blocks, utils
 from bigframes.core.logging import log_adapter
-from bigframes.session import bigquery_session, bq_caching_executor, executor
+from bigframes.session import bigquery_session, executor, proxy_executor
 
 # Avoid circular imports.
 if typing.TYPE_CHECKING:
@@ -311,7 +311,7 @@ def __init__(
         if not self._strictly_ordered:
             labels["bigframes-mode"] = "unordered"
 
-        self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
+        self._executor: executor.Executor = proxy_executor.DualCompilerProxyExecutor(
             bqclient=self._clients_provider.bqclient,
             bqstoragereadclient=self._clients_provider.bqstoragereadclient,
             loader=self._loader,
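The new `proxy_executor.DualCompilerProxyExecutor` itself is not part of this diff. Judging from the `compiler_name` and `cache` parameters added to `BigQueryCachingExecutor` in the next file, and from the compiler-fallback logic deleted there, the proxy presumably wraps one executor per SQL compiler behind the common `executor.Executor` interface. A hypothetical sketch only — the class name and constructor call come from this diff, everything else (the import path for `execution_cache`, the routing policy, the method signatures) is an assumption:

```python
# Hypothetical sketch of the proxy; the real module is not shown in this PR.
import google.cloud.exceptions

from bigframes.session import bq_caching_executor, executor
from bigframes.session import execution_cache  # assumed import path


class DualCompilerProxyExecutor(executor.Executor):
    def __init__(self, **kwargs):
        # One cache shared by both executors, enabled by the new `cache`
        # constructor parameter on BigQueryCachingExecutor.
        shared_cache = execution_cache.ExecutionCache()
        self._sqlglot = bq_caching_executor.BigQueryCachingExecutor(
            compiler_name="sqlglot", cache=shared_cache, **kwargs
        )
        self._ibis = bq_caching_executor.BigQueryCachingExecutor(
            compiler_name="ibis", cache=shared_cache, **kwargs
        )

    def execute(self, array_value, execution_spec):
        # Mirrors the _compile_with_fallback behavior deleted from
        # bq_caching_executor.py: try the new compiler, fall back to the
        # legacy one if BigQuery rejects the generated SQL.
        try:
            return self._sqlglot.execute(array_value, execution_spec)
        except google.cloud.exceptions.BadRequest:
            return self._ibis.execute(array_value, execution_spec)
```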
214 changes: 90 additions & 124 deletions packages/bigframes/bigframes/session/bq_caching_executor.py
@@ -17,15 +17,12 @@
 import concurrent.futures
 import math
 import threading
-import uuid
-import warnings
 from typing import Literal, Mapping, Optional, Sequence, Tuple
 
 import google.api_core.exceptions
 import google.cloud.bigquery.job as bq_job
 import google.cloud.bigquery.table as bq_table
 import google.cloud.bigquery_storage_v1
-import google.cloud.exceptions
 from google.cloud import bigquery
 
 import bigframes
@@ -84,10 +81,14 @@ def __init__(
         enable_polars_execution: bool = False,
         publisher: bigframes.core.events.Publisher,
         labels: Mapping[str, str] = {},
+        compiler_name: str = "ibis",
+        cache: Optional[execution_cache.ExecutionCache] = None,
     ):
         self.bqclient = bqclient
         self.storage_manager = storage_manager
-        self.cache: execution_cache.ExecutionCache = execution_cache.ExecutionCache()
+        self.cache: execution_cache.ExecutionCache = (
+            cache or execution_cache.ExecutionCache()
+        )
         self.metrics = metrics
         self.loader = loader
         self.bqstoragereadclient = bqstoragereadclient
@@ -111,6 +112,7 @@ def __init__(
             polars_executor.PolarsExecutor(),
         )
         self._upload_lock = threading.Lock()
+        self._compiler_name = compiler_name
 
     def to_sql(
         self,
@@ -127,7 +129,10 @@ def to_sql(
             else array_value.node
         )
         node = self._substitute_large_local_sources(node)
-        compiled = self._compile(node, ordered=ordered)
+        compiled = compile.compiler().compile_sql(
+            compile.CompileRequest(node, sort_rows=ordered),
+            compiler_name=self._compiler_name,
+        )
         return compiled.sql
 
     def execute(
@@ -158,7 +163,11 @@ def execute(
                 "Ordering and peeking not supported for gbq export"
             )
         # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
-        result = self._export_gbq(array_value, execution_spec.destination_spec)
+        result = self._export_gbq(
+            array_value,
+            execution_spec.destination_spec,
+            extra_labels=dict(execution_spec.labels),
+        )
         self._publisher.publish(
             bigframes.core.events.ExecutionFinished(
                 result=result,
@@ -174,6 +183,7 @@
                 if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec)
                 else None,
                 must_create_table=not execution_spec.promise_under_10gb,
+                extra_labels=dict(execution_spec.labels),
             )
             # post steps: export
             if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
@@ -233,7 +243,10 @@ def _maybe_find_existing_table(
         return None
 
     def _export_gbq(
-        self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec
+        self,
+        array_value: bigframes.core.ArrayValue,
+        spec: ex_spec.TableOutputSpec,
+        extra_labels: Mapping[str, str] = {},
     ) -> executor.ExecuteResult:
         """
         Export the ArrayValue to an existing BigQuery table.
@@ -243,55 +256,48 @@ def _export_gbq(
         # validate destination table
         existing_table = self._maybe_find_existing_table(spec)
 
-        def run_with_compiler(compiler_name, compiler_id=None):
-            compiled = self._compile(plan, ordered=False, compiler_name=compiler_name)
-            sql = compiled.sql
-
-            if (existing_table is not None) and _is_schema_match(
-                existing_table.schema, array_value.schema
-            ):
-                # b/409086472: Uses DML for table appends and replacements to avoid
-                # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
-                # https://cloud.google.com/bigquery/quotas#standard_tables
-                job_config = bigquery.QueryJobConfig()
-
-                ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
-                if spec.if_exists == "append":
-                    sql = sg_sql.to_sql(
-                        sg_sql.insert(ir.expr.as_select_all(), spec.table)
-                    )
-                else:  # for "replace"
-                    assert spec.if_exists == "replace"
-                    sql = sg_sql.to_sql(
-                        sg_sql.replace(ir.expr.as_select_all(), spec.table)
-                    )
-            else:
-                dispositions = {
-                    "fail": bigquery.WriteDisposition.WRITE_EMPTY,
-                    "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
-                    "append": bigquery.WriteDisposition.WRITE_APPEND,
-                }
-                job_config = bigquery.QueryJobConfig(
-                    write_disposition=dispositions[spec.if_exists],
-                    destination=spec.table,
-                    clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
-                )
-
-            # Attach data type usage to the job labels
-            job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
-            job_config.labels["bigframes-compiler"] = (
-                f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
-            )
-            # TODO(swast): plumb through the api_name of the user-facing api that
-            # caused this query.
-            iterator, job = self._run_execute_query(
-                sql=sql,
-                job_config=job_config,
-                session=array_value.session,
-            )
-            return iterator, job
-
-        iterator, job = self._compile_with_fallback(run_with_compiler)
+        compiled = compile.compiler().compile_sql(
+            compile.CompileRequest(plan, sort_rows=False),
+            compiler_name=self._compiler_name,
+        )
+        sql = compiled.sql
+
+        if (existing_table is not None) and _is_schema_match(
+            existing_table.schema, array_value.schema
+        ):
+            # b/409086472: Uses DML for table appends and replacements to avoid
+            # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
+            # https://cloud.google.com/bigquery/quotas#standard_tables
+            job_config = bigquery.QueryJobConfig()
+
+            ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
+            if spec.if_exists == "append":
+                sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table))
+            else:  # for "replace"
+                assert spec.if_exists == "replace"
+                sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table))
+        else:
+            dispositions = {
+                "fail": bigquery.WriteDisposition.WRITE_EMPTY,
+                "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
+                "append": bigquery.WriteDisposition.WRITE_APPEND,
+            }
+            job_config = bigquery.QueryJobConfig(
+                write_disposition=dispositions[spec.if_exists],
+                destination=spec.table,
+                clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
+            )
+
+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
+        iterator, job = self._run_execute_query(
+            sql=sql,
+            job_config=job_config,
+            session=array_value.session,
+            extra_labels=extra_labels,
+        )
 
         has_special_dtype_col = any(
             t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
@@ -359,6 +365,7 @@ def _run_execute_query(
         job_config: Optional[bq_job.QueryJobConfig] = None,
         query_with_job: bool = True,
         session=None,
+        extra_labels: Mapping[str, str] = {},
     ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
         """
         Starts BigQuery query job and waits for results.
@@ -369,8 +376,9 @@
             bigframes.options.compute.maximum_bytes_billed
         )
 
-        if self._labels:
+        if self._labels or extra_labels:
             job_config.labels.update(self._labels)
+            job_config.labels.update(extra_labels)
 
         try:
             # Trick the type checker into thinking we got a literal.
@@ -420,43 +428,6 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
             self.prepare_plan(array_value.node)
         )
 
-    def _compile(
-        self,
-        node: nodes.BigFrameNode,
-        *,
-        ordered: bool = False,
-        peek: Optional[int] = None,
-        materialize_all_order_keys: bool = False,
-        compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
-    ) -> compile.CompileResult:
-        return compile.compile_sql(
-            compile.CompileRequest(
-                node,
-                sort_rows=ordered,
-                peek_count=peek,
-                materialize_all_order_keys=materialize_all_order_keys,
-            ),
-            compiler_name=compiler_name,
-        )
-
-    def _compile_with_fallback(self, run_fn):
-        compiler_option = bigframes.options.experiments.sql_compiler
-        if compiler_option == "legacy":
-            return run_fn("ibis")
-        elif compiler_option == "experimental":
-            return run_fn("sqlglot")
-        else:  # stable
-            compiler_id = f"{uuid.uuid1().hex[:12]}"
-            try:
-                return run_fn("sqlglot", compiler_id=compiler_id)
-            except google.cloud.exceptions.BadRequest as e:
-                msg = bfe.format_message(
-                    f"Compiler ID {compiler_id}: BadRequest on sqlglot. "
-                    f"Falling back to ibis. Details: {e.message}"
-                )
-                warnings.warn(msg, category=UserWarning)
-                return run_fn("ibis", compiler_id=compiler_id)
-
     def prepare_plan(
         self,
         plan: nodes.BigFrameNode,
@@ -547,8 +518,9 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool:
             max_complexity=QUERY_COMPLEXITY_LIMIT,
             cache=self.cache,
             # Heuristic: subtree_complexity * (copies of subtree)^2
-            heuristic=lambda complexity, count: math.log(complexity)
-            + 2 * math.log(count),
+            heuristic=lambda complexity, count: (
+                math.log(complexity) + 2 * math.log(count)
+            ),
         )
         if selection is None:
             # No good subtrees to cache, just return original tree
@@ -621,6 +593,7 @@ def _execute_plan_gbq(
         peek: Optional[int] = None,
         cache_spec: Optional[ex_spec.CacheSpec] = None,
         must_create_table: bool = True,
+        extra_labels: Mapping[str, str] = {},
     ) -> executor.ExecuteResult:
         """Just execute whatever plan as is, without further caching or decomposition."""
         # TODO(swast): plumb through the api_name of the user-facing api that
@@ -651,43 +624,36 @@ def _execute_plan_gbq(
         ]
         cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]
 
-        def run_with_compiler(compiler_name, compiler_id=None):
-            compiled = self._compile(
-                plan,
-                ordered=ordered,
-                peek=peek,
-                materialize_all_order_keys=(cache_spec is not None),
-                compiler_name=compiler_name,
-            )
-            # might have more columns than og schema, for hidden ordering columns
-            compiled_schema = compiled.sql_schema
-
-            destination_table: Optional[bigquery.TableReference] = None
-
-            job_config = bigquery.QueryJobConfig()
-            if create_table:
-                destination_table = self.storage_manager.create_temp_table(
-                    compiled_schema, cluster_cols
-                )
-                job_config.destination = destination_table
-
-            # Attach data type usage to the job labels
-            job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
-            job_config.labels["bigframes-compiler"] = (
-                f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
-            )
-            iterator, query_job = self._run_execute_query(
-                sql=compiled.sql,
-                job_config=job_config,
-                query_with_job=(destination_table is not None),
-                session=plan.session,
-            )
-            return iterator, query_job, compiled
-
-        iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler)
-
+        compiled = compile.compile_sql(
+            compile.CompileRequest(
+                plan,
+                sort_rows=ordered,
+                peek_count=peek,
+                materialize_all_order_keys=(cache_spec is not None),
+            ),
+            compiler_name=self._compiler_name,
+        )
         # might have more columns than og schema, for hidden ordering columns
         compiled_schema = compiled.sql_schema
 
+        destination_table: Optional[bigquery.TableReference] = None
+
+        job_config = bigquery.QueryJobConfig()
+        if create_table:
+            destination_table = self.storage_manager.create_temp_table(
+                compiled_schema, cluster_cols
+            )
+            job_config.destination = destination_table
+
+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
+        iterator, query_job = self._run_execute_query(
+            sql=compiled.sql,
+            job_config=job_config,
+            query_with_job=(destination_table is not None),
+            session=plan.session,
+            extra_labels=extra_labels,
+        )
 
         # we could actually cache even when caching is not explicitly requested, but being conservative for now
         result_bq_data = None
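The net effect of the `extra_labels` plumbing in this file: per-execution labels carried on the `ExecutionSpec` are merged into the job config after the executor-level labels, so they win on key collisions. A tiny standalone illustration of that merge order (plain dicts stand in for `bigquery.QueryJobConfig.labels`; the label values are made up):

```python
# Standalone illustration of the merge order in _run_execute_query.
session_labels = {"bigframes-mode": "unordered", "team": "analytics"}
extra_labels = {"team": "reporting"}  # e.g. dict(execution_spec.labels)

job_labels: dict[str, str] = {}
job_labels.update(session_labels)  # job_config.labels.update(self._labels)
job_labels.update(extra_labels)  # job_config.labels.update(extra_labels)

# The later update wins on duplicate keys.
assert job_labels == {"bigframes-mode": "unordered", "team": "reporting"}
```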
7 changes: 6 additions & 1 deletion packages/bigframes/bigframes/session/execution_spec.py
@@ -15,7 +15,7 @@
 from __future__ import annotations
 
 import dataclasses
-from typing import Literal, Optional, Union
+from typing import Literal, Mapping, Optional, Union
 
 from google.cloud import bigquery
 
@@ -30,6 +30,11 @@ class ExecutionSpec:
     # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur
     promise_under_10gb: bool = False
 
+    labels: tuple[tuple[str, str], ...] = ()
+
+    def add_labels(self, labels: Mapping[str, str]) -> ExecutionSpec:
+        return dataclasses.replace(self, labels=self.labels + tuple(labels.items()))
+
 
 # This one is temporary, in future, caching will not be done through immediate execution, but will label nodes
 # that will be cached only when a super-tree is executed
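`add_labels` is non-mutating: `dataclasses.replace` builds a new spec, and storing the labels as a tuple of pairs keeps the dataclass hashable. A minimal standalone model of just this field — the real `ExecutionSpec` has more fields, and whether it is declared frozen is not shown in this diff:

```python
import dataclasses
from typing import Mapping


# Minimal model of the new labels field; frozen-ness is an assumption here.
@dataclasses.dataclass(frozen=True)
class ExecutionSpec:
    labels: tuple[tuple[str, str], ...] = ()

    def add_labels(self, labels: Mapping[str, str]) -> "ExecutionSpec":
        # Returns a new spec; the original instance is left untouched.
        return dataclasses.replace(self, labels=self.labels + tuple(labels.items()))


spec = ExecutionSpec().add_labels({"bigframes-api": "read_gbq"})
spec = spec.add_labels({"bigframes-mode": "unordered"})
assert spec.labels == (
    ("bigframes-api", "read_gbq"),
    ("bigframes-mode", "unordered"),
)
# Consumers convert back with dict(spec.labels); if a key was added twice,
# the value added last wins.
```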