Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,5 @@ cython_debug/
.idea
test_artifacts/
artifacts/

pylintrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{% from 'filter.sql' import get_filter_statement %}
{% from 'secondary_calculation.sql' import render_secondary_calculations %}

{% macro calculate_timeseries_metric_values(
Expand Down
2 changes: 0 additions & 2 deletions rasgotransforms/rasgotransforms/macros/distinct_values.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{% from 'filter.sql' import get_filter_statement %}

{% macro get_distinct_vals(
columns,
target_metric,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{% from 'aggregate_metrics.sql' import calculate_timeseries_metric_values %}
{% from 'filter.sql' import combine_filters %}
{% from 'secondary_calculation.sql' import render_secondary_calculations %}

{% macro calculate_expression_metric_values(
Expand Down
59 changes: 0 additions & 59 deletions rasgotransforms/rasgotransforms/macros/filter.sql
Original file line number Diff line number Diff line change
@@ -1,59 +0,0 @@
{% macro get_filter_statement(filters) %}
{% if filters %}
{% if filters is string %}
{{ filters }}
{% else %}
{% set logical_operator = namespace(value='AND') %}
{% for filter in filters %}
{% if filter is not string %}
{% if filter is not mapping %}
{% set filter = dict(filter) %}
{% endif %}
{% if filter is mapping and 'compoundBoolean' in filter and filter['compoundBoolean'] %}
{% set logical_operator.value = filter['compoundBoolean'] %}
{% endif %}
{% endif %}
{% endfor %}
(
{% for filter in filters %}
{% if filter is not string and filter is not mapping %}
{% set filter = dict(filter) %}
{% endif %}
{% if 'columnName' in filter %}
{% do filter.__setitem__('column_name', filter.columnName) %}
{% endif %}
{% if 'comparisonValue' in filter %}
{% do filter.__setitem__('comparison_value', filter.comparisonValue) %}
{% endif %}
{% if filter is not mapping %}
{{ logical_operator.value + ' ' if not loop.first }}{{ filter }}
{% elif filter.operator|upper == 'CONTAINS' %}
{{ logical_operator.value + ' ' if not loop.first }}{{ filter.column_name }} like '%{{ filter.comparison_value }}%'
{% else %}
{{ logical_operator.value + ' ' if not loop.first }}{{ filter.column_name }} {{ filter.operator }} {{ filter.comparison_value }}
{% endif %}
{% endfor %}
)
{% endif %}
{% else %}
true
{% endif %}
{% endmacro %}


{% macro combine_filters(filters_a, filters_b, condition) %}
{% set condition = condition if condition is defined else 'AND' %}
{% if filters_a and not filters_b %}
{{ get_filter_statement(filters_a) }}
{% elif filters_b and not filters_a %}
{{ get_filter_statement(filters_b) }}
{% elif not filters_a and not filters_b %}
true
{% else %}
(
{{ get_filter_statement(filters_a)|indent }}
{{ condition }}
{{ get_filter_statement(filters_b)|indent }}
)
{% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{% from 'filter.sql' import get_filter_statement %}
{% from 'secondary_calculation.sql' import render_secondary_calculations %}

{% macro calculate_timeseries_metric_values(
Expand Down
128 changes: 106 additions & 22 deletions rasgotransforms/rasgotransforms/render/environment.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import functools
import re
from datetime import datetime, timedelta
from itertools import combinations, permutations, product
from typing import Callable, Optional, Dict, Union
from pathlib import Path
from os.path import getmtime
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union

from jinja2 import Environment, BaseLoader
from jinja2.exceptions import TemplateNotFound
Expand All @@ -12,19 +13,34 @@
from rasgotransforms.exceptions import RenderException
from rasgotransforms.main import DataWarehouse

ALLOWED_OPERATORS = (
">",
"<",
"=",
"<>",
">=",
"<=",
"CONTAINS",
"IS NULL",
"IS NOT NULL",
"NOT CONTAINS",
"IN",
"NOT IN",
)


class RasgoEnvironment(Environment):
def __init__(self, run_query: Optional[Callable] = None, dw_type: Optional[str] = None, *args, **kwargs):
super().__init__(*args, extensions=self.rasgo_extensions, loader=RasgoLoader(), **kwargs)
if not dw_type:
dw_type = 'snowflake'
dw_type = "snowflake"
self._dw_type = DataWarehouse(dw_type)
for filter_name, method in self.rasgo_filters.items():
self.filters[filter_name] = method
for name, value in self.rasgo_globals.items():
self.globals[name] = value
self._run_query = run_query
self.globals['run_query'] = self._run_query
self.globals["run_query"] = self._run_query

@property
def dw_type(self) -> DataWarehouse:
Expand All @@ -36,14 +52,16 @@ def dw_type(self, dw_type: str):

@property
def rasgo_extensions(self):
return ['jinja2.ext.do', 'jinja2.ext.loopcontrols']
return ["jinja2.ext.do", "jinja2.ext.loopcontrols"]

@property
def rasgo_globals(self):
return {
"min": min,
"max": max,
"cleanse_name": cleanse_template_symbol,
"get_filter_statement": functools.partial(get_filter_statement, dw_type=self.dw_type),
"combine_filters": functools.partial(combine_filters, dw_type=self.dw_type),
"raise_exception": raise_exception,
"itertools": {"combinations": combinations, "permutations": permutations, "product": product},
"dw_type": lambda: self.dw_type.value,
Expand Down Expand Up @@ -73,57 +91,123 @@ def render(

Source columns can is a mapping of table names to columns (e.g. {table_name: {column_name: column_type}})
"""
arguments['source_table'] = source_table
arguments["source_table"] = source_table

if not override_globals:
override_globals = {}

if source_columns and 'get_columns' not in override_globals:
if source_columns and "get_columns" not in override_globals:

def get_columns(fqtn):
return source_columns[fqtn]

override_globals['get_columns'] = get_columns
if 'get_columns' in override_globals:
self.globals['get_columns'] = override_globals['get_columns']
override_globals["get_columns"] = get_columns
if "get_columns" in override_globals:
self.globals["get_columns"] = override_globals["get_columns"]
try:
template = self.from_string(source_code)
rendered = template.render(**arguments, **override_globals)
except Exception as e:
raise RenderException(e)
raise RenderException(e) from e
return trim_blank_lines(rendered)


def cleanse_template_symbol(symbol: str) -> str:
symbol = str(symbol).strip().replace(' ', '_').replace('-', '_')
symbol = re.sub('[^A-Za-z0-9_]+', '', symbol)
symbol = '_' + symbol if not symbol or symbol[0].isdecimal() else symbol
symbol = str(symbol).strip().replace(" ", "_").replace("-", "_")
symbol = re.sub("[^A-Za-z0-9_]+", "", symbol)
symbol = "_" + symbol if not symbol or symbol[0].isdecimal() else symbol
return symbol


def combine_filters(
filters_a: Union[List, str],
filters_b: Union[List, str],
condition: str,
dw_type: DataWarehouse,
) -> str:
"""
Parse & combine multiple filters, return a single SQL statement
"""
condition = condition or "AND"
if filters_a and not filters_b:
return get_filter_statement(filters_a, dw_type)
elif filters_b and not filters_a:
return get_filter_statement(filters_b, dw_type)
elif not filters_a and not filters_b:
return "TRUE"
return f"({get_filter_statement(filters_a, dw_type)} {condition} {get_filter_statement(filters_b, dw_type)})"


def get_filter_statement(
filters: Union[List, str],
dw_type: DataWarehouse,
) -> str:
"""
Parse a list of string or dict filters to a simple SQL string
"""
if isinstance(filters, str):
return filters

filter_string = "TRUE"
compound_boolean = "AND"
for fil in filters:
if isinstance(fil, dict):
# Handle variable casing from past versions: only support snake eventually
column_name = fil.get("column_name", fil.get("columnName"))
operator = fil.get("operator", "").upper()
comparison_value = fil.get("comparison_value", fil.get("comparisonValue"))
compound_boolean = fil.get("compound_boolean", fil.get("compoundBoolean", compound_boolean))

# Handle override operators
if operator not in ALLOWED_OPERATORS:
raise_exception(f"operator {operator} is not supported")
if operator == "CONTAINS":
operator = "LIKE"

# Parse complex comparison values
if isinstance(comparison_value, dict):
# Relative Date filter
if comparison_value.get("type", "").upper() == "RELATIVEDATE":
date_part = comparison_value.get("date_part", comparison_value.get("datePart"))
interval = f"{'-' if comparison_value['direction'] == 'past' else ''}{comparison_value['offset']}"
if not (date_part and interval):
raise_exception("relativedate comparisons must pass arguments: date_part, offset, direction")
print(dw_type)
if dw_type is DataWarehouse.BIGQUERY:
comparison_value = f"DATE_ADD(CURRENT_DATE(), INTERVAL {interval} {date_part})"
elif dw_type is DataWarehouse.SNOWFLAKE:
comparison_value = f"DATEADD({date_part}, {interval}, CURRENT_DATE)"
else:
comparison_value = f"(CURRENT_DATE + INTERVAL '{interval} {date_part}')"
filter_string += f" {compound_boolean} {column_name} {operator} {comparison_value} \n"
elif isinstance(fil, str) and fil != "":
filter_string += f" {compound_boolean} {fil} \n"
return filter_string


def raise_exception(message: str) -> None:
raise Exception(message)


def trim_blank_lines(sql: str) -> str:
return re.sub(r'[\n][\s]*\n', '\n', sql)
return re.sub(r"[\n][\s]*\n", "\n", sql)


def get_timedelta(time_grain: str, interval: int) -> timedelta:
time_grain = time_grain.lower()
if time_grain == 'hour':
if time_grain == "hour":
return timedelta(hours=interval)
elif time_grain == 'day':
elif time_grain == "day":
return timedelta(days=interval)
elif time_grain == 'week':
elif time_grain == "week":
return timedelta(weeks=interval)
elif time_grain == 'month':
elif time_grain == "month":
interval *= 31
return timedelta(days=interval)
elif time_grain == 'quarter':
elif time_grain == "quarter":
interval *= 122
return timedelta(days=interval)
elif time_grain == 'year':
elif time_grain == "year":
interval *= 365
return timedelta(days=interval)
else:
Expand All @@ -133,7 +217,7 @@ def get_timedelta(time_grain: str, interval: int) -> timedelta:
class RasgoLoader(BaseLoader):
def __init__(self, root_path=None):
if not root_path:
root_path = Path(__file__).parent.parent / 'macros'
root_path = Path(__file__).parent.parent / "macros"
self.root_path = root_path

def get_source(self, environment: RasgoEnvironment, template):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
{{ output }} as {{ output_col_name }}
{%- endmacro -%}

{%- macro get_filter_statement(columns) -%}
{%- macro get_where_clause(columns) -%}
{%- set filter_statements = [] -%}
{%- for column in columns.keys() %}
{%- set output_col_name = column if columns[column].name is not defined else cleanse_name(columns[column].name) -%}
Expand All @@ -68,7 +68,7 @@
{{ "where " if loop.first else "" }}{{ filter_statement }}{{ " \n and " if not loop.last else "" }}
{%- endfor %}
{%- endmacro -%}
{%- set filter_statement = get_filter_statement(columns) -%}
{%- set filter_statement = get_where_clause(columns) -%}

{%- if filter_statement == "" -%}
select
Expand Down
4 changes: 2 additions & 2 deletions rasgotransforms/rasgotransforms/transforms/clean/clean.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
{{ output }} as {{ output_col_name }}
{%- endmacro -%}

{%- macro get_filter_statement(columns) -%}
{%- macro get_where_clause(columns) -%}
{%- set filter_statements = [] -%}
{%- for column in columns.keys() %}
{%- set output_col_name = column if columns[column].name is not defined else cleanse_name(columns[column].name) -%}
Expand All @@ -46,7 +46,7 @@
{{ "where " if loop.first else "" }}{{ filter_statement }}{{ " \n and " if not loop.last else "" }}
{%- endfor %}
{%- endmacro -%}
{%- set filter_statement = get_filter_statement(columns) -%}
{%- set filter_statement = get_where_clause(columns) -%}

{%- if filter_statement == "" -%}
select
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{% from 'filter.sql' import get_filter_statement %}
{% if items is not defined %}
{% if filter_statements is not defined %}
{{ raise_exception('items is empty: there are no filters to apply') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{% from 'expression_metrics.sql' import calculate_expression_metric_values %}
{% from 'distinct_values.sql' import get_distinct_vals %}
{% from 'pivot.sql' import pivot_plot_values %}
{% from 'filter.sql' import get_filter_statement, combine_filters %}
{% from 'secondary_calculation.sql' import render_secondary_calculations, adjust_start_date %}
{% set dimensions = group_by_dimensions if group_by_dimensions is defined else [] %}
{% set max_num_groups = max_num_groups if max_num_groups is defined else 10 %}
Expand Down
1 change: 0 additions & 1 deletion rasgotransforms/rasgotransforms/transforms/plot/plot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{% from 'expression_metrics.sql' import calculate_expression_metric_values %}
{% from 'distinct_values.sql' import get_distinct_vals %}
{% from 'pivot.sql' import pivot_plot_values %}
{% from 'filter.sql' import get_filter_statement, combine_filters %}
{% from 'secondary_calculation.sql' import render_secondary_calculations, adjust_start_date %}
{% set dimensions = dimensions if dimensions is defined else [] %}
{% set max_num_groups = max_num_groups if max_num_groups is defined else 10 %}
Expand Down
2 changes: 1 addition & 1 deletion rasgotransforms/rasgotransforms/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
Package version for pypi
"""
__version__ = "2.4.1"
__version__ = "2.5.0a1"