feat: PostgreSQL support for DomainAudit #7665
Conversation
- Created data structures `DomainAuditLogRow` and `DomainAuditLogFilter`
- Implemented sqlplugin interfaces for the `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions
- Implemented the `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions in Postgresql
- Updated SQL factory to create `newSQLDomainAuditStore`

Signed-off-by: Joanna Lau <118241363+joannalauu@users.noreply.github.com>
👋 Thanks! I've got this in my plans to review; unfortunately I've been blocked by some on-call work. Will get to this ASAP!
```go
argIndex := 5
// Handle pagination token
if filter.PageTokenMinCreatedTime != nil && filter.PageTokenMinEventID != nil {
```
The logic for handling pagination should be done inside sql_domain_audit_store.go, to avoid duplicating that logic in each SQL plugin.
You can check the code in sql_execution_store.go to see how we handle pagination that way.
Here is the relevant function (cadence/common/persistence/sql/sql_execution_store.go, lines 783 to 837 at 57b7157):
```go
func (m *sqlExecutionStore) ListConcreteExecutions(
	ctx context.Context,
	request *p.ListConcreteExecutionsRequest,
) (*p.InternalListConcreteExecutionsResponse, error) {
	filter := &sqlplugin.ExecutionsFilter{}
	if len(request.PageToken) > 0 {
		err := gobDeserialize(request.PageToken, &filter)
		if err != nil {
			return nil, &types.InternalServiceError{
				Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
			}
		}
	} else {
		filter = &sqlplugin.ExecutionsFilter{
			ShardID:    m.shardID,
			WorkflowID: "",
		}
	}
	filter.Size = request.PageSize
	executions, err := m.db.SelectFromExecutions(ctx, filter)
	if err != nil {
		if err == sql.ErrNoRows {
			return &p.InternalListConcreteExecutionsResponse{}, nil
		}
		return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err)
	}
	if len(executions) == 0 {
		return &p.InternalListConcreteExecutionsResponse{}, nil
	}
	lastExecution := executions[len(executions)-1]
	nextFilter := &sqlplugin.ExecutionsFilter{
		ShardID:    m.shardID,
		WorkflowID: lastExecution.WorkflowID,
	}
	token, err := gobSerialize(nextFilter)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
		}
	}
	concreteExecutions, err := m.populateInternalListConcreteExecutions(executions)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
		}
	}
	return &p.InternalListConcreteExecutionsResponse{
		Executions:    concreteExecutions,
		NextPageToken: token,
	}, nil
}
```
At the SQL store level we perform the pagination (apply the page token to the filter, handle the result of the request, and return the next page token), while at the database plugin level (e.g. PostgreSQL) we just implement a list query (cadence/common/persistence/sql/sqlplugin/postgres/execution.go, lines 196 to 215 at 57b7157):
```go
func (pdb *db) SelectFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) ([]sqlplugin.ExecutionsRow, error) {
	dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
	var rows []sqlplugin.ExecutionsRow
	var err error
	if len(filter.DomainID) == 0 && filter.Size > 0 {
		err = pdb.driver.SelectContext(ctx, dbShardID, &rows, listExecutionQuery, filter.ShardID, filter.WorkflowID, filter.Size)
		if err != nil {
			return nil, err
		}
	} else {
		var row sqlplugin.ExecutionsRow
		err = pdb.driver.GetContext(ctx, dbShardID, &row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
		if err != nil {
			return nil, err
		}
		rows = append(rows, row)
	}
	return rows, err
}
```
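Adapted to DomainAudit, the store-level handling could look roughly like the sketch below. Everything beyond the names already visible in this review (the `PageTokenMinCreatedTime`/`PageTokenMinEventID` filter fields, `SelectFromDomainAuditLogs`, the `gobSerialize`/`gobDeserialize` helpers reused from the execution store) is an assumption for illustration, not the PR's actual code:

```go
// Sketch: pagination handled once in sql_domain_audit_store.go, so each
// SQL plugin only implements a plain list query. Type and field names
// (sqlDomainAuditStore, the request/response types, filter.Size) are
// assumed from this review, not copied from the actual change.
func (m *sqlDomainAuditStore) GetDomainAuditLogs(
	ctx context.Context,
	request *p.GetDomainAuditLogsRequest,
) (*p.InternalGetDomainAuditLogsResponse, error) {
	filter := &sqlplugin.DomainAuditLogFilter{DomainID: request.DomainID}
	if len(request.PageToken) > 0 {
		// Decode the page token into the filter here, at the store level.
		if err := gobDeserialize(request.PageToken, filter); err != nil {
			return nil, &types.InternalServiceError{
				Message: fmt.Sprintf("GetDomainAuditLogs failed. Error: %v", err),
			}
		}
	}
	filter.Size = request.PageSize
	rows, err := m.db.SelectFromDomainAuditLogs(ctx, filter)
	if err != nil {
		return nil, convertCommonErrors(m.db, "GetDomainAuditLogs", "", err)
	}
	if len(rows) == 0 {
		return &p.InternalGetDomainAuditLogsResponse{}, nil
	}
	// Build the next-page token from the last row returned.
	last := rows[len(rows)-1]
	token, err := gobSerialize(&sqlplugin.DomainAuditLogFilter{
		DomainID:                filter.DomainID,
		PageTokenMinCreatedTime: &last.CreatedTime,
		PageTokenMinEventID:     &last.EventID,
	})
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("GetDomainAuditLogs failed. Error: %v", err),
		}
	}
	logs := make([]*p.InternalDomainAuditLog, 0, len(rows))
	for i := range rows {
		logs = append(logs, auditLogFromRow(&rows[i])) // helper sketched later in this review
	}
	return &p.InternalGetDomainAuditLogsResponse{
		AuditLogs:     logs,
		NextPageToken: token,
	}, nil
}
```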
common/persistence/sql/sqlplugin/postgres/domain_audit_log_test.go — outdated review thread, resolved.
Code Review ✅ Approved — 6 resolved / 6 findings (Gitar)

Well-structured PostgreSQL DomainAudit implementation with good test coverage. Previous findings have been addressed: a nil DataBlob now correctly maps to empty bytes for NOT NULL columns, and SelectContext is used instead of GetContext for multi-row queries. The "SchemaUpdateCqlFiles" manifest key is a project-wide convention, not a bug.
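The nil-DataBlob finding points at small nil-safe accessors along these lines — a hedged sketch; the helper names `getDataBlobBytes` and `getDataBlobEncoding` are inferred from the test names in the PR description, not quoted from the change:

```go
// Sketch only: nil-safe DataBlob accessors. A nil blob yields empty bytes
// and an empty encoding so the NOT NULL columns always receive a value.
func getDataBlobBytes(blob *persistence.DataBlob) []byte {
	if blob == nil {
		return []byte{}
	}
	return blob.Data
}

func getDataBlobEncoding(blob *persistence.DataBlob) string {
	if blob == nil {
		return ""
	}
	return string(blob.Encoding)
}
```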
c-warren left a comment:
Looking good! Thanks for making those changes.
```sql
domain_id BYTEA NOT NULL,
event_id BYTEA NOT NULL,
```
I'd err on the side of TEXT here so that the IDs are human readable natively in database tooling.
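Concretely, the suggestion is just swapping the column type — a sketch, with the remaining columns elided:

```sql
-- Sketch: TEXT ids are directly readable in psql and most DB tooling.
-- The remaining columns are elided, not the PR's exact schema.
CREATE TABLE domain_audit_log (
    domain_id TEXT NOT NULL,
    event_id  TEXT NOT NULL
    -- ... remaining columns and primary key as in the PR's schema ...
);
```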
```sql
identity TEXT NOT NULL,
identity_type TEXT NOT NULL,
comment TEXT NOT NULL DEFAULT '',
PRIMARY KEY (domain_id, operation_type, created_time, event_id)
```
We have a composite primary key in Cassandra because its support for secondary indexes is limited (in our implementation anyway). Because of that we have to stuff everything we want to query by into the primary key. For this implementation I think it's fine to keep that behaviour, as we will be passing all of these things from the layer above postgres, but you could equivalently just use the event_id as the primary key so that it is possible to fetch a row purely by event_id.
I'd also recommend adding secondary indexes on created_time and operation_type so that we can efficiently filter on those columns when building other views (e.g. the most recent updates to visibility).
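The suggested secondary indexes would look something like this (index names are illustrative, not from the PR):

```sql
-- Sketch: secondary indexes for the filter columns mentioned above.
CREATE INDEX domain_audit_log_by_created_time ON domain_audit_log (created_time);
CREATE INDEX domain_audit_log_by_operation_type ON domain_audit_log (operation_type);
```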
```go
DomainID serialization.UUID
EventID serialization.UUID
StateBefore []byte
StateBeforeEncoding string
```
We have an encoding type in commonconsts which we should use here - we want to ensure that we're using a valid/supported encoding.
```go
StateBefore []byte
StateBeforeEncoding string
StateAfter []byte
StateAfterEncoding string
```
Same here
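A sketch of what both suggestions could look like on the row type — assuming the shared type is `constants.EncodingType`, which is how it's referenced later in this review; the field list is abbreviated:

```go
// Sketch: typed encoding fields on the sqlplugin row, so only
// valid/supported encodings are representable. Field list abbreviated
// from the row type shown above.
type DomainAuditLogRow struct {
	DomainID            serialization.UUID
	EventID             serialization.UUID
	StateBefore         []byte
	StateBeforeEncoding constants.EncodingType
	StateAfter          []byte
	StateAfterEncoding  constants.EncodingType
}
```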
```go
domainID := serialization.MustParseUUID("d1111111-1111-1111-1111-111111111111")
eventID := serialization.MustParseUUID("e1111111-1111-1111-1111-111111111111")
```
NIT: I recommend creating a unique UUID whenever the test is run as a best practice. It prevents hardcoding/reusing expected UUIDs and ensures the code path can't be affected by any other tests. In this particular test it doesn't matter much, but something to keep in mind.
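For illustration, generating fresh ids per run could look like this — assuming the `github.com/google/uuid` package; the repo may use a different UUID helper:

```go
// Sketch: random ids per test run instead of hardcoded constants, so the
// test can't collide with leftover rows or ids reused by other tests.
// Assumes: import "github.com/google/uuid"
domainID := serialization.MustParseUUID(uuid.New().String())
eventID := serialization.MustParseUUID(uuid.New().String())
```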
```go
auditLog := &persistence.InternalDomainAuditLog{
	EventID:         row.EventID.String(),
	DomainID:        row.DomainID.String(),
	OperationType:   row.OperationType,
	CreatedTime:     row.CreatedTime,
	LastUpdatedTime: row.LastUpdatedTime,
	Identity:        row.Identity,
	IdentityType:    row.IdentityType,
	Comment:         row.Comment,
}

if len(row.StateBefore) > 0 {
	auditLog.StateBefore = &persistence.DataBlob{
		Encoding: constants.EncodingType(row.StateBeforeEncoding),
		Data:     row.StateBefore,
	}
}

if len(row.StateAfter) > 0 {
	auditLog.StateAfter = &persistence.DataBlob{
		Encoding: constants.EncodingType(row.StateAfterEncoding),
		Data:     row.StateAfter,
	}
}
```
This would be handy to have as a common deserialization function, for when we add GetAuditLog(event_id) for example. It'll also make it easier to test deserialization from a PostgreSQL row and any error handling we have.
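Factored out, the quoted block becomes a helper like the sketch below — the function name and placement are illustrative, but the body is the logic already shown above:

```go
// Sketch of the suggested shared helper: one place that turns a
// DomainAuditLogRow into an InternalDomainAuditLog, reusable by listing
// code and a future GetAuditLog(event_id). Name and location illustrative.
func auditLogFromRow(row *sqlplugin.DomainAuditLogRow) *persistence.InternalDomainAuditLog {
	auditLog := &persistence.InternalDomainAuditLog{
		EventID:         row.EventID.String(),
		DomainID:        row.DomainID.String(),
		OperationType:   row.OperationType,
		CreatedTime:     row.CreatedTime,
		LastUpdatedTime: row.LastUpdatedTime,
		Identity:        row.Identity,
		IdentityType:    row.IdentityType,
		Comment:         row.Comment,
	}
	// Empty state bytes mean "no blob": leave the field nil.
	if len(row.StateBefore) > 0 {
		auditLog.StateBefore = &persistence.DataBlob{
			Encoding: constants.EncodingType(row.StateBeforeEncoding),
			Data:     row.StateBefore,
		}
	}
	if len(row.StateAfter) > 0 {
		auditLog.StateAfter = &persistence.DataBlob{
			Encoding: constants.EncodingType(row.StateAfterEncoding),
			Data:     row.StateAfter,
		}
	}
	return auditLog
}
```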
What changed?
Implement the SQL plugin interface and add PostgreSQL support for DomainAudit.
- `domain_audit_log` table in Postgresql schema
- `DomainAuditLogRow` and `DomainAuditLogFilter` in sqlplugin
- `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions
- `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions in Postgresql
- `newSQLDomainAuditStore`

Fixes: #7602
Why?
DomainAudit allows all modifications made to a domain (e.g. failovers) to be stored and retrieved. Previously this was only supported by NoSQL databases (Cassandra, MongoDB, DynamoDB). The SQL plugin is now configured to handle DomainAudit requests, so other SQL databases can also be easily configured to support DomainAudit; this PR adds the PostgreSQL support.
How did you test it?
- `go test -v ./common/persistence/sql -run "TestCreateDomainAuditLog|TestGetDomainAuditLogs|TestFactoryNewDomainAuditStore|TestInsertIntoDomainAuditLog|TestSelectFromDomainAuditLogs|TestGetDataBlobEncoding|TestGetDataBlobBytes"`
- `TestPostgresSQLDomainAuditPersistence` on GitHub Actions

Potential risks
Release notes
Added PostgreSQL support for DomainAudit
Documentation Changes
N/A
Potential Breaking change
Detailed Description
- `domain_audit_log` table in Postgresql schema
- `DomainAuditLogRow` and `DomainAuditLogFilter` in sqlplugin
- `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions
- `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions in Postgresql
- `newSQLDomainAuditStore`

Impact Analysis

Testing Plan
- `go test -v ./common/persistence/sql -run "TestCreateDomainAuditLog|TestGetDomainAuditLogs|TestFactoryNewDomainAuditStore|TestInsertIntoDomainAuditLog|TestSelectFromDomainAuditLogs|TestGetDataBlobEncoding|TestGetDataBlobBytes"`
- `TestPostgresSQLDomainAuditPersistence` on GitHub Actions

Rollout Plan
- `nil, nil` should revert effects of all changes

Reviewer Validation
PR Description Quality (check these before reviewing code):
- `go test` invocation)