-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathserver.go
More file actions
131 lines (116 loc) · 3.61 KB
/
server.go
File metadata and controls
131 lines (116 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package airport
import (
"fmt"
"log/slog"
"os"
"github.com/apache/arrow-go/v18/arrow/memory"
"google.golang.org/grpc"
"github.com/hugr-lab/airport-go/auth"
"github.com/hugr-lab/airport-go/flight"
)
// NewServer registers the Airport Flight service handlers on the provided gRPC server.
// This is the main entry point for the airport package.
//
// The function:
//  1. Validates the ServerConfig
//  2. Creates the Flight service implementation
//  3. Registers it on grpcServer
//
// It returns an error wrapping ErrInvalidConfig if the config is invalid
// (e.g., nil Catalog), and an error if grpcServer is nil.
// It does NOT start the gRPC server - the caller controls the lifecycle via
// grpcServer.Serve().
//
// Optional fields fall back to defaults: a nil Allocator is replaced by
// memory.DefaultAllocator, and a nil Logger is replaced by a text logger that
// writes to os.Stderr at config.LogLevel (slog.LevelInfo when LogLevel is nil).
// config.LogLevel is only consulted when config.Logger is nil.
//
// NewServer only reports config.Auth and config.MaxMessageSize in its log
// line; it does not install them. For authentication, use ServerOptions() to
// create a gRPC server with auth interceptors:
//
//	opts := airport.ServerOptions(airport.ServerConfig{
//	    Auth: airport.BearerAuth(validateToken),
//	})
//	grpcServer := grpc.NewServer(opts...)
//	err := airport.NewServer(grpcServer, config)
//
// Basic example without authentication:
//
//	grpcServer := grpc.NewServer()
//	err := airport.NewServer(grpcServer, airport.ServerConfig{
//	    Catalog: myCatalog,
//	})
//	if err != nil {
//	    log.Fatal(err)
//	}
//	lis, _ := net.Listen("tcp", ":50051")
//	grpcServer.Serve(lis)
func NewServer(grpcServer *grpc.Server, config ServerConfig) error {
	// Validate configuration first so that config errors keep taking
	// precedence over every other failure mode.
	if err := validateConfig(config); err != nil {
		return fmt.Errorf("%w: %v", ErrInvalidConfig, err)
	}

	// A nil server cannot have a service registered on it; report that as an
	// error here instead of letting registration fail on a nil pointer.
	if grpcServer == nil {
		return fmt.Errorf("grpc server is required")
	}

	// Use defaults for optional fields
	allocator := config.Allocator
	if allocator == nil {
		allocator = memory.DefaultAllocator
	}

	logger := config.Logger
	if logger == nil {
		// Create logger with specified level or default to Info
		level := slog.LevelInfo
		if config.LogLevel != nil {
			level = *config.LogLevel
		}
		handler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
			Level: level,
		})
		logger = slog.New(handler)
	}

	// Create Flight server with optional transaction manager
	var flightServer *flight.Server
	if config.TransactionManager != nil {
		flightServer = flight.NewServerWithTxManager(config.Catalog, allocator, logger, config.Address, config.TransactionManager)
	} else {
		flightServer = flight.NewServer(config.Catalog, allocator, logger, config.Address)
	}

	// Register Flight service
	flight.RegisterFlightServer(grpcServer, flightServer)

	// Log successful registration. has_auth and max_message_size only echo the
	// config values; they take effect through ServerOptions, not here.
	logger.Info("Airport Flight server registered",
		"has_auth", config.Auth != nil,
		"has_tx_manager", config.TransactionManager != nil,
		"max_message_size", config.MaxMessageSize,
	)
	return nil
}
// validateConfig reports whether the required ServerConfig fields are set.
// The only requirement checked is a non-nil Catalog; it returns nil when the
// config is acceptable and a descriptive error otherwise.
func validateConfig(config ServerConfig) error {
	if config.Catalog != nil {
		return nil
	}
	return fmt.Errorf("catalog is required")
}
// ServerOptions builds the gRPC server options implied by config.
// Use it when creating the gRPC server so that authentication and message
// size limits from the config are applied.
//
// When config.Auth is non-nil, a unary and a stream auth interceptor are
// added. When config.MaxMessageSize is greater than zero, it is used as both
// the maximum receive and maximum send message size. If neither is set, the
// returned slice is nil, which is still valid to pass to grpc.NewServer.
//
// Example:
//
//	config := airport.ServerConfig{
//	    Catalog: catalog,
//	    Auth:    airport.BearerAuth(validateToken),
//	}
//	opts := airport.ServerOptions(config)
//	grpcServer := grpc.NewServer(opts...)
//	airport.NewServer(grpcServer, config)
func ServerOptions(config ServerConfig) []grpc.ServerOption {
	var opts []grpc.ServerOption

	// Authentication: one interceptor for unary RPCs, one for streams.
	if authenticator := config.Auth; authenticator != nil {
		unary := grpc.UnaryInterceptor(auth.UnaryServerInterceptor(authenticator))
		stream := grpc.StreamInterceptor(auth.StreamServerInterceptor(authenticator))
		opts = append(opts, unary, stream)
	}

	// Message size: the same limit applies in both directions.
	if limit := config.MaxMessageSize; limit > 0 {
		opts = append(opts, grpc.MaxRecvMsgSize(limit), grpc.MaxSendMsgSize(limit))
	}

	return opts
}