diff --git a/macros/meta/get_column_specific_config.sql b/macros/meta/get_column_specific_config.sql
new file mode 100644
index 0000000..222fc46
--- /dev/null
+++ b/macros/meta/get_column_specific_config.sql
@@ -0,0 +1,34 @@
+{#
+    Renders a SELECT that returns the anomaly spec one column metric declares
+    in `additional_metrics` of re_data_selected.
+
+    database, schema and table_name pick the monitored table; the JSON array
+    under `column.<column_name>` is unnested and `metric_name` is looked up in
+    each element. Elements that do not define the metric are dropped, so the
+    result holds only real specs and joins on it do not fan out.
+
+    NOTE(review): json_query and the comma-unnest are emitted as raw SQL -
+    confirm every target adapter supports them.
+#}
+{% macro get_column_specific_anomaly(
+    database, schema, column_name, table_name, metric_name
+) %}
+    {% set column_path = "column." ~ column_name %}
+    {% set metric_path = "'$." ~ metric_name ~ "'" %}
+    select
+        name as name,
+        schema as schema,
+        database as database,
+        {{ "'" ~ column_name ~ "'" }} as column,
+        {{ "'" ~ metric_name ~ "'" }} as metric_name,
+        json_query(t, {{ metric_path }}) as metric_spec
+    from
+        {{ ref("re_data_selected") }},
+        unnest({{ re_data.json_extract_array("additional_metrics", column_path) }}) t
+    where
+        name = {{ "'" ~ table_name ~ "'" }}
+        and schema = {{ "'" ~ schema ~ "'" }}
+        and database = {{ "'" ~ database ~ "'" }}
+        and json_query(t, {{ metric_path }}) is not null
+
+{% endmacro %}
diff --git a/macros/utils/anomaly_labeling.sql b/macros/utils/anomaly_labeling.sql
new file mode 100644
index 0000000..97cecb2
--- /dev/null
+++ b/macros/utils/anomaly_labeling.sql
@@ -0,0 +1,446 @@
+{#
+    Renders the relative change of `last_value` against `last_avg`, in percent.
+    A zero average yields NULL (via nullif) instead of a division error.
+#}
+{% macro change_percentage(last_value, last_avg) %}
+    cast({{ last_value }} - {{ last_avg }} as {{ numeric_type() }})
+    / nullif(cast({{ last_avg }} as {{ numeric_type() }}), 0)
+    * 100.0
+{% endmacro %}
+
+{#
+    Renders a boolean CASE expression applying a table-level detector config.
+
+    `anomaly_config` is the SQL expression holding the detector JSON; the keys
+    read from it are `direction`, `name`, `threshold` and
+    `whisker_boundary_multiplier`. The remaining arguments are SQL expressions
+    for the statistics of the row being tested.
+
+    Direction gate: 'up' needs last_value > last_avg, 'down' needs
+    last_value < last_avg; a missing or any other direction lets every row
+    through. Supported detector names are z_score, modified_z_score and
+    boxplot; anything else renders false.
+#}
+{% macro is_anomaly_from_model(
+    anomaly_config,
+    last_value,
+    last_avg,
+    z_score_value,
+    modified_z_score_value,
+    last_first_quartile,
+    last_iqr,
+    last_third_quartile
+) %}
+    case
+        when
+            (
+                lower(coalesce({{ json_extract(anomaly_config, "direction") }}, 'both'))
+                = 'up'
+                and {{ last_value }} > {{ last_avg }}
+            )
+            or (
+                lower(coalesce({{ json_extract(anomaly_config, "direction") }}, 'both'))
+                = 'down'
+                and {{ last_value }} < {{ last_avg }}
+            )
+            or (
+                lower(coalesce({{ json_extract(anomaly_config, "direction") }}, 'both'))
+                != 'up'
+                and lower(
+                    coalesce({{ json_extract(anomaly_config, "direction") }}, 'both')
+                )
+                != 'down'
+            )
+        then
+            case
+                when {{ json_extract(anomaly_config, "name") }} = 'z_score'
+                then
+                    abs({{ z_score_value }}) > cast(
+                        {{ json_extract(anomaly_config, "threshold") }}
+                        as {{ numeric_type() }}
+                    )
+                when {{ json_extract(anomaly_config, "name") }} = 'modified_z_score'
+                then
+                    abs({{ modified_z_score_value }}) > cast(
+                        {{ json_extract(anomaly_config, "threshold") }}
+                        as {{ numeric_type() }}
+                    )
+                when {{ json_extract(anomaly_config, "name") }} = 'boxplot'
+                then
+                    (
+                        {{ last_value }}
+                        < {{ last_first_quartile }}
+                        - (
+                            cast(
+                                {{
+                                    json_extract(
+                                        anomaly_config,
+                                        "whisker_boundary_multiplier",
+                                    )
+                                }} as {{ numeric_type() }}
+                            )
+                            * {{ last_iqr }}
+                        )
+                        or {{ last_value }}
+                        > {{ last_third_quartile }}
+                        + (
+                            cast(
+                                {{
+                                    json_extract(
+                                        anomaly_config,
+                                        "whisker_boundary_multiplier",
+                                    )
+                                }} as {{ numeric_type() }}
+                            )
+                            * {{ last_iqr }}
+                        )
+                    )
+                else false
+            end
+        else false
+    end
+{% endmacro %}
+
+
+{#
+    Renders a boolean CASE expression for the optional
+    `absolute_threshold` section of a column metric config.
+
+    No `absolute_threshold.threshold` key: renders true, so the check is
+    neutral when AND-ed with other checks. Otherwise `direction` selects the
+    comparison: 'up' -> last_value > threshold, 'down' -> last_value <
+    threshold, missing or 'both' -> abs(last_value) > threshold.
+
+    NOTE(review): any other direction value matches no branch, so the
+    expression is NULL; the *_from_model / *_from_column macros treat such
+    values like 'both' - confirm which behaviour is intended.
+#}
+{% macro is_anomaly_absolute_threshold(anomaly_config, last_value) %}
+    case
+        when
+            {{
+                json_extract(
+                    anomaly_config,
+                    "absolute_threshold.threshold",
+                )
+            }} is not null
+        then
+            case
+                when
+
+                    lower(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "absolute_threshold.direction",
+                            )
+                        }}
+                    )
+                    = 'up'
+                then
+                    {{ last_value }} > cast(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "absolute_threshold.threshold",
+                            )
+                        }} as {{ numeric_type() }}
+                    )
+
+                when
+                    lower(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "absolute_threshold.direction",
+                            )
+                        }}
+                    )
+                    = 'down'
+                then
+                    {{ last_value }} < cast(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "absolute_threshold.threshold",
+                            )
+                        }} as {{ numeric_type() }}
+                    )
+                when
+                    lower(
+                        coalesce(
+                            {{
+                                json_extract(
+                                    anomaly_config,
+                                    "absolute_threshold.direction",
+                                )
+                            }},
+                            'both'
+                        )
+                    )
+                    = 'both'
+                then
+                    abs({{ last_value }}) > cast(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "absolute_threshold.threshold",
+                            )
+                        }} as {{ numeric_type() }}
+                    )
+            end
+        else true
+    end
+
+{% endmacro %}
+
+{#
+    Renders a boolean CASE expression for the optional `change_percentage`
+    section of a column metric config, using the change_percentage macro.
+
+    No `change_percentage.threshold` key: renders true. Otherwise
+    'up' -> change > threshold, 'down' -> change < -threshold,
+    missing or 'both' -> abs(change) > threshold.
+
+    NOTE(review): as in is_anomaly_absolute_threshold, an unrecognised
+    direction yields NULL rather than falling back to 'both'.
+#}
+{% macro is_anomaly_change_percentage(anomaly_config, last_value, last_avg) %}
+    case
+        when
+            {{
+                json_extract(
+                    anomaly_config,
+                    "change_percentage.threshold",
+                )
+            }} is not null
+        then
+            case
+                when
+
+                    lower(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "change_percentage.direction",
+                            )
+                        }}
+                    )
+                    = 'up'
+                then
+                    (
+                        {{
+                            change_percentage(
+                                last_value=last_value, last_avg=last_avg
+                            )
+                        }}
+                    ) > cast(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "change_percentage.threshold",
+                            )
+                        }} as {{ numeric_type() }}
+                    )
+
+                when
+                    lower(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "change_percentage.direction",
+                            )
+                        }}
+                    )
+                    = 'down'
+                then
+                    (
+                        {{
+                            change_percentage(
+                                last_value=last_value, last_avg=last_avg
+                            )
+                        }}
+                    ) < (
+                        0.0 - (
+                            cast(
+                                {{
+                                    json_extract(
+                                        anomaly_config,
+                                        "change_percentage.threshold",
+                                    )
+                                }} as {{ numeric_type() }}
+                            )
+                        )
+                    )
+                when
+                    lower(
+                        coalesce(
+                            {{
+                                json_extract(
+                                    anomaly_config,
+                                    "change_percentage.direction",
+                                )
+                            }},
+                            'both'
+                        )
+                    )
+                    = 'both'
+                then
+                    abs(
+                        {{
+                            change_percentage(
+                                last_value=last_value, last_avg=last_avg
+                            )
+                        }}
+                    ) > cast(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "change_percentage.threshold",
+                            )
+                        }} as {{ numeric_type() }}
+                    )
+            end
+        else true
+    end
+
+{% endmacro %}
+
+{#
+    Column-level counterpart of is_anomaly_from_model: same direction gate and
+    detectors, but every key is read under `re_data_anomaly_detector.` inside
+    the column metric spec passed as `anomaly_config`.
+#}
+{% macro is_anomaly_from_column(
+    anomaly_config,
+    last_value,
+    last_avg,
+    z_score_value,
+    modified_z_score_value,
+    last_first_quartile,
+    last_iqr,
+    last_third_quartile
+) %}
+    case
+        when
+            (
+                lower(
+                    coalesce(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "re_data_anomaly_detector.direction",
+                            )
+                        }},
+                        'both'
+                    )
+                )
+                = 'up'
+                and {{ last_value }} > {{ last_avg }}
+            )
+            or (
+                lower(
+                    coalesce(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "re_data_anomaly_detector.direction",
+                            )
+                        }},
+                        'both'
+                    )
+                )
+                = 'down'
+                and {{ last_value }} < {{ last_avg }}
+            )
+            or (
+                lower(
+                    coalesce(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "re_data_anomaly_detector.direction",
+                            )
+                        }},
+                        'both'
+                    )
+                )
+                != 'up'
+                and lower(
+                    coalesce(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "re_data_anomaly_detector.direction",
+                            )
+                        }},
+                        'both'
+                    )
+                )
+                != 'down'
+            )
+        then
+            case
+                when
+                    {{ json_extract(anomaly_config, "re_data_anomaly_detector.name") }}
+                    = 'z_score'
+                then
+                    abs({{ z_score_value }}) > cast(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "re_data_anomaly_detector.threshold",
+                            )
+                        }} as {{ numeric_type() }}
+                    )
+                when
+                    {{ json_extract(anomaly_config, "re_data_anomaly_detector.name") }}
+                    = 'modified_z_score'
+                then
+                    abs({{ modified_z_score_value }}) > cast(
+                        {{
+                            json_extract(
+                                anomaly_config,
+                                "re_data_anomaly_detector.threshold",
+                            )
+                        }} as {{ numeric_type() }}
+                    )
+                when
+                    {{ json_extract(anomaly_config, "re_data_anomaly_detector.name") }}
+                    = 'boxplot'
+                then
+                    (
+                        {{ last_value }}
+                        < {{ last_first_quartile }}
+                        - (
+                            cast(
+                                {{
+                                    json_extract(
+                                        anomaly_config,
+                                        "re_data_anomaly_detector.whisker_boundary_multiplier",
+                                    )
+                                }}
+                                as {{ numeric_type() }}
+                            )
+                            * {{ last_iqr }}
+                        )
+                        or {{ last_value }}
+                        > {{ last_third_quartile }}
+                        + (
+                            cast(
+                                {{
+                                    json_extract(
+                                        anomaly_config,
+                                        "re_data_anomaly_detector.whisker_boundary_multiplier",
+                                    )
+                                }}
+                                as {{ numeric_type() }}
+                            )
+                            * {{ last_iqr }}
+                        )
+                    )
+                else false
+            end
+        else false
+    end
+{% endmacro %}
diff --git a/macros/utils/fivetran_utils/json_extract.sql b/macros/utils/fivetran_utils/json_extract.sql
index 835f84c..39b80f3 100644
--- a/macros/utils/fivetran_utils/json_extract.sql
+++ b/macros/utils/fivetran_utils/json_extract.sql
@@ -2,39 +2,67 @@
 # This file contains significant part of code derived from
 # https://github.com/fivetran/dbt_fivetran_utils/tree/v0.4.0 which is licensed under Apache License 2.0.
 #}
-
 {% macro json_extract(string, string_path) -%}
 
-{{ adapter.dispatch('json_extract','re_data') (string, string_path) }}
+    {{ adapter.dispatch("json_extract", "re_data")(string, string_path) }}
 
 {%- endmacro %}
 
 {% macro default__json_extract(string, string_path) %}
 
-  json_extract_path_text({{string}}, {{ "'" ~ string_path ~ "'" }} )
- 
+    json_extract_path_text({{ string }}, {{ "'" ~ string_path ~ "'" }})
+
 {% endmacro %}
 
 {% macro snowflake__json_extract(string, string_path) %}
 
-  json_extract_path_text(try_parse_json( {{string}} ), {{ "'" ~ string_path ~ "'" }} )
+    json_extract_path_text(try_parse_json({{ string }}), {{ "'" ~ string_path ~ "'" }})
 
 {% endmacro %}
 
 {% macro redshift__json_extract(string, string_path) %}
 
-  case when is_valid_json( {{string}} ) then json_extract_path_text({{string}}, {{ "'" ~ string_path ~ "'" }} ) else null end
- 
+    case
+        when is_valid_json({{ string }})
+        then json_extract_path_text({{ string }}, {{ "'" ~ string_path ~ "'" }})
+        else null
+    end
+
 {% endmacro %}
 
 {% macro bigquery__json_extract(string, string_path) %}
 
-  json_extract_scalar({{string}}, {{ "'$." ~ string_path ~ "'" }} )
+    json_extract_scalar({{ string }}, {{ "'$." ~ string_path ~ "'" }})
 
 {% endmacro %}
 
 {% macro postgres__json_extract(string, string_path) %}
 
-  {{string}}::json->>{{"'" ~ string_path ~ "'" }}
+    {{ string }}::json ->>{{ "'" ~ string_path ~ "'" }}
+
+{% endmacro %}
+
+
+{#
+    json_extract_array: adapter-dispatched extraction of the JSON array found
+    at `string_path` inside the JSON expression `string`.
+    NOTE(review): only a BigQuery override exists; the default renders a bare
+    json_extract_array() call - confirm it is valid on the other adapters.
+#}
+{% macro json_extract_array(string, string_path) -%}
+
+    {{ adapter.dispatch("json_extract_array", "re_data")(string, string_path) }}
+
+{%- endmacro %}
+
+{% macro default__json_extract_array(string, string_path) %}
+
+    json_extract_array({{ string }}, {{ "'" ~ string_path ~ "'" }})
+{%- endmacro %}
+
+
+{% macro bigquery__json_extract_array(string, string_path) %}
+
+    json_extract_array({{ string }}, {{ "'$." ~ string_path ~ "'" }})
 
 {% endmacro %}
diff --git a/models/alerts/re_data_anomalies.sql b/models/alerts/re_data_anomalies.sql
index 58926af..8d27d65 100644
--- a/models/alerts/re_data_anomalies.sql
+++ b/models/alerts/re_data_anomalies.sql
@@ -1,8 +1,4 @@
-{{
-    config(
-        materialized='view'
-    )
-}}
+{{ config(materialized="view") }}
 select
     z.id,
     z.table_name,
@@ -11,6 +7,7 @@ select
     z.z_score_value,
     z.modified_z_score_value,
     m.anomaly_detector,
+    c.metric_spec,
     z.last_value,
     z.last_avg,
     z.last_median,
@@ -18,38 +15,86 @@ select
     z.last_median_absolute_deviation,
     z.last_mean_absolute_deviation,
     z.last_iqr,
-    z.last_first_quartile - (cast( {{ json_extract('m.anomaly_detector', 'whisker_boundary_multiplier') }} as {{numeric_type()}} ) * z.last_iqr) lower_bound,
-    z.last_third_quartile + (cast( {{ json_extract('m.anomaly_detector', 'whisker_boundary_multiplier') }} as {{numeric_type()}} ) * z.last_iqr) upper_bound,
+    z.last_first_quartile - (
+        cast(
+            {{ json_extract("m.anomaly_detector", "whisker_boundary_multiplier") }}
+            as {{ numeric_type() }}
+        )
+        * z.last_iqr
+    ) lower_bound,
+    z.last_third_quartile + (
+        cast(
+            {{ json_extract("m.anomaly_detector", "whisker_boundary_multiplier") }}
+            as {{ numeric_type() }}
+        )
+        * z.last_iqr
+    ) upper_bound,
     z.last_first_quartile,
     z.last_third_quartile,
     z.time_window_end,
     z.interval_length_sec,
     z.computed_on,
-    {{ re_data.generate_anomaly_message('z.column_name', 'z.metric', 'z.last_value', 'z.last_avg') }} as message,
-    {{ re_data.generate_metric_value_text('z.metric', 'z.last_value') }} as last_value_text
-from
-    {{ ref('re_data_z_score')}} z
-left join {{ ref('re_data_selected') }} m
-on {{ split_and_return_nth_value('table_name', '.', 1) }} = m.database
-and {{ split_and_return_nth_value('table_name', '.', 2) }} = m.schema
-and {{ split_and_return_nth_value('table_name', '.', 3) }} = m.name
+    {{
+        re_data.generate_anomaly_message(
+            "z.column_name", "z.metric", "z.last_value", "z.last_avg"
+        )
+    }} as message,
+    {{ re_data.generate_metric_value_text("z.metric", "z.last_value") }}
+    as last_value_text
+from {{ ref("re_data_z_score") }} z
+left join
+    {{ ref("re_data_selected") }} m
+    on {{ split_and_return_nth_value("table_name", ".", 1) }} = m.database
+    and {{ split_and_return_nth_value("table_name", ".", 2) }} = m.schema
+    and {{ split_and_return_nth_value("table_name", ".", 3) }} = m.name
+left join
+    {{ ref("re_data_selected_columns") }} c
+    on {{ split_and_return_nth_value("table_name", ".", 1) }} = c.database
+    and {{ split_and_return_nth_value("table_name", ".", 2) }} = c.schema
+    and {{ split_and_return_nth_value("table_name", ".", 3) }} = c.name
+    and z.column_name = c.column
+    -- a column can carry several metrics; only match the spec of this row's metric
+    and z.metric = c.metric_name
 where
-    case when (lower(coalesce({{ json_extract('m.anomaly_detector', 'direction') }}, 'both')) = 'up' and z.last_value > z.last_avg)
-    or (lower(coalesce({{ json_extract('m.anomaly_detector', 'direction') }}, 'both')) = 'down' and z.last_value < z.last_avg)
-    or (lower(coalesce({{ json_extract('m.anomaly_detector', 'direction') }}, 'both')) != 'up' and lower(coalesce({{ json_extract('m.anomaly_detector', 'direction') }}, 'both')) != 'down')
+    case
+        when c.metric_spec is not null
     then
-        case
-            when {{ json_extract('m.anomaly_detector', 'name') }} = 'z_score'
-            then abs(z_score_value) > cast({{ json_extract('m.anomaly_detector', 'threshold') }} as {{ numeric_type() }})
-            when {{ json_extract('m.anomaly_detector', 'name') }} = 'modified_z_score'
-            then abs(modified_z_score_value) > cast( {{ json_extract('m.anomaly_detector', 'threshold') }} as {{numeric_type()}} )
-            when {{ json_extract('m.anomaly_detector', 'name') }} = 'boxplot'
-            then (
-                z.last_value < z.last_first_quartile - (cast( {{ json_extract('m.anomaly_detector', 'whisker_boundary_multiplier') }} as {{numeric_type()}} ) * z.last_iqr)
-                or
-                z.last_value > z.last_third_quartile + (cast( {{ json_extract('m.anomaly_detector', 'whisker_boundary_multiplier') }} as {{numeric_type()}} ) * z.last_iqr)
-            )
-            else false
-        end
-    else false
+            {{
+                is_anomaly_from_column(
+                    anomaly_config="c.metric_spec",
+                    last_value="z.last_value",
+                    last_avg="z.last_avg",
+                    z_score_value="z.z_score_value",
+                    modified_z_score_value="z.modified_z_score_value",
+                    last_first_quartile="z.last_first_quartile",
+                    last_iqr="z.last_iqr",
+                    last_third_quartile="z.last_third_quartile",
+                )
+            }}
+            and {{
+                is_anomaly_absolute_threshold(
+                    anomaly_config="c.metric_spec", last_value="z.last_value"
+                )
+            }}
+            and {{
+                is_anomaly_change_percentage(
+                    anomaly_config="c.metric_spec",
+                    last_value="z.last_value",
+                    last_avg="z.last_avg",
+                )
+            }}
+        else
+            {{
+                is_anomaly_from_model(
+                    anomaly_config="m.anomaly_detector",
+                    last_value="z.last_value",
+                    last_avg="z.last_avg",
+                    z_score_value="z.z_score_value",
+                    modified_z_score_value="z.modified_z_score_value",
+                    last_first_quartile="z.last_first_quartile",
+                    last_iqr="z.last_iqr",
+                    last_third_quartile="z.last_third_quartile",
+                )
+            }}
+    end
diff --git a/models/meta/re_data_selected.sql b/models/meta/re_data_selected.sql
index 34348af..c7c07df 100644
--- a/models/meta/re_data_selected.sql
+++ b/models/meta/re_data_selected.sql
@@ -1,6 +1,12 @@
-
-select
-    name, schema, database, time_filter, metrics, columns, anomaly_detector, owners
-from {{ ref('re_data_monitored')}}
-where
-    selected = true
\ No newline at end of file
+select
+    name,
+    schema,
+    database,
+    time_filter,
+    metrics,
+    additional_metrics,
+    columns,
+    anomaly_detector,
+    owners
+from {{ ref("re_data_monitored") }}
+where selected = true
diff --git a/models/meta/re_data_selected_columns.sql b/models/meta/re_data_selected_columns.sql
new file mode 100644
index 0000000..f0585b4
--- /dev/null
+++ b/models/meta/re_data_selected_columns.sql
@@ -0,0 +1,61 @@
+-- depends_on: {{ ref('re_data_z_score') }}
+-- depends_on: {{ ref('re_data_selected') }}
+{#
+    Column-level anomaly specs keyed by table / column / metric, with the
+    `metric_spec` JSON configured for them in additional_metrics.
+    The SQL is assembled at compile time: the column/metric pairs present in
+    re_data_z_score are queried and one get_column_specific_anomaly() SELECT
+    is rendered per pair, joined with UNION ALL.
+#}
+{% set column_metric %}
+    select distinct
+        {{ split_and_return_nth_value("table_name", ".", 1) }} as database,
+        {{ split_and_return_nth_value("table_name", ".", 2) }} as schema,
+        {{ split_and_return_nth_value("table_name", ".", 3) }} as name,
+        column_name,
+        metric
+    from {{ ref('re_data_z_score')}}
+    where column_name != ''
+{% endset %}
+{# Only query the warehouse while executing; use an empty list otherwise. #}
+{% if execute %}
+    {% set column_metric_details = run_query(column_metric) %}
+{% else %}
+    {% set column_metric_details = [] %}
+{% endif %}
+{% for col in column_metric_details %}
+    {% set column_name = re_data.row_value(col, "column_name") %}
+    {% set metric = re_data.row_value(col, "metric") %}
+    {% set database = re_data.row_value(col, "database") %}
+    {% set schema = re_data.row_value(col, "schema") %}
+    {% set table_name = re_data.row_value(col, "name") %}
+    {{
+        get_column_specific_anomaly(
+            database=database,
+            schema=schema,
+            column_name=column_name,
+            table_name=table_name,
+            metric_name=metric,
+        )
+    }}
+    {%- if not loop.last %}
+        union all
+    {%- endif %}
+
+{% else %}
+    {#
+        No column metrics found: emit an empty result with the same column
+        names so the model still compiles to valid SQL.
+        NOTE(review): column types are borrowed from re_data_selected -
+        confirm metric_spec matches the type json_query returns.
+    #}
+    select
+        name as name,
+        schema as schema,
+        database as database,
+        name as column,
+        name as metric_name,
+        additional_metrics as metric_spec
+    from {{ ref("re_data_selected") }}
+    where 1 = 0
+{% endfor %}