Conversation
* *: add basic support for shared lock Signed-off-by: zyguan <zhongyangguan@gmail.com> * update kvproto Signed-off-by: zyguan <zhongyangguan@gmail.com> * make some minor updates Signed-off-by: zyguan <zhongyangguan@gmail.com> * add basic tests for LockKeys Signed-off-by: zyguan <zhongyangguan@gmail.com> * bump kvproto Signed-off-by: zyguan <zhongyangguan@gmail.com> * shared lock: support rollback/resolve, add tests (tikv#2) * implement resolve lock Signed-off-by: you06 <you1474600@gmail.com> * remote comment code Signed-off-by: you06 <you1474600@gmail.com> * add license Signed-off-by: you06 <you1474600@gmail.com> * add test for gc shared lock Signed-off-by: you06 <you1474600@gmail.com> * address comment Signed-off-by: you06 <you1474600@gmail.com> --------- Signed-off-by: you06 <you1474600@gmail.com> * fix test (tikv#3) Signed-off-by: you06 <you1474600@gmail.com> * txnkv: prevent shared locks from being resolved by accidently Signed-off-by: zyguan <zhongyangguan@gmail.com> --------- Signed-off-by: zyguan <zhongyangguan@gmail.com> Signed-off-by: you06 <you1474600@gmail.com> Co-authored-by: you06 <you1474600@gmail.com>
* add metrics for shared lock Signed-off-by: you06 <you1474600@gmail.com> * remove SharedLockKeysDuration Signed-off-by: you06 <you1474600@gmail.com> * add span in tracing Signed-off-by: you06 <you1474600@gmail.com> --------- Signed-off-by: you06 <you1474600@gmail.com>
…tikv#1820) Signed-off-by: zyguan <zhongyangguan@gmail.com>
* update tests Signed-off-by: you06 <you1474600@gmail.com> * stabilize test Signed-off-by: you06 <you1474600@gmail.com> --------- Signed-off-by: you06 <you1474600@gmail.com>
Signed-off-by: you06 <you1474600@gmail.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughImplements shared-lock support end-to-end: new tests, lock flags and memdb flags, shared-mode pessimistic ops, 2PC/shared-lock accounting, prewrite/resolve handling for SharedLockInfos, GC/scan expansion, metrics, and some test helper exports and small refactors. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as KVTxn
participant Locker as PessimisticHandler
participant Store as KVStore
participant Resolver as LockResolver
Client->>Locker: lockKeys(ctx, lockCtx)
alt lockCtx.InShareMode == true
Client->>Locker: build mutation with Op_SharedPessimisticLock
else
Client->>Locker: build mutation with Op_PessimisticLock
end
Locker->>Store: send pessimistic lock request
Store-->>Locker: reply (ok or key error with SharedLockInfos)
alt reply contains SharedLockInfos or conflicts
Locker->>Resolver: handleKeyErrorForResolve(keyError)
Resolver->>Resolver: expand SharedLockInfos → individual locks
Resolver->>Resolver: apply TTL/threshold filtering & resolution
Resolver-->>Locker: resolved locks / skip wrapper
end
Locker-->>Client: result
Client->>Client: UpdateFlags(key, SetKeyLockedInShareMode / SetKeyLockedInExclusiveMode)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@kv/keyflags.go`:
- Around line 81-86: The code allows flagKeyLockedInShareMode to be set without
flagKeyLocked; update ApplyFlagsOps to enforce the invariant by rejecting or
asserting when SetKeyLockedInShareMode or SetKeyLockedInExclusiveMode is applied
unless SetKeyLocked is also present, and modify DelKeyLocked to clear both
flagKeyLocked and flagKeyLockedInShareMode (i.e., ensure origin &=
^(flagKeyLocked | flagKeyLockedInShareMode)) so the share flag cannot persist
after unlocking; reference ApplyFlagsOps, DelKeyLocked, SetKeyLocked,
SetKeyLockedInShareMode, SetKeyLockedInExclusiveMode, flagKeyLocked,
flagKeyLockedInShareMode, and note persistentFlags/initKeysAndMutations remain
as-is.
In `@txnkv/transaction/txn_test.go`:
- Around line 110-128: Update the typo in the test failure message: in
TestLockKeys change the require.FailNow call inside the requireNoRequest closure
(used by txn.store.client.onSend) to say "locking keys in optimistic mode do not
invoke SendRequest" (fix "optismistic" → "optimistic") so the error string is
spelled correctly when txn.lockKeys is expected not to call SendRequest; no
logic changes needed (references: TestLockKeys, requireNoRequest, txn.lockKeys,
newTestTxn, GetMemBuffer().GetFlags).
🧹 Nitpick comments (6)
tikv/gc.go (1)
290-301: Guard against emptySharedLockInfosto avoid dropping locks.If
GetSharedLockInfos()can be non-nil but empty, the wrapper lock will be skipped and no lock will be returned for that entry. Consider guarding withlen(...) > 0and falling back to the wrapper otherwise. Please confirm kvproto’s guarantee here.♻️ Proposed defensive tweak
- for _, lockInfo := range locksInfo { - if sharedLockInfos := lockInfo.GetSharedLockInfos(); sharedLockInfos != nil { + for _, lockInfo := range locksInfo { + if sharedLockInfos := lockInfo.GetSharedLockInfos(); len(sharedLockInfos) > 0 { // expand shared lock into multiple locks, and drop the dummy wrapper lock. for _, sharedLockInfo := range sharedLockInfos { locks = append(locks, txnlock.NewLock(sharedLockInfo)) } - continue - } - locks = append(locks, txnlock.NewLock(lockInfo)) + } else { + locks = append(locks, txnlock.NewLock(lockInfo)) + } }txnkv/transaction/txn_test.go (1)
59-86: Consider:GetOracle()returns nil which could cause NPE if accessed.The
mockStore.GetOracle()returnsnil. While this works for the current tests that don't require the oracle, it could cause unexpected panics if future tests or the code under test attempts to use it.Consider returning a mock oracle
+type mockOracle struct{} + +func (o *mockOracle) GetTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) { + return 1, nil +} + +// ... other required interface methods + func (m *mockStore) GetOracle() oracle.Oracle { - return nil + return &mockOracle{} }txnkv/transaction/txn.go (1)
660-660: Verify: TODO for pipelined DML shared lock handling.This TODO indicates that pipelined DML does not prewrite shared locks even when
flags.HasLockedInShareMode()is true. Ensure this is tracked and addressed if pipelined DML with shared locks becomes a requirement.Do you want me to open a new issue to track this task?
integration_tests/shared_lock_test.go (3)
43-60: Consider documenting the rationale for the specific TTL/backoff values.The setup modifies global state (
ManagedLockTTLto 3s,CommitMaxBackoffto 1s). While this is necessary for testing, a brief comment explaining why these specific values were chosen would improve maintainability.Add clarifying comments
func (s *testSharedLockSuite) SetupSuite() { + // Set shorter TTL for faster test execution while still allowing lock heartbeat validation atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // 3s + // Reduce backoff to speed up conflict resolution tests atomic.StoreUint64(&transaction.CommitMaxBackoff, 1000) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`)) }
84-134: Consider argument order ins.Equalfor clearer error messages.The testify convention is
s.Equal(expected, actual). Several assertions have the arguments reversed, which affects error message clarity when tests fail.Proposed fixes for argument order
- s.Equal(txn1.GetCommitter().GetPrimaryKey(), pk1) + s.Equal(pk1, txn1.GetCommitter().GetPrimaryKey())Apply similar changes to lines 102, 112, etc.
113-133: Test relies on timing which may cause flakiness.The test uses
time.Sleep(500 * time.Millisecond)to wait for the lock to be blocked, and then verifies timing withafterRelease.After(beforeRelease). In high-load CI environments, this could potentially be flaky.Consider using a synchronization mechanism to ensure txn3 has started waiting before checking the timing.
…master-shared-lock
Signed-off-by: you06 <you1474600@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@txnkv/transaction/pessimistic.go`:
- Around line 303-321: The code treats a non-nil but empty sharedLockInfos slice
as if shared locks exist and skips wrapper-lock handling; update the condition
around keyErr.GetLocked() / sharedLockInfos so you only enter the shared-lock
branch when sharedLockInfos has elements (e.g., check sharedLockInfos != nil &&
len(sharedLockInfos) > 0), leaving empty slices to fall through to the
wrapper-lock logic (affecting the lock construction via txnlock.NewLock and
respect of skipResolveThresholdMs).
cfzjywxk
left a comment
There was a problem hiding this comment.
Does TiKV return Locked.SharedLockInfos wrappers for Prewrite?
There are some implementation differences, some path like lock_resolver.go use ExtractLockFromKeyErr to handle errors which doesn’t expand SharedLockInfos.
| // to indicate the lock mode. If this flag is set, then flagKeyLocked must also be set, indicating that the key has | ||
| // been locked in share mode. If only flagKeyLocked is set (and flagKeyLockedInShareMode is not), then it indicates | ||
| // that the key has been locked in exclusive mode. | ||
| flagKeyLockedInShareMode |
There was a problem hiding this comment.
Should this be a persistent flag? MemDB staging keeps only AndPersistent() flags when reverting values, would the 2PC convert the shared lock to exclusive lock type unexpectedly?
|
|
||
| var _ oracle.Oracle = (*unimplementedOracle)(nil) | ||
|
|
||
| type unimplementedOracle struct{} |
There was a problem hiding this comment.
For test purpose, should they be put into "*_test.go" file or test directory?
There was a problem hiding this comment.
By now the unimplementedOracle is used in txnkv/tranasaction/txn_test.go only, but it's similar to those things in test_probe.go, providing the possibility of being imported by *_test.go files in other packages.
| locks = make([]*txnlock.Lock, 0, len(locksInfo)) | ||
| for _, lockInfo := range locksInfo { | ||
| if sharedLockInfos := lockInfo.GetSharedLockInfos(); sharedLockInfos != nil { | ||
| // expand shared lock into multiple locks, and drop the dummy wrapper lock. |
There was a problem hiding this comment.
After expanding, len(locks) no longer matches the server-side Limit, but ResolveLocksForRange uses
len(locks) < scanLimit to decide region completion or advance. This can cause extra rescans and confusing.
If this is already considered, we may leave a TODO here.
There was a problem hiding this comment.
Better to limit the exact number of locks in TiKV side, I'll open a PR to compatible with the limit parameter of ScanLock RPC.
Signed-off-by: you06 <you1474600@gmail.com> address comments Signed-off-by: you06 <you1474600@gmail.com>
Signed-off-by: you06 <you1474600@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@internal/locate/region_cache.go`:
- Around line 764-770: Change the comment above NewTestRegionCache to a proper
GoDoc that begins with the exported function name and fix the typo; e.g.,
replace the current "// only used fot test." with a GoDoc sentence like
"NewTestRegionCache returns a RegionCache configured for tests." so the comment
references NewTestRegionCache and describes its purpose concisely; keep the
function body (initializing RegionCache, c.codec = apicodec.NewCodecV1(...),
c.bg = newBackgroundRunner(...), c.mu = *newRegionIndexMu(nil)) unchanged.
| // only used fot test. | ||
| func newTestRegionCache() *RegionCache { | ||
| func NewTestRegionCache() *RegionCache { | ||
| c := &RegionCache{} | ||
| c.codec = apicodec.NewCodecV1(apicodec.ModeTxn) | ||
| c.bg = newBackgroundRunner(context.Background()) | ||
| c.mu = *newRegionIndexMu(nil) | ||
| return c |
There was a problem hiding this comment.
Doc comment should match exported name (and fix typo).
Exported functions should have GoDoc-style comments; also fix the typo.
💡 Suggested update
-// only used fot test.
+// NewTestRegionCache creates a RegionCache for tests only.
func NewTestRegionCache() *RegionCache {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // only used fot test. | |
| func newTestRegionCache() *RegionCache { | |
| func NewTestRegionCache() *RegionCache { | |
| c := &RegionCache{} | |
| c.codec = apicodec.NewCodecV1(apicodec.ModeTxn) | |
| c.bg = newBackgroundRunner(context.Background()) | |
| c.mu = *newRegionIndexMu(nil) | |
| return c | |
| // NewTestRegionCache creates a RegionCache for tests only. | |
| func NewTestRegionCache() *RegionCache { | |
| c := &RegionCache{} | |
| c.codec = apicodec.NewCodecV1(apicodec.ModeTxn) | |
| c.bg = newBackgroundRunner(context.Background()) | |
| c.mu = *newRegionIndexMu(nil) | |
| return c | |
| } |
🤖 Prompt for AI Agents
In `@internal/locate/region_cache.go` around lines 764 - 770, Change the comment
above NewTestRegionCache to a proper GoDoc that begins with the exported
function name and fix the typo; e.g., replace the current "// only used fot
test." with a GoDoc sentence like "NewTestRegionCache returns a RegionCache
configured for tests." so the comment references NewTestRegionCache and
describes its purpose concisely; keep the function body (initializing
RegionCache, c.codec = apicodec.NewCodecV1(...), c.bg =
newBackgroundRunner(...), c.mu = *newRegionIndexMu(nil)) unchanged.
Signed-off-by: you06 <you1474600@gmail.com>
…master-shared-lock
Signed-off-by: you06 <you1474600@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@integration_tests/shared_lock_test.go`:
- Around line 89-139: The test currently assumes txn3's exclusive LockKeys call
will return a conflict (NotNil) but uses a 1000ms lock wait while only sleeping
500ms before releasing share locks, making the expectation flaky; change the
goroutine to capture the error from txn3.LockKeys (callsite:
txn3.LockKeys(context.Background(), kv.NewLockCtx(...), key)) into a channel
instead of asserting NotNil inline, then after releasing txn1/txn2 read the
error from that channel and assert it is nil (i.e., exclusive lock succeeds
after release). Also ensure you create txn3's lock context with the same/new
kv.NewLockCtx so its wait behavior remains deterministic.
- Around line 141-192: TestExclusiveLockBlockSharedLock uses timing-sensitive
assertions; make goroutines explicitly signal when they begin attempting the
shared LockKeys so the main test only releases txn1 after both goroutines are
confirmed to be waiting. Add two "started" channels (e.g., txn2Started and
txn3Started) alongside txn2LockDone/txn3LockDone in
TestExclusiveLockBlockSharedLock, have each goroutine send to its started
channel immediately before calling txnX.LockKeys(...), then have the main thread
wait to receive both started signals before recording beforeRelease and
committing/rolling-back txn1; keep the existing reads from
txn2LockDone/txn3LockDone and the time-after assertions to verify they blocked
until txn1 released.
| func (s *testSharedLockSuite) TestSharedLockBlockExclusiveLock() { | ||
| for _, commit := range []bool{true, false} { | ||
| txn1 := s.begin() | ||
| txn2 := s.begin() | ||
| txn3 := s.begin() | ||
|
|
||
| pk1 := []byte("TestSharedLockBlockExclusiveLock_pk1") | ||
| pk2 := []byte("TestSharedLockBlockExclusiveLock_pk2") | ||
| pk3 := []byte("TestSharedLockBlockExclusiveLock_pk3") | ||
| key := []byte("TestSharedLockBlockExclusiveLock_shared_key") | ||
|
|
||
| s.Nil(txn1.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk1)) | ||
| s.Equal(txn1.GetCommitter().GetPrimaryKey(), pk1) | ||
| lockctx1 := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | ||
| lockctx1.InShareMode = true | ||
| s.Nil(txn1.LockKeys(context.Background(), lockctx1, key)) | ||
|
|
||
| s.Nil(txn2.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk2)) | ||
| s.Equal(txn2.GetCommitter().GetPrimaryKey(), pk2) | ||
| lockctx2 := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | ||
| lockctx2.InShareMode = true | ||
| s.Nil(txn2.LockKeys(context.Background(), lockctx2, key)) | ||
|
|
||
| flags, err := txn2.GetMemBuffer().GetFlags(key) | ||
| s.Nil(err) | ||
| s.True(flags.HasLockedInShareMode()) | ||
|
|
||
| s.Nil(txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk3)) | ||
| s.Equal(txn3.GetCommitter().GetPrimaryKey(), pk3) | ||
| lockDone := make(chan time.Time) | ||
| go func() { | ||
| s.NotNil(txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), key)) // should block and return conflict | ||
| lockDone <- time.Now() | ||
| }() | ||
|
|
||
| time.Sleep(500 * time.Millisecond) | ||
| beforeRelease := time.Now() | ||
|
|
||
| if commit { | ||
| s.Nil(txn1.Commit(context.Background())) | ||
| s.Nil(txn2.Commit(context.Background())) | ||
| } else { | ||
| s.Nil(txn1.Rollback()) | ||
| s.Nil(txn2.Rollback()) | ||
| } | ||
|
|
||
| afterRelease := <-lockDone | ||
| s.True(afterRelease.After(beforeRelease), "txn3(exclusive lock) should block until txn1(shared lock) and txn2(shared lock) commit") | ||
| s.Nil(txn3.Rollback()) | ||
| } | ||
| } |
There was a problem hiding this comment.
Potentially flaky expectation if the exclusive lock succeeds after release.
The lock wait time (1000ms) exceeds the 500ms delay before releasing shared locks, so the lock can legitimately succeed, making the NotNil assertion nondeterministic. Consider asserting success after release (or shortening the wait to force conflict).
💡 Suggested stabilization
- lockDone := make(chan time.Time)
+ lockDone := make(chan struct {
+ at time.Time
+ err error
+ })
go func() {
- s.NotNil(txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), key)) // should block and return conflict
- lockDone <- time.Now()
+ err := txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), key)
+ lockDone <- struct {
+ at time.Time
+ err error
+ }{time.Now(), err}
}()
@@
- afterRelease := <-lockDone
- s.True(afterRelease.After(beforeRelease), "txn3(exclusive lock) should block until txn1(shared lock) and txn2(shared lock) commit")
+ res := <-lockDone
+ s.True(res.at.After(beforeRelease), "txn3(exclusive lock) should block until txn1(shared lock) and txn2(shared lock) commit")
+ s.Nil(res.err)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (s *testSharedLockSuite) TestSharedLockBlockExclusiveLock() { | |
| for _, commit := range []bool{true, false} { | |
| txn1 := s.begin() | |
| txn2 := s.begin() | |
| txn3 := s.begin() | |
| pk1 := []byte("TestSharedLockBlockExclusiveLock_pk1") | |
| pk2 := []byte("TestSharedLockBlockExclusiveLock_pk2") | |
| pk3 := []byte("TestSharedLockBlockExclusiveLock_pk3") | |
| key := []byte("TestSharedLockBlockExclusiveLock_shared_key") | |
| s.Nil(txn1.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk1)) | |
| s.Equal(txn1.GetCommitter().GetPrimaryKey(), pk1) | |
| lockctx1 := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | |
| lockctx1.InShareMode = true | |
| s.Nil(txn1.LockKeys(context.Background(), lockctx1, key)) | |
| s.Nil(txn2.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk2)) | |
| s.Equal(txn2.GetCommitter().GetPrimaryKey(), pk2) | |
| lockctx2 := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | |
| lockctx2.InShareMode = true | |
| s.Nil(txn2.LockKeys(context.Background(), lockctx2, key)) | |
| flags, err := txn2.GetMemBuffer().GetFlags(key) | |
| s.Nil(err) | |
| s.True(flags.HasLockedInShareMode()) | |
| s.Nil(txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk3)) | |
| s.Equal(txn3.GetCommitter().GetPrimaryKey(), pk3) | |
| lockDone := make(chan time.Time) | |
| go func() { | |
| s.NotNil(txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), key)) // should block and return conflict | |
| lockDone <- time.Now() | |
| }() | |
| time.Sleep(500 * time.Millisecond) | |
| beforeRelease := time.Now() | |
| if commit { | |
| s.Nil(txn1.Commit(context.Background())) | |
| s.Nil(txn2.Commit(context.Background())) | |
| } else { | |
| s.Nil(txn1.Rollback()) | |
| s.Nil(txn2.Rollback()) | |
| } | |
| afterRelease := <-lockDone | |
| s.True(afterRelease.After(beforeRelease), "txn3(exclusive lock) should block until txn1(shared lock) and txn2(shared lock) commit") | |
| s.Nil(txn3.Rollback()) | |
| } | |
| } | |
| func (s *testSharedLockSuite) TestSharedLockBlockExclusiveLock() { | |
| for _, commit := range []bool{true, false} { | |
| txn1 := s.begin() | |
| txn2 := s.begin() | |
| txn3 := s.begin() | |
| pk1 := []byte("TestSharedLockBlockExclusiveLock_pk1") | |
| pk2 := []byte("TestSharedLockBlockExclusiveLock_pk2") | |
| pk3 := []byte("TestSharedLockBlockExclusiveLock_pk3") | |
| key := []byte("TestSharedLockBlockExclusiveLock_shared_key") | |
| s.Nil(txn1.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk1)) | |
| s.Equal(txn1.GetCommitter().GetPrimaryKey(), pk1) | |
| lockctx1 := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | |
| lockctx1.InShareMode = true | |
| s.Nil(txn1.LockKeys(context.Background(), lockctx1, key)) | |
| s.Nil(txn2.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk2)) | |
| s.Equal(txn2.GetCommitter().GetPrimaryKey(), pk2) | |
| lockctx2 := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | |
| lockctx2.InShareMode = true | |
| s.Nil(txn2.LockKeys(context.Background(), lockctx2, key)) | |
| flags, err := txn2.GetMemBuffer().GetFlags(key) | |
| s.Nil(err) | |
| s.True(flags.HasLockedInShareMode()) | |
| s.Nil(txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk3)) | |
| s.Equal(txn3.GetCommitter().GetPrimaryKey(), pk3) | |
| lockDone := make(chan struct { | |
| at time.Time | |
| err error | |
| }) | |
| go func() { | |
| err := txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), key) | |
| lockDone <- struct { | |
| at time.Time | |
| err error | |
| }{time.Now(), err} | |
| }() | |
| time.Sleep(500 * time.Millisecond) | |
| beforeRelease := time.Now() | |
| if commit { | |
| s.Nil(txn1.Commit(context.Background())) | |
| s.Nil(txn2.Commit(context.Background())) | |
| } else { | |
| s.Nil(txn1.Rollback()) | |
| s.Nil(txn2.Rollback()) | |
| } | |
| res := <-lockDone | |
| s.True(res.at.After(beforeRelease), "txn3(exclusive lock) should block until txn1(shared lock) and txn2(shared lock) commit") | |
| s.Nil(res.err) | |
| s.Nil(txn3.Rollback()) | |
| } | |
| } |
🤖 Prompt for AI Agents
In `@integration_tests/shared_lock_test.go` around lines 89 - 139, The test
currently assumes txn3's exclusive LockKeys call will return a conflict (NotNil)
but uses a 1000ms lock wait while only sleeping 500ms before releasing share
locks, making the expectation flaky; change the goroutine to capture the error
from txn3.LockKeys (callsite: txn3.LockKeys(context.Background(),
kv.NewLockCtx(...), key)) into a channel instead of asserting NotNil inline,
then after releasing txn1/txn2 read the error from that channel and assert it is
nil (i.e., exclusive lock succeeds after release). Also ensure you create txn3's
lock context with the same/new kv.NewLockCtx so its wait behavior remains
deterministic.
| func (s *testSharedLockSuite) TestExclusiveLockBlockSharedLock() { | ||
| for _, commit := range []bool{true, false} { | ||
| txn1 := s.begin() | ||
| txn2 := s.begin() | ||
| txn3 := s.begin() | ||
|
|
||
| pk1 := []byte("TestExclusiveLockBlockSharedLock_pk1") | ||
| pk2 := []byte("TestExclusiveLockBlockSharedLock_pk2") | ||
| pk3 := []byte("TestExclusiveLockBlockSharedLock_pk3") | ||
| key := []byte("TestExclusiveLockBlockSharedLock_shared_key") | ||
|
|
||
| s.Nil(txn1.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk1)) | ||
| s.Equal(txn1.GetCommitter().GetPrimaryKey(), pk1) | ||
| s.Nil(txn1.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), key)) | ||
|
|
||
| s.Nil(txn2.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk2)) | ||
| s.Equal(txn2.GetCommitter().GetPrimaryKey(), pk2) | ||
| s.Nil(txn3.LockKeys(context.Background(), kv.NewLockCtx(s.getTS(), 1000, time.Now()), pk3)) | ||
| s.Equal(txn3.GetCommitter().GetPrimaryKey(), pk3) | ||
|
|
||
| txn2LockDone := make(chan time.Time) | ||
| go func() { | ||
| lockctx := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | ||
| lockctx.InShareMode = true | ||
| s.NotNil(txn2.LockKeys(context.Background(), lockctx, key)) // should block and return conflict | ||
| txn2LockDone <- time.Now() | ||
| }() | ||
| txn3LockDone := make(chan time.Time) | ||
| go func() { | ||
| lockctx := kv.NewLockCtx(s.getTS(), 1000, time.Now()) | ||
| lockctx.InShareMode = true | ||
| s.NotNil(txn3.LockKeys(context.Background(), lockctx, key)) // should block and return conflict | ||
| txn3LockDone <- time.Now() | ||
| }() | ||
|
|
||
| time.Sleep(500 * time.Millisecond) | ||
| beforeRelease := time.Now() | ||
|
|
||
| if commit { | ||
| s.Nil(txn1.Commit(context.Background())) | ||
| } else { | ||
| s.Nil(txn1.Rollback()) | ||
| } | ||
|
|
||
| txn2Locked := <-txn2LockDone | ||
| txn3Locked := <-txn3LockDone | ||
| s.True(txn2Locked.After(beforeRelease), "txn2(shared lock) should block until txn1(exclusive lock) commit/rollback") | ||
| s.True(txn3Locked.After(beforeRelease), "txn3(shared lock) should block until txn1(exclusive lock) commit/rollback") | ||
| s.Nil(txn2.Rollback()) | ||
| s.Nil(txn3.Rollback()) | ||
| } | ||
| } |
There was a problem hiding this comment.
Same timing-sensitive assumption for shared-lock acquisition.
These assertions can also be nondeterministic if the exclusive lock releases before the wait timeout. Consider the same stabilization as above.
💡 Suggested stabilization
- txn2LockDone := make(chan time.Time)
+ txn2LockDone := make(chan struct {
+ at time.Time
+ err error
+ })
go func() {
lockctx := kv.NewLockCtx(s.getTS(), 1000, time.Now())
lockctx.InShareMode = true
- s.NotNil(txn2.LockKeys(context.Background(), lockctx, key)) // should block and return conflict
- txn2LockDone <- time.Now()
+ err := txn2.LockKeys(context.Background(), lockctx, key)
+ txn2LockDone <- struct {
+ at time.Time
+ err error
+ }{time.Now(), err}
}()
- txn3LockDone := make(chan time.Time)
+ txn3LockDone := make(chan struct {
+ at time.Time
+ err error
+ })
go func() {
lockctx := kv.NewLockCtx(s.getTS(), 1000, time.Now())
lockctx.InShareMode = true
- s.NotNil(txn3.LockKeys(context.Background(), lockctx, key)) // should block and return conflict
- txn3LockDone <- time.Now()
+ err := txn3.LockKeys(context.Background(), lockctx, key)
+ txn3LockDone <- struct {
+ at time.Time
+ err error
+ }{time.Now(), err}
}()
@@
- txn2Locked := <-txn2LockDone
- txn3Locked := <-txn3LockDone
- s.True(txn2Locked.After(beforeRelease), "txn2(shared lock) should block until txn1(exclusive lock) commit/rollback")
- s.True(txn3Locked.After(beforeRelease), "txn3(shared lock) should block until txn1(exclusive lock) commit/rollback")
+ res2 := <-txn2LockDone
+ res3 := <-txn3LockDone
+ s.True(res2.at.After(beforeRelease), "txn2(shared lock) should block until txn1(exclusive lock) commit/rollback")
+ s.True(res3.at.After(beforeRelease), "txn3(shared lock) should block until txn1(exclusive lock) commit/rollback")
+ s.Nil(res2.err)
+ s.Nil(res3.err)🤖 Prompt for AI Agents
In `@integration_tests/shared_lock_test.go` around lines 141 - 192,
TestExclusiveLockBlockSharedLock uses timing-sensitive assertions; make
goroutines explicitly signal when they begin attempting the shared LockKeys so
the main test only releases txn1 after both goroutines are confirmed to be
waiting. Add two "started" channels (e.g., txn2Started and txn3Started)
alongside txn2LockDone/txn3LockDone in TestExclusiveLockBlockSharedLock, have
each goroutine send to its started channel immediately before calling
txnX.LockKeys(...), then have the main thread wait to receive both started
signals before recording beforeRelease and committing/rolling-back txn1; keep
the existing reads from txn2LockDone/txn3LockDone and the time-after assertions
to verify they blocked until txn1 released.
Signed-off-by: you06 <you1474600@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@integration_tests/shared_lock_test.go`:
- Around line 386-395: The closure passed to s.Eventually contains a Testify
assertion call (s.LessOrEqual(len(locks), currLocks)) which will panic on
failure; change the closure to only compute and return a boolean (e.g., compare
len(locks) to currLocks and return the desired true/false), and move the Testify
assertion (using s.LessOrEqual or s.Equal) outside the Eventually call after it
returns so failures produce proper test errors; update the code around
scanLocks(key, s.getTS()) / s.Eventually(...) to perform pure boolean checks
inside the closure and assert results with s.LessOrEqual or s.Equal afterward.
🧹 Nitpick comments (1)
integration_tests/shared_lock_test.go (1)
72-83: Minor inconsistency in assertion helpers.
begin()usess.Require().Nil(err)which fails immediately on error, whilegetTS()usess.Nil(err)which continues execution. Consider usings.Require().Nil(err)consistently in helper methods to fail fast on setup errors.
|
|
||
| currLocks := 3 - i | ||
| s.Eventually(func() bool { | ||
| locks = s.scanLocks(key, s.getTS()) | ||
| s.LessOrEqual(len(locks), currLocks) | ||
| if len(locks) == currLocks { | ||
| return false | ||
| } | ||
| return len(locks) == currLocks-1 | ||
| }, 5*time.Second, 100*time.Millisecond, "after txn %d commit/rollback, expect %d locks remain", i+1, currLocks-1) |
There was a problem hiding this comment.
Assertion inside Eventually callback can cause panics.
The s.LessOrEqual(len(locks), currLocks) assertion inside the Eventually callback (line 390) will panic if it fails, potentially causing the test to fail with an unhelpful stack trace rather than the intended error message. Testify's Eventually expects the callback to return a boolean, not to assert.
Consider moving the assertion outside or returning false when the condition isn't met:
💡 Suggested fix
s.Eventually(func() bool {
locks = s.scanLocks(key, s.getTS())
- s.LessOrEqual(len(locks), currLocks)
- if len(locks) == currLocks {
- return false
+ if len(locks) > currLocks {
+ return false // unexpected state, keep waiting
}
return len(locks) == currLocks-1
}, 5*time.Second, 100*time.Millisecond, "after txn %d commit/rollback, expect %d locks remain", i+1, currLocks-1)
+s.LessOrEqual(len(locks), currLocks-1, "too many locks remain after txn %d commit/rollback", i+1)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| currLocks := 3 - i | |
| s.Eventually(func() bool { | |
| locks = s.scanLocks(key, s.getTS()) | |
| s.LessOrEqual(len(locks), currLocks) | |
| if len(locks) == currLocks { | |
| return false | |
| } | |
| return len(locks) == currLocks-1 | |
| }, 5*time.Second, 100*time.Millisecond, "after txn %d commit/rollback, expect %d locks remain", i+1, currLocks-1) | |
| currLocks := 3 - i | |
| s.Eventually(func() bool { | |
| locks = s.scanLocks(key, s.getTS()) | |
| if len(locks) > currLocks { | |
| return false // unexpected state, keep waiting | |
| } | |
| return len(locks) == currLocks-1 | |
| }, 5*time.Second, 100*time.Millisecond, "after txn %d commit/rollback, expect %d locks remain", i+1, currLocks-1) | |
| s.LessOrEqual(len(locks), currLocks-1, "too many locks remain after txn %d commit/rollback", i+1) |
🤖 Prompt for AI Agents
In `@integration_tests/shared_lock_test.go` around lines 386 - 395, The closure
passed to s.Eventually contains a Testify assertion call
(s.LessOrEqual(len(locks), currLocks)) which will panic on failure; change the
closure to only compute and return a boolean (e.g., compare len(locks) to
currLocks and return the desired true/false), and move the Testify assertion
(using s.LessOrEqual or s.Equal) outside the Eventually call after it returns so
failures produce proper test errors; update the code around scanLocks(key,
s.getTS()) / s.Eventually(...) to perform pure boolean checks inside the closure
and assert results with s.LessOrEqual or s.Equal afterward.

ref tikv/tikv#19087
Support shared lock for client-go with some limitations:
Summary by CodeRabbit
New Features
Behavior Changes
Metrics
Tests