[DENG-8606] Generate two tables (regular and 'use_live') for queries that run more frequently than daily #8652
@@ -152,8 +152,36 @@ def query(ctx):
default=False,
is_flag=True,
)
@click.option(
"--use_live",
"--use-live",
help=(
"""Using this option creates a query that consists of two tables with
different schedules based on a single base query, one that runs daily
and pulls from stable tables and another that runs more frequently and
pulls from live tables, plus a view that unions the two tables.
"""
),
default=False,
is_flag=True,
)
@click.option(
"--hourly",
help=(
"""This option is a special case of the --use-live option for
tables that update hourly.

Using this option creates a query that consists of two tables with
different schedules based on a single base query, one that runs daily
and pulls from stable tables and another that runs hourly and
pulls from live tables, plus a view that unions the two tables.
"""
),
default=False,
is_flag=True,
)
Comment on lines 168 to 182

Contributor:
issues (blocking): I'd be inclined to remove this
@click.pass_context
def create(ctx, name, sql_dir, project_id, owner, dag, no_schedule):
def create(ctx, name, sql_dir, project_id, owner, dag, no_schedule, use_live, hourly):
"""CLI command for creating a new query."""
# create directory structure for query
try:

@@ -173,49 +201,63 @@ def create(ctx, name, sql_dir, project_id, owner, dag, no_schedule):
)
sys.exit(1)

derived_path = None
view_path = None
create_view_path = False
view_exist_ok = False
path = Path(sql_dir)
if hourly:
use_live = True
use_live_slug = "_hourly"
no_schedule = True
elif use_live:
use_live_slug = "_use_live"
no_schedule = True
Contributor:
suggestion (non-blocking): I think it would make sense to respect the

Contributor (Author):
The

Contributor:
I agree the daily version should be considered the main table, as is currently the case, but here you're setting
if dataset.endswith("_derived"):
# create directory for this table
derived_path = path / project_id / dataset / (name + version)
derived_path.mkdir(parents=True)

# create a directory for the corresponding view
view_path = path / project_id / dataset.replace("_derived", "") / name
create_view_path = True
# new versions of existing tables may already have a view
view_path.mkdir(parents=True, exist_ok=True)
view_exist_ok = True
else:
# check if there is a corresponding derived dataset
# check if there is a corresponding derived dataset. If so, create
# the view path
if (path / project_id / (dataset + "_derived")).exists():
derived_path = path / project_id / (dataset + "_derived") / (name + version)
derived_path.mkdir(parents=True)
view_path = path / project_id / dataset / name
view_path.mkdir(parents=True)

dataset = dataset + "_derived"
else:
# some dataset that is not specified as _derived
# don't automatically create views
derived_path = path / project_id / dataset / (name + version)
derived_path.mkdir(parents=True)

click.echo(f"Created query in {derived_path}")
create_view_path = True

table_name = name + version
derived_path = path / project_id / dataset / table_name
derived_path.mkdir(parents=True)
if use_live:
use_live_table_name = name + use_live_slug + version

use_live_path = path / project_id / dataset / (name + use_live_slug + version)
use_live_path.mkdir(parents=True)
if create_view_path:
view_path = path / project_id / dataset.replace("_derived", "") / name
view_path.mkdir(parents=True, exist_ok=view_exist_ok)

if view_path and not (view_file := view_path / "view.sql").exists():
if create_view_path and not (view_file := view_path / "view.sql").exists():
# Don't overwrite the view_file if it already exists
click.echo(f"Created corresponding view in {view_path}")
view_dataset = dataset.replace("_derived", "")
view_file.write_text(
reformat(
f"""CREATE OR REPLACE VIEW

if use_live:
view_text = f"""CREATE OR REPLACE VIEW
`{project_id}.{view_dataset}.{name}`
AS SELECT * FROM
`{project_id}.{dataset}.{name}{version}`"""
)
+ "\n"
)
`{project_id}.{dataset}.{table_name}`
UNION ALL
SELECT * FROM
`{project_id}.{dataset}.{use_live_table_name}`
WHERE submission_date > (
SELECT MAX(submission_date)

FROM `{project_id}.{dataset}.{table_name}`
)"""
Comment on lines 247 to 253

Contributor (Author):
This felt to me like the best way to combine these in the view. This does mean that if the main (daily) table is missing certain days before the most recent, that data will not be filled in by the use_live table.
else:
view_text = f"""CREATE OR REPLACE VIEW
`{project_id}.{view_dataset}.{name}`
AS SELECT * FROM
`{project_id}.{dataset}.{table_name}`"""

view_file.write_text(reformat(view_text) + "\n")

Suggested change:
view_file.write_text(reformat(view_text) + "\n")
view_file.write_text(reformat(view_text, trailing_newline=True))
(ditto for the other similar usages)
Outdated

suggestion (non-blocking): IMO it would be best for the generated SQL for selecting from the live table to attempt to deduplicate the records for that date.
The approach here was to not try to generate something that does this for the user, because there isn't a clear one-size-fits-all deduplication method. The closest I'm aware of are:
- Wrap the source table in a CTE that dedupes the entire day (e.g., SELECT * FROM live_table WHERE DATE(submission_timestamp) = @submission_timestamp QUALIFY ROW_NUMBER() OVER (PARTITION BY document_id ORDER BY submission_timestamp) = 1) -- this does work in the general case but implies querying the whole day so far in the source table every time, which I don't think we want to make the default behavior
- Dedupe just the specific interval and exclude any document_ids already present in the destination table (or in some side table where we save all the document_ids so far) -- requires us to store document_ids in the destination table (or a side table)

I don't think either of these is a great option, and I don't want to give the user the wrong impression that this solves the deduplication problem for them out of the box. The intent instead is that the live and stable versions of the query follow the same model as the live and stable base tables -- we don't make any promises about deduplication in the live tables. For most low-latency purposes (e.g., reports that update several times a day) the slight inaccuracy is not an issue, and the numbers will be fully correct once the daily stable table runs; in those cases, we don't need to worry about deduplication. If deduping is needed, the user can still write the SQL to do that in the way they prefer.
That said, I do think it makes sense to make this explicit in the generated SQL (not just in documentation elsewhere) -- I'll start with adding a comment to that effect.
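For illustration only, a rough sketch of what that first (whole-day dedup) option could look like; live_table, document_id, and @submission_date are placeholder names, not something this PR generates:

-- Hypothetical sketch, not generated by this PR: dedupe the full day from the
-- live table, keeping the first record seen per document_id.
WITH deduped AS (
  SELECT
    *
  FROM
    live_table
  WHERE
    DATE(submission_timestamp) = @submission_date
  QUALIFY
    ROW_NUMBER() OVER (PARTITION BY document_id ORDER BY submission_timestamp) = 1
)
SELECT
  *
FROM
  deduped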
> Wrap the source table in a CTE that dedupes the entire day (e.g., SELECT * FROM live_table WHERE DATE(submission_timestamp) = @submission_timestamp QUALIFY ROW_NUMBER() OVER (PARTITION BY document_id ORDER BY submission_timestamp) = 1) -- this does work in the general case but implies querying the whole day so far in the source table every time, which I don't think we want to make the default behavior

That appears to be the most common behavior though, which to me is a decent argument for making it the default behavior:
- In bigquery-etl I currently see 6 ETLs selecting from live tables (counting the generated baseline_clients_city_seen_v1 ETLs as one ETL), all of which query the whole day, and 5 of which do deduplication.
- In private-bigquery-etl I currently see 12 ETLs selecting from live tables, 9 of which query the whole day, and all of which attempt some form of deduplication.

I agree there's no great option, but as I mentioned in a couple other comments, I don't think it would be a good idea to set things up so that absolutely all hourly ETL runs have to succeed to get complete data, because transient issues do happen and Airflow triage isn't great at catching failures in DAGs that run more frequently than daily. IMO querying the whole day and overwriting the corresponding date partition is the most straightforward way to avoid that issue. The main downsides are that it's less efficient, and that you still need to be careful that the last ETL run targeting a particular date partition was successful to avoid missing data.
Another approach that could work in some use cases is having the query only select records after the max submission_timestamp the ETL has already seen thus far, and then appending newer records to the table rather than overwriting a partition. This does have the benefits of being more efficient and completely self-healing in the event of transient failures (any successful ETL run will fix the entire table). However, it requires the submission_timestamp to be included in the ETL table, makes it a self-referential ETL (which are trickier to deal with in certain scenarios), and precludes the ability to do the same deduplication as is done for the stable tables.
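For illustration, a rough sketch of that append-only variant; live_table and destination_table are placeholder names, and this is not SQL this PR generates:

-- Hypothetical sketch: select only records newer than anything already loaded,
-- and run with an append write disposition rather than overwriting a partition.
SELECT
  *
FROM
  live_table
WHERE
  submission_timestamp > (
    SELECT MAX(submission_timestamp) FROM destination_table
  )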
@scholtzan do you have an opinion on this?
> For most low latency purposes (e.g., reports that update several times a day) the slight inaccuracy is not an issue, and the numbers will be fully correct once the daily stable table runs

Afaik, this is the case for many of the use cases I have seen. Small differences don't seem like a big issue, though if we have a good approach to make these numbers more accurate, that would be a plus.
Maybe we can have a separate parameter --deduplicate that adds the deduplication logic using the document_id (option 1 that Curtis mentioned earlier)?

> Another approach that could work in some use cases is having the query only select records after the max submission_timestamp

I don't think it's guaranteed that new records have an increasing submission_timestamp. Afaik it's possible that records with earlier submission_timestamps get added to the tables after records with later submission_timestamps. This could in the worst case result in missing records if we follow this approach.
Outdated

quibble: I think it'd be more readable to have these placeholders match how we refer to live/stable tables.

Suggested change:
{{% if use_live %}}
table_live
{{% else %}}
table_stable
{{% endif %}}
{{% if use_live %}}
live_table
{{% else %}}
stable_table
{{% endif %}}
Outdated

Because we can only partition by day or by hour, there isn't really a simple standard SQL for this case. This is what I came up with, but I'm open to other suggestions.

- issue: submission_date wouldn't be a timestamp.
- suggestion (non-blocking): It would be best to quote the live ETL table name.
- note: While this approach of only running the live ETL on new records and reusing existing records is likely better for performance, it unfortunately has the same drawbacks as when using hourly partitioning that I mentioned in my comment about the --hourly command line option. I'd be inclined to run the live ETL on all records for the date in question, as in the sketch below.
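For illustration, a rough sketch of that whole-day variant; the names are placeholders and this is not SQL this PR generates. The idea is to recompute the full date from the live table on each run and overwrite the matching date partition of the destination table:

-- Hypothetical sketch: reprocess every record for the target date from the live table.
-- Writing to a date partition decorator on the destination (e.g. destination_table$20240101)
-- keeps reruns idempotent, since each successful run replaces the whole partition.
SELECT
  *
FROM
  live_table
WHERE
  DATE(submission_timestamp) = @submission_date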
Outdated

might be worth creating a QUERY_FILE = "query.sql" here:

bigquery-etl/bigquery_etl/cli/query.py, line 87 in 88cddaa:
MATERIALIZED_VIEW = "materialized_view.sql"

Similar to what is done for materialized_view.sql etc.
Outdated

suggestion (blocking):
labels = {"incremental": True, "schedule": hourly}
labels = {"incremental": True, "schedule": "hourly"}
Outdated

These will go away once the logic for calculating these parameters is in place.
note: Subtracting an hour like this isn't always necessary, especially if the hourly ETLs aren't scheduled at the top of the hour.
Outdated

suggestion (blocking):
parameters = [
"submission_hour:DATE:{{(execution_date - macros.timedelta(hours=1)).strftime('%Y-%m-%d %h:00:00')}}",
]
destination_table = f"{use_live_table_name}${{{{(execution_date - macros.timedelta(hours=1)).strftime('%Y%m%d%h)}}}}"
parameters = [
"submission_hour:TIMESTAMP:{{(execution_date - macros.timedelta(hours=1)).strftime('%Y-%m-%d %H:00:00')}}",
]
destination_table = f"{use_live_table_name}${{{{(execution_date - macros.timedelta(hours=1)).strftime('%Y%m%d%H)}}}}"
Outdated

Should interval_start be set to something for now?
Outdated

suggestion (blocking):
"interval_start:DATE:{{}}",
"interval_end:DATE:{{(execution_date - macros.timedelta(hours=1).strftime('%Y-%m-%d %h:%m:%s'))}}",
"interval_start:TIMESTAMP:{{}}",
"interval_end:TIMESTAMP:{{(execution_date - macros.timedelta(hours=1).strftime('%Y-%m-%d %H:%M:%S'))}}",

Though I'm not sure subtracting an extra hour would necessarily always be advisable for arbitrary intervals.
Outdated

Same as above.
Outdated

suggestion (blocking): Specifying query_file_path like this isn't actually necessary and makes maintenance harder.
"query_file_path": use_live_path / "query.sql",
quibble: While the description explains the nuance/meaning of this --use-live option, IMO the option name by itself implies "use live rather than stable". I'd be inclined to name the option something like --use-live-and-stable or --from-live-and-stable.