From 71064f61d9b29ed1ca0042b5799d39671451f100 Mon Sep 17 00:00:00 2001 From: parisni Date: Sun, 20 Jul 2025 15:22:54 +0200 Subject: [PATCH 1/2] feat: allow result reuse see https://aws.amazon.com/blogs/big-data/reduce-cost-and-improve-query-performance-with-amazon-athena-query-result-reuse/ fix config param --- athenacli/athenaclirc | 4 ++++ athenacli/config.py | 25 +++++++++++++++++++++++-- athenacli/main.py | 17 +++++++++++++---- athenacli/sqlexecute.py | 37 +++++++++++++++++++++++++------------ 4 files changed, 65 insertions(+), 18 deletions(-) diff --git a/athenacli/athenaclirc b/athenacli/athenaclirc index 8448c26..4ab2882 100644 --- a/athenacli/athenaclirc +++ b/athenacli/athenaclirc @@ -17,6 +17,10 @@ s3_staging_dir = '' # Name of athena workgroup that you want to use work_group = '' # e.g. primary +# Query result reuse settings (requires Athena engine version 3) +result_reuse_enable = False +result_reuse_minutes = 60 + [main] # log_file location. log_file = ~/.athenacli/app.log diff --git a/athenacli/config.py b/athenacli/config.py index 6f92b5f..c64f420 100644 --- a/athenacli/config.py +++ b/athenacli/config.py @@ -19,15 +19,19 @@ class AWSConfig(object): def __init__(self, aws_access_key_id, aws_secret_access_key, - region, s3_staging_dir, work_group, profile, config): + region, s3_staging_dir, work_group, profile, config, + result_reuse_enable=None, result_reuse_minutes=None): key = 'aws_profile %s' % profile try: _cfg = config[key] - except: + except Exception as e: # this assumes that the profile is only known in the regular AWS config -> the boto lib will get it # from there. This is especially important if we have some kind of additional temporary session keys for # which the login fails if we set aws_access_key_id/aws_secret_access_key here _cfg = defaultdict(lambda: None) + # For result reuse settings, provide explicit defaults when profile section is missing + _cfg['result_reuse_enable'] = 'False' + _cfg['result_reuse_minutes'] = '60' self.aws_access_key_id = self.get_val(aws_access_key_id, _cfg['aws_access_key_id']) self.aws_secret_access_key = self.get_val(aws_secret_access_key, _cfg['aws_secret_access_key']) @@ -36,6 +40,23 @@ def __init__(self, aws_access_key_id, aws_secret_access_key, self.work_group = self.get_val(work_group, _cfg['work_group']) # enable connection to assume role self.role_arn = self.get_val(_cfg.get('role_arn')) + # query result reuse settings + config_reuse_enable = _cfg.get('result_reuse_enable') + if config_reuse_enable and isinstance(config_reuse_enable, str): + config_reuse_enable = config_reuse_enable.lower() in ('true', '1', 'yes', 'on') + elif config_reuse_enable is None: + config_reuse_enable = False + self.result_reuse_enable = result_reuse_enable if result_reuse_enable is not None else config_reuse_enable + + config_reuse_minutes = _cfg.get('result_reuse_minutes') + if config_reuse_minutes and isinstance(config_reuse_minutes, str): + try: + config_reuse_minutes = int(config_reuse_minutes) + except ValueError: + config_reuse_minutes = 60 + elif config_reuse_minutes is None: + config_reuse_minutes = 60 + self.result_reuse_minutes = self.get_val(result_reuse_minutes, config_reuse_minutes, 60) def get_val(self, *vals): """Return the first True value in `vals` list, otherwise return None.""" diff --git a/athenacli/main.py b/athenacli/main.py index cd41f51..3b349f8 100644 --- a/athenacli/main.py +++ b/athenacli/main.py @@ -61,7 +61,8 @@ class AthenaCli(object): MAX_LEN_PROMPT = 45 def __init__(self, region, aws_access_key_id, aws_secret_access_key, - s3_staging_dir, work_group, athenaclirc, profile, database): + s3_staging_dir, work_group, athenaclirc, profile, database, + result_reuse_enable=None, result_reuse_minutes=None): config_files = [DEFAULT_CONFIG_FILE] if os.path.exists(os.path.expanduser(athenaclirc)): @@ -71,7 +72,8 @@ def __init__(self, region, aws_access_key_id, aws_secret_access_key, self.init_logging(_cfg['main']['log_file'], _cfg['main']['log_level']) aws_config = AWSConfig( - aws_access_key_id, aws_secret_access_key, region, s3_staging_dir, work_group, profile, _cfg + aws_access_key_id, aws_secret_access_key, region, s3_staging_dir, work_group, profile, _cfg, + result_reuse_enable, result_reuse_minutes ) try: @@ -200,7 +202,9 @@ def connect(self, aws_config, database): s3_staging_dir = aws_config.s3_staging_dir, work_group = aws_config.work_group, role_arn = aws_config.role_arn, - database = database + database = database, + result_reuse_enable = aws_config.result_reuse_enable, + result_reuse_minutes = aws_config.result_reuse_minutes ) def handle_editor_command(self, text): @@ -616,10 +620,13 @@ def is_mutating(status): @click.option('--work_group', type=str, help="Amazon Athena workgroup in which query is run, default is primary") @click.option('--athenaclirc', default=ATHENACLIRC, type=click.Path(dir_okay=False), help="Location of athenaclirc file.") @click.option('--profile', type=str, default='default', help='AWS profile') +@click.option('--result-reuse-enable', default=None, type=bool, help='Enable query result reuse (requires Athena engine version 3)') +@click.option('--result-reuse-minutes', type=int, help='TTL for query result reuse in minutes (default: 60)') @click.option('--table-format', type=str, default='csv', help='Table format used with -e option.') @click.argument('database', default='default', nargs=1) def cli(execute, region, aws_access_key_id, aws_secret_access_key, - s3_staging_dir, work_group, athenaclirc, profile, table_format, database): + s3_staging_dir, work_group, athenaclirc, profile, result_reuse_enable, + result_reuse_minutes, table_format, database): '''A Athena terminal client with auto-completion and syntax highlighting. \b @@ -651,6 +658,8 @@ def cli(execute, region, aws_access_key_id, aws_secret_access_key, work_group=work_group, athenaclirc=athenaclirc, profile=profile, + result_reuse_enable=result_reuse_enable, + result_reuse_minutes=result_reuse_minutes, database=database ) diff --git a/athenacli/sqlexecute.py b/athenacli/sqlexecute.py index 2b39e38..2b81a95 100644 --- a/athenacli/sqlexecute.py +++ b/athenacli/sqlexecute.py @@ -27,7 +27,9 @@ def __init__( s3_staging_dir, work_group, role_arn, - database + database, + result_reuse_enable=False, + result_reuse_minutes=60 ): # Handle database parameter that may contain catalog.database format if database and '.' in database: @@ -42,6 +44,8 @@ def __init__( self.role_arn = role_arn self.database = database self.catalog_name = catalog_name or 'AwsDataCatalog' + self.result_reuse_enable = result_reuse_enable + self.result_reuse_minutes = result_reuse_minutes self.connect() def connect(self, database=None): @@ -50,17 +54,26 @@ def connect(self, database=None): catalog_name, database = database.split('.', 1) else: catalog_name = None - conn = pyathena.connect( - aws_access_key_id=self.aws_access_key_id, - aws_secret_access_key=self.aws_secret_access_key, - region_name=self.region_name, - s3_staging_dir=self.s3_staging_dir, - work_group=self.work_group, - schema_name=database or self.database, - role_arn=self.role_arn, - poll_interval=0.2, # 200ms - catalog_name=catalog_name or self.catalog_name - ) + + # Prepare connection parameters + conn_params = { + 'aws_access_key_id': self.aws_access_key_id, + 'aws_secret_access_key': self.aws_secret_access_key, + 'region_name': self.region_name, + 's3_staging_dir': self.s3_staging_dir, + 'work_group': self.work_group, + 'schema_name': database or self.database, + 'role_arn': self.role_arn, + 'poll_interval': 0.2, # 200ms + 'catalog_name': catalog_name or self.catalog_name + } + + # Add result reuse parameters if enabled + if self.result_reuse_enable: + conn_params['result_reuse_enable'] = True + conn_params['result_reuse_minutes'] = self.result_reuse_minutes + + conn = pyathena.connect(**conn_params) self.database = database or self.database if hasattr(self, 'conn'): From 12e7777b7537d5223774e88d32ee90feee9b00eb Mon Sep 17 00:00:00 2001 From: parisni Date: Sun, 20 Jul 2025 15:40:44 +0200 Subject: [PATCH 2/2] changelog and contrib --- AUTHORS.rst | 1 + changelog.md | 13 ++++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/AUTHORS.rst b/AUTHORS.rst index 42d2c6c..488d924 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -28,6 +28,7 @@ Contributors: * Alex Gaynor * Branch Vincent * Jacob Williams + * Nicolas Paris Creator: -------- diff --git a/changelog.md b/changelog.md index 30a0944..e1c8f28 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,14 @@ -TBD -===== +1.7.0 (TBD) +============ +Features: +--------- +* Add support for Amazon Athena query result reuse with configurable TTL + - Add `--result-reuse-enable` CLI option to enable/disable query result reuse + - Add `--result-reuse-minutes` CLI option to configure TTL in minutes + - Add `result_reuse_enable` and `result_reuse_minutes` configuration options in athenaclirc + - Requires Athena engine version 3 + - Can drastically improve query performance for repeated queries * Allow catalog to be specified as part of the database argument. ([.]) 1.6.8 (2022/05/15) @@ -25,7 +33,6 @@ Bugfix: ================== * Update the default branch to 'main' - 1.6.4 (2022/04/24) ==================