[DENG-8606] Generate two tables (regular and 'use_live') for queries that run more frequently than daily (#8652)
Conversation
bigquery_etl/cli/query.py (outdated)

    derived_path = path / project_id / dataset / table_name
    derived_path.mkdir(parents=True)
    if use_live:
        use_live_table_name = name + use_live_slug + version
This results in names like my_table_v1 and my_table_use_live_v1. This is not my favorite because it means the two tables can be alphabetically separated, but putting anything other than the version at the end of a table name would break our current conventions. Improving our naming standards is a larger issue that I'd prefer not to get involved in here.
quibble: IMO _use_live reads very awkwardly. I'd be inclined to have the slug just be _live, or maybe _from_live.
I thought about _live but thought that could lead to confusion with the actual live tables. I like _from_live, I'll make that change
fwiw there are already some queries that follow a similar pattern and do have the _live_v<n> suffix. E.g. experiment_events_live_v1, monitoring_derived.topsites_rate_fenix_release_live_v1 and more
    UNION ALL
    SELECT * FROM
      `{project_id}.{dataset}.{use_live_table_name}`
    WHERE submission_date > (
      SELECT MAX(submission_date)
      FROM `{project_id}.{dataset}.{table_name}`
    )"""
This felt to me like the best way to combine these in the view. This does mean that if the main (daily) table is missing certain days before the most recent, that data will not be filled in by the _use_live table, but that feels appropriate -- a missing day in the stable table is a genuine problem in the data; the _use_live data is ephemeral and should not be used to paper over missing historical data
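For illustration, a minimal sketch of the kind of view this would generate, assuming a hypothetical table with a _from_live companion (every project, dataset, and table name below is made up, not taken from this PR):

```sql
-- Hypothetical generated view: union the daily (stable-derived) table with the
-- from-live table, taking only from-live rows newer than the daily table's
-- most recent day. All names are illustrative.
CREATE OR REPLACE VIEW
  `my_project.my_dataset.my_table`
AS
SELECT
  *
FROM
  `my_project.my_dataset_derived.my_table_v1`
UNION ALL
SELECT
  *
FROM
  `my_project.my_dataset_derived.my_table_from_live_v1`
WHERE
  submission_date > (
    SELECT MAX(submission_date)
    FROM `my_project.my_dataset_derived.my_table_v1`
  )
```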
bigquery_etl/cli/query.py (outdated)

    else:
        macro_file.write_text(
            reformat(
                f"""{{% macro {table_name}(use_live) %}}
Because we can only partition by day or by hour, there isn't really a simple standard SQL for this case. This is what I came up with, but I'm open to other suggestions
bigquery_etl/cli/query.py (outdated)

    parameters = [
        "submission_hour:DATE:{{(execution_date - macros.timedelta(hours=1)).strftime('%Y-%m-%d %h:00:00')}}",
    ]
    destination_table = f"{use_live_table_name}${{{{(execution_date - macros.timedelta(hours=1)).strftime('%Y%m%d%h)}}}}"
These will go away once the logic for calculating these parameters is in place
note: Subtracting an hour like this isn't always necessary, especially if the hourly ETLs aren't scheduled at the top of the hour.
bigquery_etl/cli/query.py (outdated)

    parameters = [
        "interval_start:DATE:{{}}",
        "interval_end:DATE:{{(execution_date - macros.timedelta(hours=1).strftime('%Y-%m-%d %h:%m:%s'))}}",
    ]
    destination_table = f"{use_live_table_name}${{{{(execution_date - macros.timedelta(hours=1)).strftime('%Y%m%d)}}}}"
Same as above
bigquery_etl/cli/query.py (outdated)

    WHERE submission_date > (
      SELECT MAX(submission_date)
This could potentially create an invalid query if the source table doesn't have submission_date. I think this is fine though. CI will show a failure in the dryrun task and it's reasonable to expect the PR creator to manually adjust the query.
@curtismorales I suspect you used submission_date in these queries because submission_date was being used in the existing query.sql template, but I don't think that's a good approach to continue, as live/stable tables only contain submission_timestamp columns (submission_date columns are only present in derived tables).
I'd suggest taking this opportunity to consistently use submission_timestamp in the generated ETL queries and as the default partitioning column (i.e. presuming the ETL will pass through submission_timestamp, which I think is a safer bet than presuming the ETL is going to have a derived submission_date column).
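As a rough sketch of that suggestion (not the PR's actual template; the table name and parameter are illustrative), the generated daily query could filter the stable source on submission_timestamp rather than assuming a derived submission_date column exists:

```sql
-- Sketch only: select one day from a stable table by truncating
-- submission_timestamp, which stable/live ping tables always have.
SELECT
  *
FROM
  `my_project.my_dataset_stable.my_ping_v1`
WHERE
  DATE(submission_timestamp) = @submission_date
```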
bigquery_etl/cli/query.py (outdated)

            + "\n"
        )
        click.echo(f"Created base query in {macro_file}")
        query_file = derived_path / "query.sql"
might be worth creating a QUERY_FILE = "query.sql" here:
bigquery-etl/bigquery_etl/cli/query.py, line 87 in 88cddaa
Similar to what is done for materialized_view.sql etc.
bigquery_etl/cli/query.py (outdated)

        expiration_days=30,
    )
    parameters = [
        "interval_start:DATE:{{}}",
Should interval_start be set to something for now?
    @click.option(
        "--use_live",
        "--use-live",
        help=(
            """Using this option creates a query that consists of two tables with
            different schedules based on a single base query, one that runs daily
            and pulls from stable tables and another that runs more frequently and
            pulls from live tables, plus a view that unions the two tables.
            """
        ),
        default=False,
        is_flag=True,
    )
quibble: While the description explains the nuance/meaning of this --use-live option, IMO the option name by itself implies "use live rather than stable".
I'd be inclined to name the option something like --use-live-and-stable or --from-live-and-stable.
    @click.option(
        "--hourly",
        help=(
            """This options is a special case of the --use-live option for
            tables that update hourly.

            Using this option creates a query that consists of two tables with
            different schedules based on a single base query, one that runs daily
            and pulls from stable tables and another that runs hourly and
            pulls from live tables, plus a view that unions the two tables.
            """
        ),
        default=False,
        is_flag=True,
    )
issues (blocking):
- While the description explains the nuance/meaning of this --hourly option, IMO it's unnecessarily confusing, as just based on the name I think people could reasonably expect to be able to run bqetl query create --hourly <name> and get an hourly ETL that doesn't necessarily involve a combination of live and stable table sources.
- This --hourly option is currently making the live ETL table use hourly partitioning, but to date hourly partitioning has been used very rarely (I currently only see one ETL using hourly partitioning), and I don't think it'd be advisable to encourage hourly partitioning due to some drawbacks:
  - It precludes the ETL doing aggregation at the daily level, which is a very common use case.
  - It makes it critical to ensure the success of all hourly ETL tasks to avoid missing any data, which is problematic because frequently running ETLs are more likely to have failures (e.g. due to transient issues with BigQuery or GKE), and such ETL failures are prone to getting missed during Airflow triage, as by the time triage happens there could easily have been subsequent successful hourly DAG runs.
I'd be inclined to remove this --hourly option and have the generated live ETL simply assume an hourly schedule with daily partitioning, as that's the most common sub-daily ETL setup being used thus far.
        no_schedule = True
    elif use_live:
        use_live_slug = "_use_live"
        no_schedule = True
suggestion (non-blocking): I think it would make sense to respect the --dag argument if it's specified (for the main ETL).
The --dag argument is still respected for the daily (non _live) table. Are you thinking that we should treat the live version as the main table? I was thinking the daily version is kind of the main table.
I agree the daily version should be considered the main table, as is currently the case, but here you're setting no_schedule = True, which means that even if a DAG was specified via --dag it won't be used for the main daily table.
bigquery_etl/cli/query.py (outdated)

    AS SELECT * FROM
      `{project_id}.{dataset}.{table_name}`"""

    view_file.write_text(reformat(view_text) + "\n")
suggestion (non-blocking): I know this approach of manually appending a newline is just following the convention of what the existing code was already doing, but it's worth noting that reformat() accepts a trailing_newline boolean argument specifically for that purpose.
    - view_file.write_text(reformat(view_text) + "\n")
    + view_file.write_text(reformat(view_text, trailing_newline=True))
(ditto for the other similar usages)
bigquery_etl/cli/query.py (outdated)

    if use_live:
        use_live_metadata_file = use_live_path / "metadata.yaml"
        if hourly:
            labels = {"incremental": True, "schedule": hourly}
suggestion (blocking):
| labels = {"incremental": True, "schedule": hourly} | |
| labels = {"incremental": True, "schedule": "hourly"} |
bigquery_etl/cli/query.py (outdated)

    "interval_start:DATE:{{}}",
    "interval_end:DATE:{{(execution_date - macros.timedelta(hours=1).strftime('%Y-%m-%d %h:%m:%s'))}}",
suggestion (blocking):
| "interval_start:DATE:{{}}", | |
| "interval_end:DATE:{{(execution_date - macros.timedelta(hours=1).strftime('%Y-%m-%d %h:%m:%s'))}}", | |
| "interval_start:TIMESTAMP:{{}}", | |
| "interval_end:TIMESTAMP:{{(execution_date - macros.timedelta(hours=1).strftime('%Y-%m-%d %H:%M:%S'))}}", |
Though I'm not sure subtracting an extra hour would necessarily always be advisable for arbitrary intervals.
bigquery_etl/cli/query.py (outdated)

    "date_partition_parameter": None,
    "parameters": parameters,
    "destination_table": destination_table,
    "query_file_path": use_live_path / "query.sql",
suggestion (blocking): Specifying query_file_path like this isn't actually necessary and makes maintenance harder.
| "query_file_path": use_live_path / "query.sql", |
bigquery_etl/cli/query.py (outdated)

    parameters = [
        "submission_hour:DATE:{{(execution_date - macros.timedelta(hours=1)).strftime('%Y-%m-%d %h:00:00')}}",
    ]
    destination_table = f"{use_live_table_name}${{{{(execution_date - macros.timedelta(hours=1)).strftime('%Y%m%d%h)}}}}"
suggestion (blocking):
    - parameters = [
    -     "submission_hour:DATE:{{(execution_date - macros.timedelta(hours=1)).strftime('%Y-%m-%d %h:00:00')}}",
    - ]
    - destination_table = f"{use_live_table_name}${{{{(execution_date - macros.timedelta(hours=1)).strftime('%Y%m%d%h)}}}}"
    + parameters = [
    +     "submission_hour:TIMESTAMP:{{(execution_date - macros.timedelta(hours=1)).strftime('%Y-%m-%d %H:00:00')}}",
    + ]
    + destination_table = f"{use_live_table_name}${{{{(execution_date - macros.timedelta(hours=1)).strftime('%Y%m%d%H)}}}}"
    WHERE
    {{% if use_live %}}
      submission_timestamp >= @interval_start
      AND submission_timestamp < @interval_end
    {{% else %}}
      TIMESTAMP_TRUNC(submission_date, DAY) = @submission_date
    {{% endif %}}
    {{% if use_live %}}
    -- Overwrite the daily partition with a combination of new records for
    -- the given interval (above) and existing records outside the given
    -- interval (below)
    UNION ALL
    SELECT
      *
    FROM
      {project_id}.{dataset}.{use_live_table_name}
    WHERE
      TIMESTAMP_TRUNC(submission_date, DAY) = TIMESTAMP_TRUNC(@interval_start, DAY)
      AND (
        submission_timestamp < @interval_start
        OR submission_timestamp >= @interval_end
      )
    {{% endif %}}
- issue: submission_date wouldn't be a timestamp.
- suggestion (non-blocking): It would be best to quote the live ETL table name.
- note: While this approach of only running the live ETL on new records and reusing existing records is likely better for performance, it unfortunately has the same drawbacks as when using hourly partitioning that I mentioned in my comment about the --hourly command line option. I'd be inclined to run the live ETL on all records for the date in question (see the sketch after this list).
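As a rough sketch of what running the live ETL over the full day could look like (the table name and the @interval_start parameter are illustrative, not taken from this PR's template):

```sql
-- Sketch only: instead of restricting to the new interval, select every record
-- for the target date, so each run fully rebuilds that date's partition.
SELECT
  *
FROM
  `my_project.my_dataset_live.my_ping_v1`
WHERE
  DATE(submission_timestamp) = DATE(@interval_start)
```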
bigquery_etl/cli/query.py (outdated)

    FROM
    {{% if use_live %}}
    table_live
suggestion (non-blocking): IMO it would be best for the generated SQL for selecting from the live table to attempt to deduplicate the records for that date.
The approach here was to not try to generate something that does this for the user, because there isn't a clear one-size-fits-all deduplication method. The closest options I'm aware of are:
- Wrap the source table in a CTE that dedupes the entire day (e.g., SELECT * FROM live_table WHERE DATE(submission_timestamp) = @submission_timestamp QUALIFY ROW_NUMBER() OVER (PARTITION BY document_id ORDER BY submission_timestamp) = 1) -- this does work in the general case, but it implies querying the whole day so far in the source table every time, which I don't think we want to make the default behavior
- Dedupe just the specific interval and exclude any document_ids already present in the destination table (or in some side table where we save all the document_ids so far) -- requires us to store document_ids in the destination table (or a side table)
I don't think either of these is a great option, and I don't want to give the user the wrong impression that this solves the deduplication problem for them out of the box. The intent instead is that the live and stable versions follow the same model as the live and stable base tables -- we don't make any promises about deduplication in the live tables. For most low latency purposes (e.g., reports that update several times a day) the slight inaccuracy is not an issue, and the numbers will be fully correct once the daily stable table runs; in these cases we can simply not worry about deduplication. If deduping is needed, the user can still write the SQL to do that in the way they prefer.
That said, I do think it makes sense to make this explicit in the generated SQL (not just in documentation elsewhere) -- I'll start with adding a comment to that effect.
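For reference, a minimal standalone sketch of option 1 above (whole-day dedup by document_id); the live table name and the @submission_date parameter are illustrative, and it assumes the source carries a document_id column:

```sql
-- Sketch of whole-day deduplication: keep only the earliest record per
-- document_id seen so far for the target date.
SELECT
  *
FROM
  `my_project.my_dataset_live.my_ping_v1`
WHERE
  DATE(submission_timestamp) = @submission_date
QUALIFY
  ROW_NUMBER() OVER (
    PARTITION BY document_id
    ORDER BY submission_timestamp
  ) = 1
```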
> Wrap the source table in a CTE that dedupes the entire day (e.g., SELECT * FROM live_table WHERE DATE(submission_timestamp) = @submission_timestamp QUALIFY ROW_NUMBER() OVER (PARTITION BY document_id ORDER BY submission_timestamp) = 1) -- this does work in the general case but implies querying the whole day so far in the source table every time, which I don't think we want to make the default behavior
That appears to be the most common behavior though, which to me is a decent argument for making it the default behavior:
- In bigquery-etl I currently see 6 ETLs selecting from live tables (counting the generated baseline_clients_city_seen_v1 ETLs as one ETL), all of which query the whole day, and 5 of which do deduplication.
- In private-bigquery-etl I currently see 12 ETLs selecting from live tables, 9 of which query the whole day, and all of which attempt some form of deduplication.
I agree there's no great option, but as I mentioned in a couple of other comments, I don't think it would be a good idea to set things up so that absolutely all hourly ETL runs have to succeed to get complete data, because transient issues do happen and Airflow triage isn't great at catching failures in DAGs that run more frequently than daily. IMO querying the whole day and overwriting the corresponding date partition is the most straightforward way to avoid that issue. The main downsides are that it's less efficient, and you still need to be careful that the last ETL run targeting a particular date partition was successful to avoid missing data.
Another approach that could work in some use cases is having the query only select records after the max submission_timestamp the ETL has already seen thus far, and then appending newer records to the table rather than overwriting a partition. This does have the benefits of being more efficient and completely self-healing in the event of transient failures (any successful ETL run will fix the entire table). However, it requires the submission_timestamp to be included in the ETL table, makes it a self-referential ETL (which are trickier to deal with in certain scenarios), and precludes the ability to do the same deduplication as is done for the stable tables.
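Roughly, that append-only variant might look like the following sketch (self-referential; all table names are illustrative, and it assumes submission_timestamp is carried through into the destination table):

```sql
-- Sketch only: append records newer than anything already in the destination,
-- rather than overwriting a date partition.
SELECT
  *
FROM
  `my_project.my_dataset_live.my_ping_v1`
WHERE
  submission_timestamp > (
    SELECT MAX(submission_timestamp)
    FROM `my_project.my_dataset_derived.my_table_from_live_v1`
  )
```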
@scholtzan do you have an opinion on this?
> For most low latency purposes (e.g., reports that update several times a day) the slight inaccuracy is not an issue, and the numbers will be fully correct once the daily stable table runs
Afaik, this is the case for many use cases I have seen. Small differences don't seem like a big issue, though if we have a good approach to make these numbers more accurate, that would be a plus.
Maybe we can have a separate parameter --deduplicate that adds the deduplication logic using the document_id (option 1 that Curtis mentioned earlier)?
> Another approach that could work in some use cases is having the query only select records after the max submission_timestamp
I don't think it's guaranteed that new records have an increasing submission_timestamp. Afaik it's possible that records with earlier submission_timestamps get added to the tables after records with later submission_timestamps. This could, in the worst case, result in missing records if we follow this approach.
bigquery_etl/cli/query.py (outdated)

    {{% if use_live %}}
    table_live
    {{% else %}}
    table_stable
    {{% endif %}}
quibble: I think it'd be more readable to have these placeholders match how we refer to live/stable tables.
    - {{% if use_live %}}
    - table_live
    - {{% else %}}
    - table_stable
    - {{% endif %}}
    + {{% if use_live %}}
    + live_table
    + {{% else %}}
    + stable_table
    + {{% endif %}}
Description
This PR adds two new flags to the bqetl query create CLI command that will generate standard ETL for queries that run more frequently than daily (and pull from live tables):
- --use-live: Creates an additional _use_live table and a view that joins the two
- --hourly: A special case of --use-live for queries that run hourly
There is some minor refactoring for DRY, but most of the logic here is new and should not affect the existing logic.
A few notes:
- ... @submission_date-ish parameters based on the schedule
- ... the use_live table includes the somewhat gnarly calculation of parameters that currently is used -- this will be removed in (2)
- ... --no-schedule. I plan to deal with this later; I don't think it's a huge deal now that the user has to add the dag names in the metadata files

I'm adding a few more inline notes.

Related Tickets & Documents
Reviewer, please follow this checklist