-
Notifications
You must be signed in to change notification settings - Fork 706
Do commits at the end of transactions. #3093
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements transaction commit handling at transaction boundaries to ensure data consistency and prevent cascading abort errors. The changes move commit operations from the end of query execution to PostgreSQL's transaction callback system, allowing proper handling of transaction aborts without triggering recursive error loops.
Key Changes:
- Implements transaction callbacks to commit changes at
PRE_COMMITevents - Handles transaction aborts gracefully without cascading errors
- Adds flush operations after COPY statements to ensure data consistency
- Refactors dataset refresh mechanism to use a single mutable dataset instead of separate read-only instances
Reviewed changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| cpp/deeplake_pg/pg_deeplake.cpp | Adds transaction callback registration and implementation for commit/rollback handling |
| cpp/deeplake_pg/extension_init.cpp | Adds COPY statement flush logic and error handling in executor_end with PG_TRY/CATCH |
| cpp/deeplake_pg/table_storage.cpp | Adds dataset deletion error handling for missing datasets |
| cpp/deeplake_pg/table_storage.hpp | Adds commit_all() method for transaction-level commits |
| cpp/deeplake_pg/table_data.cpp | Refactors refresh mechanism and adds flush() call to commit() |
| cpp/deeplake_pg/table_data.hpp | Updates refresh() signature and dataset types |
| cpp/deeplake_pg/table_data_impl.hpp | Implements new refresh mechanism and removes auto-commit from flush() |
| cpp/deeplake_pg/pg_deeplake.hpp | Refactors index_info to use dataset() accessor method |
| cpp/deeplake_pg/table_scan_impl.hpp | Removes cached has_streamer_columns_ vector |
| cpp/deeplake_pg/duckdb_pg_convert.cpp | Adds support for converting nested DuckDB lists to PostgreSQL multi-dimensional arrays |
| cpp/deeplake_pg/duckdb_deeplake_scan.cpp | Adds debug logging and error handling; refactors column view access |
| postgres/tests/sql/update_table.sql | Changes ORDER BY clause from vector distance to id for deterministic results |
| postgres/tests/sql/bulk_insert.sql | Adds row count assertion after first COPY operation |
| postgres/tests/py_tests/test_transaction_abort_handling.py | New comprehensive test suite for transaction abort scenarios |
| postgres/tests/py_tests/test_drop_table_missing_dataset.py | New test for DROP TABLE with externally deleted datasets |
| postgres/tests/py_tests/test_array.py | Adds extensive array operation test coverage |
| postgres/tests/py_tests/requirements.txt | Adds deeplake dependency |
| postgres/scripts/run_pg_server.sh | Adds commented debug logging configuration |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (!duckdb::UUID::FromString(std::string(value), uuid_value)) { | ||
| std::string tmp(value); | ||
| elog(ERROR, "Failed to parse UUID string: %s", tmp.c_str()); |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The temporary string tmp on line 866 is redundant since value is already converted to std::string on line 865. Consider using the string from line 865 directly in the error message or constructing it once.
| if (!duckdb::UUID::FromString(std::string(value), uuid_value)) { | |
| std::string tmp(value); | |
| elog(ERROR, "Failed to parse UUID string: %s", tmp.c_str()); | |
| std::string value_str(value); | |
| if (!duckdb::UUID::FromString(value_str, uuid_value)) { | |
| elog(ERROR, "Failed to parse UUID string: %s", value_str.c_str()); |
| pg::table_storage::instance().rollback_all(); | ||
| ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to flush table storage"))); | ||
| elog(WARNING, "Failed to flush data during executor end, changes rolled back"); |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The warning message on line 979 doesn't provide any details about what error occurred during flush. Consider including the error details from the error state before flushing it, to aid in debugging and troubleshooting.
| pg::table_storage::instance().rollback_all(); | |
| ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to flush table storage"))); | |
| elog(WARNING, "Failed to flush data during executor end, changes rolled back"); | |
| ErrorData* edata = CopyErrorData(); | |
| pg::table_storage::instance().rollback_all(); | |
| if (edata != nullptr && edata->message != nullptr) | |
| { | |
| elog(WARNING, | |
| "Failed to flush data during executor end, changes rolled back: %s", | |
| edata->message); | |
| } | |
| else | |
| { | |
| elog(WARNING, "Failed to flush data during executor end, changes rolled back"); | |
| } | |
| FreeErrorData(edata); |
| finally: | ||
| try: | ||
| await db_conn.execute("DROP TABLE IF EXISTS test_abort_insert") | ||
| except: |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bare except clause catches all exceptions including system exceptions like KeyboardInterrupt. Use a more specific exception type or at minimum except Exception: to avoid catching system-exiting exceptions.
| except: | |
| except Exception: | |
| # Ignore cleanup errors to avoid masking test failures |
| except: | ||
| pass |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bare except clause catches all exceptions including system exceptions like KeyboardInterrupt. Use a more specific exception type or at minimum except Exception: to avoid catching system-exiting exceptions.
| except: | ||
| pass |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bare except clause catches all exceptions including system exceptions like KeyboardInterrupt. Use a more specific exception type or at minimum except Exception: to avoid catching system-exiting exceptions.
| except: | ||
| pass |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bare except clause catches all exceptions including system exceptions like KeyboardInterrupt. Use a more specific exception type or at minimum except Exception: to avoid catching system-exiting exceptions.
| except: | ||
| pass |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bare except clause catches all exceptions including system exceptions like KeyboardInterrupt. Use a more specific exception type or at minimum except Exception: to avoid catching system-exiting exceptions.
| except: | ||
| pass |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bare except clause catches all exceptions including system exceptions like KeyboardInterrupt. Use a more specific exception type or at minimum except Exception: to avoid catching system-exiting exceptions.
| print("✓ Inserted test data") | ||
|
|
||
| # Delete the dataset directly using deeplake API | ||
| deeplake.delete(dataset_path) |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deeplake.delete() call should be wrapped in error handling to ensure test cleanup proceeds even if deletion fails unexpectedly.
| print("✓ Both datasets exist") | ||
|
|
||
| # Delete one dataset externally | ||
| deeplake.delete(dataset_path_deleted) |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deeplake.delete() call should be wrapped in error handling to ensure test cleanup proceeds even if deletion fails unexpectedly.



🚀 🚀 Pull Request
Impact
Description
Things to be aware of
Things to worry about
Additional Context