Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Getty 是一个使用 Golang 开发的异步网络 I/O 库。它适用于 TCP、

如果您使用 WebSocket,您无需担心心跳请求/响应,因为 Getty 在 session.go 的 (Session)handleLoop 方法内通过发送和接收 WebSocket ping/pong 帧来处理此任务。您只需在 codec.go 的 (Codec)OnCron 方法内使用 session.go 的 (Session)GetActive 方法检查 WebSocket 会话是否已超时。

有关代码示例,请参阅 https://github.com/AlexStocks/getty-examples
有关代码示例,请参阅 https://github.com/AlexStocks/getty-examples

## 关于 Getty 中的网络传输

Expand Down
99 changes: 99 additions & 0 deletions transport/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package getty

// callbackCommon 表示回调链表中的一个节点
// 每个节点包含处理器标识、键值、回调函数和指向下一个节点的指针
type callbackCommon struct {
handler interface{} // 处理器标识,用于标识回调的来源或类型
key interface{} // 回调的唯一标识键,与 handler 组合使用
call func() // 实际要执行的回调函数
next *callbackCommon // 指向下一个节点的指针,形成链表结构
}

// callbacks 是一个单向链表结构,用于管理多个回调函数
// 支持动态添加、移除和执行回调
type callbacks struct {
first *callbackCommon // 指向链表第一个节点的指针
last *callbackCommon // 指向链表最后一个节点的指针,用于快速添加新节点
}

// Add 向回调链表中添加一个新的回调函数
// 参数说明:
// - handler: 处理器标识,可以是任意类型
// - key: 回调的唯一标识键,与 handler 组合使用
// - callback: 要执行的回调函数,如果为 nil 则忽略
func (t *callbacks) Add(handler, key interface{}, callback func()) {
// 防止添加空回调函数
if callback == nil {
return
}

// 创建新的回调节点
newItem := &callbackCommon{handler, key, callback, nil}

if t.first == nil {
// 如果链表为空,新节点成为第一个节点
t.first = newItem
} else {
// 否则将新节点添加到链表末尾
t.last.next = newItem
}
// 更新最后一个节点的指针
t.last = newItem
}

// Remove 从回调链表中移除指定的回调函数
// 参数说明:
// - handler: 要移除的回调的处理器标识
// - key: 要移除的回调的唯一标识键
// 注意: 如果找不到匹配的回调,此方法不会产生任何效果
func (t *callbacks) Remove(handler, key interface{}) {
var prev *callbackCommon

// 遍历链表查找要移除的节点
for callback := t.first; callback != nil; prev, callback = callback, callback.next {
// 找到匹配的节点
if callback.handler == handler && callback.key == key {
if t.first == callback {
// 如果是第一个节点,更新 first 指针
t.first = callback.next
} else if prev != nil {
// 如果是中间节点,更新前一个节点的 next 指针
prev.next = callback.next
}

if t.last == callback {
// 如果是最后一个节点,更新 last 指针
t.last = prev
}

// 找到并移除后立即返回
return
}
}
}

// Invoke 执行链表中所有注册的回调函数
// 按照添加的顺序依次执行每个回调
// 注意: 如果某个回调函数为 nil,会被跳过
func (t *callbacks) Invoke() {
// 从头节点开始遍历整个链表
for callback := t.first; callback != nil; callback = callback.next {
// 确保回调函数不为 nil 再执行
if callback.call != nil {
callback.call()
}
}
}

// Count 返回链表中回调函数的数量
// 返回值: 当前注册的回调函数总数
func (t *callbacks) Count() int {
var count int

// 遍历链表计数
for callback := t.first; callback != nil; callback = callback.next {
count++
}

return count
}
92 changes: 92 additions & 0 deletions transport/callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package getty

import (
"testing"
)

func TestCallback(t *testing.T) {
// Test empty list
cb := &callbacks{}
if cb.Count() != 0 {
t.Errorf("Expected count for empty list is 0, but got %d", cb.Count())
}

// Test adding callback functions
var count, expected, remove, totalCount int
totalCount = 10
remove = 5

// Add multiple callback functions
for i := 1; i < totalCount; i++ {
expected = expected + i
func(ii int) {
cb.Add(ii, ii, func() { count = count + ii })
}(i)
}

// Verify count after adding
expectedCallbacks := totalCount - 1
if cb.Count() != expectedCallbacks {
t.Errorf("Expected callback count is %d, but got %d", expectedCallbacks, cb.Count())
}

// Test adding nil callback
cb.Add(remove, remove, nil)
if cb.Count() != expectedCallbacks {
t.Errorf("Expected count after adding nil callback is %d, but got %d", expectedCallbacks, cb.Count())
}

// Remove specified callback
cb.Remove(remove, remove)

// Try to remove non-existent callback
cb.Remove(remove+1, remove+2)

// Execute all callbacks
cb.Invoke()

// Verify execution result
expectedCount := expected - remove
if count != expectedCount {
t.Errorf("Expected execution result is %d, but got %d", expectedCount, count)
}

// Test string type handler and key
cb2 := &callbacks{}

// Add callbacks
cb2.Add("handler1", "key1", func() {})
cb2.Add("handler2", "key2", func() {})
cb2.Add("handler3", "key3", func() {})

if cb2.Count() != 3 {
t.Errorf("Expected callback count is 3, but got %d", cb2.Count())
}

// Remove middle callback
cb2.Remove("handler2", "key2")
if cb2.Count() != 2 {
t.Errorf("Expected count after removing middle callback is 2, but got %d", cb2.Count())
}

// Remove first callback
cb2.Remove("handler1", "key1")
if cb2.Count() != 1 {
t.Errorf("Expected count after removing first callback is 1, but got %d", cb2.Count())
}

// Remove last callback
cb2.Remove("handler3", "key3")
if cb2.Count() != 0 {
t.Errorf("Expected count after removing last callback is 0, but got %d", cb2.Count())
}

// Test removing non-existent callback
cb2.Add("handler1", "key1", func() {})
cb2.Remove("handler2", "key2") // Try to remove non-existent callback

// Should still have 1 callback
if cb2.Count() != 1 {
t.Errorf("Expected callback count is 1, but got %d", cb2.Count())
}
}
17 changes: 17 additions & 0 deletions transport/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type Session interface {
WriteBytes([]byte) (int, error)
WriteBytesArray(...[]byte) (int, error)
Close()

AddCloseCallback(handler, key any, callback CallBackFunc)
RemoveCloseCallback(handler, key any)
}

// getty base session
Expand Down Expand Up @@ -135,6 +138,10 @@ type session struct {
grNum uatomic.Int32
lock sync.RWMutex
packetLock sync.RWMutex

// callbacks
closeCallback callbacks
closeCallbackMutex sync.RWMutex
}

func newSession(endPoint EndPoint, conn Connection) *session {
Expand Down Expand Up @@ -868,6 +875,16 @@ func (s *session) stop() {
}
}
close(s.done)

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("invokeCloseCallbacks panic: %v", r)
}
}()
s.invokeCloseCallbacks()
}()

clt, cltFound := s.GetAttribute(sessionClientKey).(*client)
ignoreReconnect, flagFound := s.GetAttribute(ignoreReconnectKey).(bool)
if cltFound && flagFound && !ignoreReconnect {
Expand Down
68 changes: 68 additions & 0 deletions transport/session_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package getty

// AddCloseCallback adds a close callback function to the Session
//
// Parameters:
// - handler: handler identifier, used to identify the source or type of the callback
// - key: unique identifier key for the callback, used in combination with handler
// - f: callback function to be executed when the session is closed
//
// Notes:
// - If the session is already closed, this addition will be ignored
// - The combination of handler and key must be unique, otherwise it will override previous callbacks
// - Callback functions will be executed in the order they were added when the session closes
func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) {
if s.IsClosed() {
return
}
s.closeCallbackMutex.Lock()
s.closeCallback.Add(handler, key, f)
s.closeCallbackMutex.Unlock()
}

// RemoveCloseCallback removes the specified Session close callback function
//
// Parameters:
// - handler: handler identifier of the callback to be removed
// - key: unique identifier key of the callback to be removed
//
// Return value: none
//
// Notes:
// - If the session is already closed, this removal operation will be ignored
// - If no matching callback is found, this operation will have no effect
// - The removal operation is thread-safe
func (s *session) RemoveCloseCallback(handler, key any) {
if s.IsClosed() {
return
}
s.closeCallbackMutex.Lock()
s.closeCallback.Remove(handler, key)
s.closeCallbackMutex.Unlock()
}

// invokeCloseCallbacks executes all registered close callback functions
//
// Function description:
// - Executes all registered close callbacks in the order they were added
// - Uses read lock to protect the callback list, ensuring concurrency safety
// - This method is typically called automatically when the session closes
//
// Notes:
// - This is an internal method, not recommended for external direct calls
// - If panic occurs during callback execution, it will be caught and logged
// - Callback functions should avoid long blocking operations, async processing is recommended for time-consuming tasks
func (s *session) invokeCloseCallbacks() {
s.closeCallbackMutex.RLock()
s.closeCallback.Invoke()
s.closeCallbackMutex.RUnlock()
}

// CallBackFunc defines the callback function type when Session closes
//
// Usage notes:
// - Callback function accepts no parameters
// - Callback function returns no values
// - Callback function should handle resource cleanup, state updates, etc.
// - It's recommended to avoid accessing closed session state in callback functions
type CallBackFunc func()
Loading
Loading