Skip to content

Commit 9f45fc5

Browse files
authored
Merge pull request #1796 from cloudwego/release-v0.14.0
chore: release v0.14.0
2 parents 7dbbaca + 62d66db commit 9f45fc5

File tree

88 files changed

+2563
-1507
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+2563
-1507
lines changed

.codecov.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
1+
github_checks:
2+
annotations: false
3+
14
ignore:
25
- "tool/**/*.go"

.github/workflows/tests.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ on: [ push, pull_request ]
44

55
jobs:
66
unit-scenario-test:
7+
strategy:
8+
matrix:
9+
go: [ "1.18", "1.24" ]
710
runs-on: [ Linux, X64 ]
811
steps:
912
- uses: actions/checkout@v4
1013
- name: Set up Go
1114
uses: actions/setup-go@v5
1215
with:
13-
go-version: "1.24"
16+
go-version: ${{ matrix.go }}
1417
cache: false
1518
- name: Scenario Tests
1619
run: |

CONTRIBUTING.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Before you submit your Pull Request (PR) consider the following guidelines:
2929
4. [Fork](https://docs.github.com/en/github/getting-started-with-github/fork-a-repo) the cloudwego/kitex repo.
3030
5. In your forked repository, make your changes in a new git branch:
3131
```
32-
git checkout -b my-fix-branch develop
32+
git checkout -b my-fix-branch main
3333
```
3434
6. Create your patch, including appropriate test cases. Please refer to [Go-UT](https://pkg.go.dev/testing#pkg-overview) for writing guides. [Go-Mock](https://github.com/golang/mock) is recommended to mock interface, please refer to internal/mocks/readme.md for more details, and [Mockey](https://github.com/bytedance/mockey) is recommended to mock functions, please refer to its readme doc for specific usage.
3535
7. Follow our [Style Guides](#code-style-guides).
@@ -39,7 +39,7 @@ Before you submit your Pull Request (PR) consider the following guidelines:
3939
```
4040
git push origin my-fix-branch
4141
```
42-
10. In GitHub, send a pull request to `kitex:develop` with a clear and unambiguous title.
42+
10. In GitHub, send a pull request to `kitex:main` with a clear and unambiguous title.
4343
4444
## Contribution Prerequisites
4545
- Our development environment keeps up with [Go Official](https://golang.org/project/).

client/genericclient/client.go

Lines changed: 182 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ package genericclient
2020
import (
2121
"context"
2222
"runtime"
23+
"sync"
2324

2425
"github.com/cloudwego/kitex/client"
2526
"github.com/cloudwego/kitex/client/callopt"
27+
"github.com/cloudwego/kitex/client/callopt/streamcall"
2628
"github.com/cloudwego/kitex/pkg/generic"
2729
"github.com/cloudwego/kitex/pkg/serviceinfo"
30+
"github.com/cloudwego/kitex/pkg/streaming"
31+
"github.com/cloudwego/kitex/transport"
2832
)
2933

3034
var _ Client = &genericServiceClient{}
@@ -40,25 +44,40 @@ func NewClientWithServiceInfo(destService string, g generic.Generic, svcInfo *se
4044
var options []client.Option
4145
options = append(options, client.WithGeneric(g))
4246
options = append(options, client.WithDestService(destService))
47+
options = append(options, client.WithTransportProtocol(transport.TTHeaderStreaming))
4348
options = append(options, opts...)
4449

4550
kc, err := client.NewClient(svcInfo, options...)
4651
if err != nil {
4752
return nil, err
4853
}
54+
var mp *sync.Map
55+
if !generic.HasIDLInfo(g) {
56+
mp = &sync.Map{}
57+
}
4958
cli := &genericServiceClient{
5059
svcInfo: svcInfo,
5160
kClient: kc,
61+
sClient: kc.(client.Streaming),
5262
g: g,
63+
modeMap: mp,
5364
}
5465
runtime.SetFinalizer(cli, (*genericServiceClient).Close)
5566

5667
svcInfo.GenericMethod = func(name string) serviceinfo.MethodInfo {
57-
m := svcInfo.Methods[serviceinfo.GenericMethod]
68+
key := serviceinfo.GenericMethod
69+
if mp != nil {
70+
if mode, ok := mp.Load(name); ok {
71+
key = getGenericStreamingMethodInfoKey(mode.(serviceinfo.StreamingMode))
72+
}
73+
return svcInfo.Methods[key]
74+
}
5875
n, err := g.GetMethod(nil, name)
5976
if err != nil {
60-
return m
77+
return svcInfo.Methods[key]
6178
}
79+
key = getGenericStreamingMethodInfoKey(n.StreamingMode)
80+
m := svcInfo.Methods[key]
6281
return &methodInfo{
6382
MethodInfo: m,
6483
oneway: n.Oneway,
@@ -84,12 +103,23 @@ type Client interface {
84103

85104
// GenericCall generic call
86105
GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error)
106+
// ClientStreaming creates an implementation of ClientStreamingClient
107+
ClientStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (ClientStreamingClient, error)
108+
// ServerStreaming creates an implementation of ServerStreamingClient
109+
ServerStreaming(ctx context.Context, method string, req interface{}, callOptions ...streamcall.Option) (ServerStreamingClient, error)
110+
// BidirectionalStreaming creates an implementation of BidiStreamingClient
111+
BidirectionalStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (BidiStreamingClient, error)
87112
}
88113

89114
type genericServiceClient struct {
90115
svcInfo *serviceinfo.ServiceInfo
91116
kClient client.Client
117+
sClient client.Streaming
92118
g generic.Generic
119+
// modeMap stores the streaming mode of methods for binary generic
120+
// because the streaming mode of a method is not stored in binary generic which doesn't have IDL info
121+
// but we can know it when creating different streaming clients
122+
modeMap *sync.Map // map[string]serviceinfo.StreamingMode
93123
}
94124

95125
func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error) {
@@ -121,3 +151,153 @@ func (gc *genericServiceClient) Close() error {
121151
// Notice: don't need to close kClient because finalizer will close it.
122152
return gc.g.Close()
123153
}
154+
155+
func (gc *genericServiceClient) ClientStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (ClientStreamingClient, error) {
156+
ctx = client.NewCtxWithCallOptions(ctx, streamcall.GetCallOptions(callOptions))
157+
if gc.modeMap != nil {
158+
gc.modeMap.LoadOrStore(method, serviceinfo.StreamingClient)
159+
}
160+
st, err := gc.sClient.StreamX(ctx, method)
161+
if err != nil {
162+
return nil, err
163+
}
164+
return newClientStreamingClient(gc.svcInfo.MethodInfo(method), method, st), nil
165+
}
166+
167+
func (gc *genericServiceClient) ServerStreaming(ctx context.Context, method string, req interface{}, callOptions ...streamcall.Option) (ServerStreamingClient, error) {
168+
ctx = client.NewCtxWithCallOptions(ctx, streamcall.GetCallOptions(callOptions))
169+
if gc.modeMap != nil {
170+
gc.modeMap.LoadOrStore(method, serviceinfo.StreamingServer)
171+
}
172+
st, err := gc.sClient.StreamX(ctx, method)
173+
if err != nil {
174+
return nil, err
175+
}
176+
stream := newServerStreamingClient(gc.svcInfo.MethodInfo(method), method, st).(*serverStreamingClient)
177+
178+
args := stream.methodInfo.NewArgs().(*generic.Args)
179+
args.Method = stream.method
180+
args.Request = req
181+
if err := st.SendMsg(ctx, args); err != nil {
182+
return nil, err
183+
}
184+
if err := stream.CloseSend(ctx); err != nil {
185+
return nil, err
186+
}
187+
return stream, nil
188+
}
189+
190+
func (gc *genericServiceClient) BidirectionalStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (BidiStreamingClient, error) {
191+
ctx = client.NewCtxWithCallOptions(ctx, streamcall.GetCallOptions(callOptions))
192+
if gc.modeMap != nil {
193+
gc.modeMap.LoadOrStore(method, serviceinfo.StreamingBidirectional)
194+
}
195+
st, err := gc.sClient.StreamX(ctx, method)
196+
if err != nil {
197+
return nil, err
198+
}
199+
return newBidiStreamingClient(gc.svcInfo.MethodInfo(method), method, st), nil
200+
}
201+
202+
// ClientStreamingClient define client side generic client streaming APIs
203+
type ClientStreamingClient interface {
204+
Send(ctx context.Context, req interface{}) error
205+
CloseAndRecv(ctx context.Context) (interface{}, error)
206+
streaming.ClientStream
207+
}
208+
209+
type clientStreamingClient struct {
210+
methodInfo serviceinfo.MethodInfo
211+
method string
212+
streaming.ClientStream
213+
}
214+
215+
func newClientStreamingClient(methodInfo serviceinfo.MethodInfo, method string, st streaming.ClientStream) ClientStreamingClient {
216+
return &clientStreamingClient{
217+
methodInfo: methodInfo,
218+
method: method,
219+
ClientStream: st,
220+
}
221+
}
222+
223+
func (c *clientStreamingClient) Send(ctx context.Context, req interface{}) error {
224+
args := c.methodInfo.NewArgs().(*generic.Args)
225+
args.Method = c.method
226+
args.Request = req
227+
return c.ClientStream.SendMsg(ctx, args)
228+
}
229+
230+
func (c *clientStreamingClient) CloseAndRecv(ctx context.Context) (interface{}, error) {
231+
if err := c.ClientStream.CloseSend(ctx); err != nil {
232+
return nil, err
233+
}
234+
res := c.methodInfo.NewResult().(*generic.Result)
235+
if err := c.ClientStream.RecvMsg(ctx, res); err != nil {
236+
return nil, err
237+
}
238+
return res.GetSuccess(), nil
239+
}
240+
241+
// ServerStreamingClient define client side generic server streaming APIs
242+
type ServerStreamingClient interface {
243+
Recv(ctx context.Context) (interface{}, error)
244+
streaming.ClientStream
245+
}
246+
247+
type serverStreamingClient struct {
248+
methodInfo serviceinfo.MethodInfo
249+
method string
250+
streaming.ClientStream
251+
}
252+
253+
func newServerStreamingClient(methodInfo serviceinfo.MethodInfo, method string, st streaming.ClientStream) ServerStreamingClient {
254+
return &serverStreamingClient{
255+
methodInfo: methodInfo,
256+
method: method,
257+
ClientStream: st,
258+
}
259+
}
260+
261+
func (c *serverStreamingClient) Recv(ctx context.Context) (interface{}, error) {
262+
res := c.methodInfo.NewResult().(*generic.Result)
263+
if err := c.ClientStream.RecvMsg(ctx, res); err != nil {
264+
return nil, err
265+
}
266+
return res.GetSuccess(), nil
267+
}
268+
269+
// BidiStreamingClient define client side generic bidirectional streaming APIs
270+
type BidiStreamingClient interface {
271+
Send(ctx context.Context, req interface{}) error
272+
Recv(ctx context.Context) (interface{}, error)
273+
streaming.ClientStream
274+
}
275+
276+
type bidiStreamingClient struct {
277+
methodInfo serviceinfo.MethodInfo
278+
method string
279+
streaming.ClientStream
280+
}
281+
282+
func newBidiStreamingClient(methodInfo serviceinfo.MethodInfo, method string, st streaming.ClientStream) BidiStreamingClient {
283+
return &bidiStreamingClient{
284+
methodInfo: methodInfo,
285+
method: method,
286+
ClientStream: st,
287+
}
288+
}
289+
290+
func (c *bidiStreamingClient) Send(ctx context.Context, req interface{}) error {
291+
args := c.methodInfo.NewArgs().(*generic.Args)
292+
args.Method = c.method
293+
args.Request = req
294+
return c.ClientStream.SendMsg(ctx, args)
295+
}
296+
297+
func (c *bidiStreamingClient) Recv(ctx context.Context) (interface{}, error) {
298+
res := c.methodInfo.NewResult().(*generic.Result)
299+
if err := c.ClientStream.RecvMsg(ctx, res); err != nil {
300+
return nil, err
301+
}
302+
return res.GetSuccess(), nil
303+
}

client/genericclient/generic_stream_service.go

Lines changed: 2 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -21,91 +21,7 @@ import (
2121
"github.com/cloudwego/kitex/pkg/serviceinfo"
2222
)
2323

24+
// Deprecated, use generic.ServiceInfoWithGeneric instead
2425
func StreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
25-
return newClientStreamingServiceInfo(g)
26-
}
27-
28-
func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
29-
if g.PayloadCodec() != nil {
30-
// TODO: support grpc binary generic
31-
panic("binary generic streaming is not supported")
32-
}
33-
34-
methods := map[string]serviceinfo.MethodInfo{
35-
serviceinfo.GenericClientStreamingMethod: serviceinfo.NewMethodInfo(
36-
nil,
37-
func() interface{} {
38-
args := &generic.Args{}
39-
args.SetCodec(g.MessageReaderWriter())
40-
return args
41-
},
42-
func() interface{} {
43-
result := &generic.Result{}
44-
result.SetCodec(g.MessageReaderWriter())
45-
return result
46-
},
47-
false,
48-
serviceinfo.WithStreamingMode(serviceinfo.StreamingClient),
49-
),
50-
serviceinfo.GenericServerStreamingMethod: serviceinfo.NewMethodInfo(
51-
nil,
52-
func() interface{} {
53-
args := &generic.Args{}
54-
args.SetCodec(g.MessageReaderWriter())
55-
return args
56-
},
57-
func() interface{} {
58-
result := &generic.Result{}
59-
result.SetCodec(g.MessageReaderWriter())
60-
return result
61-
},
62-
false,
63-
serviceinfo.WithStreamingMode(serviceinfo.StreamingServer),
64-
),
65-
serviceinfo.GenericBidirectionalStreamingMethod: serviceinfo.NewMethodInfo(
66-
nil,
67-
func() interface{} {
68-
args := &generic.Args{}
69-
args.SetCodec(g.MessageReaderWriter())
70-
return args
71-
},
72-
func() interface{} {
73-
result := &generic.Result{}
74-
result.SetCodec(g.MessageReaderWriter())
75-
return result
76-
},
77-
false,
78-
serviceinfo.WithStreamingMode(serviceinfo.StreamingBidirectional),
79-
),
80-
serviceinfo.GenericMethod: serviceinfo.NewMethodInfo(
81-
nil,
82-
func() interface{} {
83-
args := &generic.Args{}
84-
args.SetCodec(g.MessageReaderWriter())
85-
return args
86-
},
87-
func() interface{} {
88-
result := &generic.Result{}
89-
result.SetCodec(g.MessageReaderWriter())
90-
return result
91-
},
92-
false,
93-
),
94-
}
95-
svcInfo := &serviceinfo.ServiceInfo{
96-
ServiceName: g.IDLServiceName(),
97-
Methods: methods,
98-
PayloadCodec: g.PayloadCodecType(),
99-
Extra: make(map[string]interface{}),
100-
}
101-
svcInfo.Extra["generic"] = true
102-
if extra, ok := g.(generic.ExtraProvider); ok {
103-
if extra.GetExtra(generic.CombineServiceKey) == "true" {
104-
svcInfo.Extra["combine_service"] = true
105-
}
106-
if pkg := extra.GetExtra("PackageName"); pkg != "" {
107-
svcInfo.Extra["PackageName"] = pkg
108-
}
109-
}
110-
return svcInfo
26+
return generic.ServiceInfoWithGeneric(g)
11127
}

0 commit comments

Comments
 (0)