Sorry for the lengthy reproduction (it may be possible to make it smaller and more controlled).

What I do:

- Install bitnami/cassandra into my local k8s (I use kind):

```
helm install --namespace cassandra --create-namespace cassandra bitnami/cassandra --set "replicaCount=6"
```

- Expose it to connect from localhost:

```
kubectl expose service cassandra --type=NodePort --name=cassandra-external --namespace=cassandra
```

- Get the host for the connection:

```
docker inspect kind-control-plane | grep IPAddress
```

- Get the port for the connection:

```
kubectl get services --namespace=cassandra | grep external
```

- Get the password:

```
kubectl get secret --namespace "cassandra" cassandra -o jsonpath="{.data.cassandra-password}" | base64 -d
```

- Run this code:
```go
package main

import (
    "fmt"
    "math/rand/v2"
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/gocql/gocql"
)

const workers = 100
const queries = 10_000
const readWorkers = 100
const readQueries = 10_000_000

// helm install --namespace cassandra --create-namespace cassandra bitnami/cassandra --set "replicaCount=6"
// kubectl expose service cassandra --type=NodePort --name=cassandra-external --namespace=cassandra
func main() {
    // host from: docker inspect kind-control-plane | grep IPAddress
    // port from: kubectl get services --namespace=cassandra | grep external
    cluster := gocql.NewCluster("172.18.0.2:32146")
    cluster.Authenticator = gocql.PasswordAuthenticator{
        Username: "cassandra",
        // from: kubectl get secret --namespace "cassandra" cassandra -o jsonpath="{.data.cassandra-password}" | base64 -d
        Password: "kMDfXIiu5M",
    }
    session, err := cluster.CreateSession()
    if err != nil {
        panic(err)
    }

    execRelease(session.Query("drop keyspace if exists k8stest"))
    execRelease(session.Query("create keyspace k8stest with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3}"))
    execRelease(session.Query("drop table if exists k8stest.test"))
    execRelease(session.Query("create table k8stest.test (a int, b text, primary key(a))"))

    // Each insert worker writes its own contiguous range of ids.
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            query := session.Query("insert into k8stest.test (a, b) values (?,?)")
            for j := i * queries; j < (i+1)*queries; j++ {
                query.Bind(j, "Message"+strconv.Itoa(j))
                if err := query.Exec(); err != nil {
                    panic(err)
                }
            }
            query.Release()
        }()
    }
    wg.Wait()

    // Read workers query random ids in a loop, counting successes and errors.
    var scans uint64
    var errors uint64
    var mu sync.RWMutex
    errorsCount := make(map[string]uint64)
    for i := 0; i < readWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            query := session.Query("select b from k8stest.test where a=?")
            for j := i * readQueries; j < (i+1)*readQueries; j++ {
                id := rand.IntN(queries * workers)
                query.Bind(id)
                iter := query.Iter()
                var val string
                if iter.Scan(&val) {
                    if val != "Message"+strconv.Itoa(id) {
                        panic("unexpected message " + val + " instead of Message" + strconv.Itoa(id))
                    }
                    atomic.AddUint64(&scans, 1)
                } else {
                    atomic.AddUint64(&errors, 1)
                    time.Sleep(time.Millisecond)
                }
                if err := iter.Close(); err != nil {
                    mu.Lock()
                    errorsCount[err.Error()]++
                    mu.Unlock()
                    // Recreate the query after an error, in case it is tied to a dead connection.
                    query.Release()
                    query = session.Query("select b from k8stest.test where a=?")
                }
            }
            query.Release()
        }()
    }

    // Print progress once per second.
    go func() {
        for {
            time.Sleep(time.Second)
            fmt.Printf("##### %d scans, %d errors\n", atomic.LoadUint64(&scans), atomic.LoadUint64(&errors))
            mu.RLock()
            for err, count := range errorsCount {
                fmt.Printf("error %s count %d\n", err, count)
            }
            mu.RUnlock()
        }
    }()
    wg.Wait()
}

func execRelease(query *gocql.Query) {
    if err := query.Exec(); err != nil {
        println(err.Error())
        panic(err)
    }
    query.Release()
}
```
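To build and run the reproduction (assuming Go modules; the module name below is arbitrary):

```
go mod init gocql-repro
go get github.com/gocql/gocql
go run main.go
```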
- After reading starts, the output looks like this:

```
##### 4494 scans, 0 errors
##### 11271 scans, 0 errors
##### 17814 scans, 0 errors
##### 23093 scans, 0 errors
...
```

- I remove one pod from the StatefulSet.
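For the record, the removal was roughly this (the pod name is illustrative; any pod reproduces it):

```
kubectl delete pod cassandra-5 --namespace=cassandra
```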
- I get a handful of errors and reading continues:

```
##### 373219 scans, 685 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
```
- However, after the Cassandra node rejoins the ring, the session breaks and I receive only errors:

```
##### 546274 scans, 685 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
##### 553261 scans, 685 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
##### 556674 scans, 41433 errors
error gocql: connection closed waiting for response count 101
error gocql: no hosts available in the pool count 40547
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
##### 556674 scans, 121771 errors
error gocql: connection closed waiting for response count 101
error gocql: no hosts available in the pool count 120903
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
...
##### 556674 scans, 10370862 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
error gocql: connection closed waiting for response count 101
error gocql: no hosts available in the pool count 10369976
```
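One way to confirm that the node actually rejoined the ring is to check from inside a pod (a sketch; the pod name is illustrative):

```
kubectl exec --namespace=cassandra cassandra-0 -- nodetool status
```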
I expect the gocql.Session to continue serving queries. The reaction seems to be the same for removing (and waiting for the recreation of) any node in the cluster.
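In case it helps triage, here is a minimal sketch of the reconnection-related knobs I am aware of (these are existing gocql.ClusterConfig fields, but the values are arbitrary and I have not confirmed they avoid the problem):

```go
cluster := gocql.NewCluster("172.18.0.2:32146")

// Periodically retry hosts marked DOWN instead of leaving them out of the pool.
cluster.ReconnectInterval = 10 * time.Second
cluster.ReconnectionPolicy = &gocql.ConstantReconnectionPolicy{
    MaxRetries: 10,
    Interval:   time.Second,
}

// With a single NodePort in front of the cluster, discovered peer addresses are
// pod IPs that may be unreachable from outside k8s; this pins the driver to the
// contact point (at the cost of token-aware routing).
cluster.DisableInitialHostLookup = true
```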