diff --git a/go.mod b/go.mod index adf39e8..dedcd9e 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module go.linecorp.com/garr require github.com/valyala/fastrand v1.1.0 + +require ( + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 +) diff --git a/go.sum b/go.sum index c5ca588..f7c61ae 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,6 @@ +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= diff --git a/http-client/README.md b/http-client/README.md new file mode 100644 index 0000000..526ad50 --- /dev/null +++ b/http-client/README.md @@ -0,0 +1 @@ +# http client diff --git a/http-client/actions.go b/http-client/actions.go new file mode 100644 index 0000000..3f34d25 --- /dev/null +++ b/http-client/actions.go @@ -0,0 +1,47 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import "net/http" + +// EndpointAction specifies in which cases a request should be passed +// to the next endpoint or retrying on current endpoint. +// +// Similar to: http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_next_upstream +type EndpointAction byte + +const ( + // None indicates that there is no need for taking any actions. + None EndpointAction = iota + + // NextEndpoint indicates that client should retry request on next endpoint. + NextEndpoint + + // Retrying indicates that client could retry request on same endpoint. + Retrying +) + +// OnStatus5xx is builtin decider which judge on 5xx status code. +func OnStatus5xx(statusCode int, _ http.Header) (action EndpointAction) { + switch statusCode { + case http.StatusBadGateway: + action = Retrying + case http.StatusInternalServerError, http.StatusServiceUnavailable: + action = NextEndpoint + default: + action = None + } + return +} diff --git a/http-client/actions_test.go b/http-client/actions_test.go new file mode 100644 index 0000000..eeb3943 --- /dev/null +++ b/http-client/actions_test.go @@ -0,0 +1,38 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net/http" + "testing" +) + +func TestOnStatus5xx(t *testing.T) { + if action := OnStatus5xx(http.StatusBadGateway, nil); action != Retrying { + t.FailNow() + } + + if action := OnStatus5xx(http.StatusInternalServerError, nil); action != NextEndpoint { + t.FailNow() + } + + if action := OnStatus5xx(http.StatusServiceUnavailable, nil); action != NextEndpoint { + t.FailNow() + } + + if action := OnStatus5xx(http.StatusOK, nil); action != None { + t.FailNow() + } +} diff --git a/http-client/balancer.go b/http-client/balancer.go new file mode 100644 index 0000000..f9a471f --- /dev/null +++ b/http-client/balancer.go @@ -0,0 +1,36 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +// LB is load balancer for endpoints. +type LB interface { + // Initialize endpoints for LB. + Initialize(endpoints Endpoints) + + // Endpoints returned saved/ordered endpoints of LB. + Endpoints() Endpoints + + // Pick returned index of picked endpoint. + Pick() (index int) +} + +// LBBuilder is builder for LB. +type LBBuilder interface { + Build() LB +} + +func defaultLBBuilder() LBBuilder { + return &RoundRobinLBBuilder{} +} diff --git a/http-client/balancer_pf.go b/http-client/balancer_pf.go new file mode 100644 index 0000000..28752ad --- /dev/null +++ b/http-client/balancer_pf.go @@ -0,0 +1,43 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +// PickfirstLB is pickfirst load balancer. +type PickfirstLB struct { + endpoints Endpoints +} + +// Initialize endpoints for PickfirstLB. +func (p *PickfirstLB) Initialize(endpoints Endpoints) { + p.endpoints = endpoints +} + +// Endpoints returned saved endpoints inside PickfirstLB +func (p *PickfirstLB) Endpoints() Endpoints { + return p.endpoints +} + +// Pick returns index of picked endpoint. +func (p *PickfirstLB) Pick() int { + return 0 +} + +// PickfirstLBBuilder is builder for Pickfirst load-balancer +type PickfirstLBBuilder struct{} + +// Build pickfirst balancer. +func (p *PickfirstLBBuilder) Build() LB { + return &PickfirstLB{} +} diff --git a/http-client/balancer_pf_test.go b/http-client/balancer_pf_test.go new file mode 100644 index 0000000..b02bfa0 --- /dev/null +++ b/http-client/balancer_pf_test.go @@ -0,0 +1,78 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "testing" + "time" +) + +func TestPickFirst(t *testing.T) { + builder := &PickfirstLBBuilder{} + + lb := builder.Build() + lb.Initialize(validEndpoints()) + + endpoints := lb.Endpoints() + valid := validEndpoints() + if len(endpoints) != len(valid) { + t.FailNow() + } + for i := range endpoints { + if !endpoints[i].Equal(valid[i]) { + t.FailNow() + } + } + if lb.Pick()+lb.Pick()+lb.Pick() != 0 { + t.FailNow() + } +} + +func TestPickFirstRace(t *testing.T) { + builder := PickfirstLBBuilder{} + + lb := builder.Build() + lb.Initialize(validEndpoints()) + + type counter struct { + v [3]int + } + ch := make(chan counter, 5) + for i := 0; i < 5; i++ { + go func() { + time.Sleep(200 * time.Millisecond) + + var counting counter + for j := 0; j < 18000; j++ { + counting.v[lb.Pick()]++ + } + + ch <- counting + }() + } + + var sum counter + for i := 0; i < 5; i++ { + v := <-ch + sum.v[0] += v.v[0] + sum.v[1] += v.v[1] + sum.v[2] += v.v[2] + } + + // 90000 = 18000 * 5 + if sum.v[0] != 90000 || sum.v[1] != 0 || sum.v[2] != 0 { + t.FailNow() + } +} diff --git a/http-client/balancer_rr.go b/http-client/balancer_rr.go new file mode 100644 index 0000000..67092c2 --- /dev/null +++ b/http-client/balancer_rr.go @@ -0,0 +1,55 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "sync/atomic" + + "github.com/valyala/fastrand" +) + +// RoundRobinLB is round robin strategy. +type RoundRobinLB struct { + endpoints Endpoints + index uint32 + n uint32 +} + +// Initialize endpoints for RoundRobinLB. +func (p *RoundRobinLB) Initialize(endpoints Endpoints) { + p.endpoints = endpoints + p.n = uint32(len(endpoints)) +} + +// Endpoints returned saved endpoints inside RoundRobinLB +func (p *RoundRobinLB) Endpoints() Endpoints { + return p.endpoints +} + +// Pick returns index of picked endpoint. +func (p *RoundRobinLB) Pick() (chosen int) { + if p.n > 0 { + chosen = int(atomic.AddUint32(&p.index, 1) % p.n) + } + return +} + +// RoundRobinLBBuilder is builder for RoundRobin load-balancer +type RoundRobinLBBuilder struct{} + +// Build round robin balancer. +func (p *RoundRobinLBBuilder) Build() LB { + return &RoundRobinLB{index: fastrand.Uint32()} +} diff --git a/http-client/balancer_rr_test.go b/http-client/balancer_rr_test.go new file mode 100644 index 0000000..a30fc8e --- /dev/null +++ b/http-client/balancer_rr_test.go @@ -0,0 +1,78 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "testing" + "time" +) + +func TestRoundRobin(t *testing.T) { + builder := defaultLBBuilder() + + lb := builder.Build() + lb.Initialize(validEndpoints()) + + endpoints := lb.Endpoints() + valid := validEndpoints() + if len(endpoints) != len(valid) { + t.FailNow() + } + for i := range endpoints { + if !endpoints[i].Equal(valid[i]) { + t.FailNow() + } + } + if lb.Pick()+lb.Pick()+lb.Pick() != 3 { + t.FailNow() + } +} + +func TestRoundRobinRace(t *testing.T) { + builder := defaultLBBuilder() + + lb := builder.Build() + lb.Initialize(validEndpoints()) + + type counter struct { + v [3]int + } + ch := make(chan counter, 5) + for i := 0; i < 7; i++ { + go func() { + time.Sleep(200 * time.Millisecond) + + var counting counter + for j := 0; j < 18000; j++ { + counting.v[lb.Pick()]++ + } + + ch <- counting + }() + } + + var sum counter + for i := 0; i < 7; i++ { + v := <-ch + sum.v[0] += v.v[0] + sum.v[1] += v.v[1] + sum.v[2] += v.v[2] + } + + // 42000 = 18000 / 3 * 7 + if sum.v[0] != 42000 || sum.v[1] != 42000 || sum.v[2] != 42000 { + t.FailNow() + } +} diff --git a/http-client/client.go b/http-client/client.go new file mode 100644 index 0000000..eaaddab --- /dev/null +++ b/http-client/client.go @@ -0,0 +1,152 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net/http" + "sync/atomic" + "time" + + cb "go.linecorp.com/garr/circuit-breaker" + "go.linecorp.com/garr/retry" +) + +// Client instruments http.Client with feature rich. +type Client struct { + // native http client + c *http.Client + + // resolvers + resolvers []Resolver + healthChecker Resolver + chain resolverChain + chainReady int32 + + // load balancer + lbBuilder LBBuilder + lb atomic.Value // LB + + // circuit breaker builder + cbBuilder *cb.CircuitBreakerBuilder + + // retry-backoff strategy/algorithm + backoff retry.Backoff +} + +// NewClient returns new Client with user defined options. +// +// Initialization timeout specifies wait duration for client to resolve endpoint(s) +// through resolver(s). +func NewClient(initializationTimeout time.Duration, endpoints Endpoints, opts ...ClientOption) (c *Client, err error) { + // create new client + c = &Client{ + c: &http.Client{}, + } + + for i := range opts { + opts[i](c) + } + + // validate endpoint(s) + if len(endpoints) > 0 { + err = endpoints.normalize() + } else if len(c.resolvers) == 0 { + err = ErrNoEndpoints + } + + if err == nil { + // if not defined -> using default circuit breaker builder + if c.cbBuilder == nil { + c.cbBuilder = defaultCircuitBreakerBuilder() + } + + // try to build one breaker to test setting(s) + _, err = c.cbBuilder.Build() + } + + if err == nil { + // check balancer builder + // if not define -> using RoundRobin + if c.lbBuilder == nil { + c.lbBuilder = defaultLBBuilder() + } + + // if not defined -> using default backoff + if c.backoff == nil { + c.backoff = defaultBackoff() + } + + // if not defined -> using default health checker + if c.healthChecker == nil { + c.healthChecker = defaultHealthChecker() + } + + // initialize chain of resolver(s) + c.resolvers = append(c.resolvers, c.healthChecker, c) + c.chain = newResolverChain(c.resolvers) + + // warm up chain if need + if len(endpoints) > 0 { + c.chain.push(endpoints) + } + + // get first resolved endpoint(s) through chain + c.chain.wait(initializationTimeout) + } + + return +} + +// Close stops client and underlying daemons. +func (c *Client) Close() (err error) { + c.chain.close() + return +} + +// Resolve endpoints and push to next resolver in chain. +func (c *Client) Resolve(in <-chan Endpoints, out chan<- Endpoints) { + var currentEndpoints Endpoints + for endpoints := range in { + if len(endpoints) > 0 { + // create new LB + lb := c.lbBuilder.Build() + lb.Initialize(endpoints) + + // compare with current endpoints/LB + if endpoints = lb.Endpoints(); !endpoints.Equal(currentEndpoints) { + // there are changes -> assign + currentEndpoints = endpoints + + // inject new circuit breaker + for i := range endpoints { + endpoints[i].setupCB(c.cbBuilder) + } + + // update LB + c.lb.Store(lb) + + // notify chain ready + if atomic.CompareAndSwapInt32(&c.chainReady, 0, 1) { + out <- nil + } + } + } + } +} + +func (c *Client) loadLB() LB { + lb, _ := c.lb.Load().(LB) + return lb +} diff --git a/http-client/client_do.go b/http-client/client_do.go new file mode 100644 index 0000000..a8257f0 --- /dev/null +++ b/http-client/client_do.go @@ -0,0 +1,203 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/hashicorp/go-multierror" +) + +// Do sends a request and returns a response, following +// policy (such as redirects, cookies, auth) as configured on the +// client. +func (c *Client) Do(req *Request) (resp *Response) { + resp = &Response{} + + lb := c.loadLB() + if lb == nil { + resp.err = ErrNoEndpoints + return + } + + endpoints := lb.Endpoints() + if len(endpoints) == 0 { + resp.err = ErrNoEndpoints + return + } + + // executing request + picked := lb.Pick() + need := c.instrument(endpoints[picked], req, resp) + + // we need some more acts, retry or try on next endpoint? + if need != None && len(endpoints) > 1 { + c.judge(need, endpoints, picked, req, resp) + } + + return +} + +func (c *Client) judge(need EndpointAction, endpoints Endpoints, pickedEndpoint int, req *Request, resp *Response) { + var ( + n = len(endpoints) + ind = pickedEndpoint + lastErr *multierror.Error + err error + retryCount int + nextDelayMillis int64 + ) + +loop: + switch need { + case Retrying: + // inc retry counter + retryCount++ + + if nextDelayMillis = c.backoff.NextDelayMillis(retryCount); nextDelayMillis >= 0 { + // wait before retrying + if nextDelayMillis > 0 { + time.Sleep(time.Duration(nextDelayMillis) * time.Millisecond) + } + + // reset response + resp.reset() + + // retrying + need = c.instrument(endpoints[ind], req, resp) + + goto loop + } else if resp.err == nil { + resp.err = fmt.Errorf("Retried host:[%v] url:[%v] but failed. Attempts so far: %d", endpoints[ind].URL.Host, req.r.URL, retryCount) + } + + case NextEndpoint: + // reset retry count + retryCount = 0 + + if ind++; ind == n { + ind = 0 + } + + if ind != pickedEndpoint { + // recording last error + if resp.err != nil { + lastErr = multierror.Append(lastErr, resp.err) + } + + // reset response + resp.reset() + + // try on next endpoint + need = c.instrument(endpoints[ind], req, resp) + + goto loop + } else { // loop all over but still failed + + // aggregating errors + if lastErr != nil { + err = lastErr.ErrorOrNil() + } + + // build up final error + if err != nil { + resp.err = fmt.Errorf("Retrying request on all endpoints but failed. Last error: %v", err) + } else { + resp.err = ErrEndpointsUnavailable + } + } + } +} + +func (c *Client) instrument(endpoint *Endpoint, req *Request, resp *Response) (action EndpointAction) { + // check if endpoint could make request + if endpoint.canRequest() { + originalURL := injectTarget(endpoint, req) + action = c.exec(endpoint, req, resp) + revert(req, originalURL) + } else { + // notify that we need to try on next endpoint + action = NextEndpoint + } + return +} + +func (c *Client) exec(endpoint *Endpoint, req *Request, resp *Response) (action EndpointAction) { + // mark instrumented request that response belongs to + resp.req = req + + // mark as noop (default) + action = None + + // execute request + if _resp, err := c.c.Do(req.r); err == nil { + // report connect success to CB + endpoint.onConnectSuccess() + + // mark original response for later draining + originalResponseBody := _resp.Body + + // verify with header-judger + if req.onRespHeader != nil { + if action = req.onRespHeader(_resp.StatusCode, _resp.Header); action != None { + drainAndClose(originalResponseBody) + return + } + } + + // do transformation(s) + for i := range req.transforms { + if _resp, err = req.transforms[i].Transform(_resp); err != nil { + resp.err = transformError(req.r.URL, err) + drainAndClose(originalResponseBody) + return + } + } + resp._resp = _resp + + // do decode + if err = decode(req, _resp); err == nil { // optimistic branching + resp.data = req.expect + } else { + resp.err = decodingError(req.r.URL, err) + } + + // drain up and close request body for connection reusing + drainAndClose(originalResponseBody) + } else { + switch { + case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled): + resp.err = requestCtxCanceledOrTimeout(req.r.URL, err) + if req.onRequestCtxCanceledOrTimeout == nil { + action = None + } else { + action = req.onRequestCtxCanceledOrTimeout() + } + + default: + // report connect failure to CB + endpoint.onConnectFailure() + + // there is something wrong with the connection, should retry on next endpoint + resp.err = connectionError(req.r.URL, err) + action = NextEndpoint + } + } + + return +} diff --git a/http-client/client_options.go b/http-client/client_options.go new file mode 100644 index 0000000..5262415 --- /dev/null +++ b/http-client/client_options.go @@ -0,0 +1,181 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net/http" + "time" + + cb "go.linecorp.com/garr/circuit-breaker" + "go.linecorp.com/garr/retry" +) + +// ClientOption is setter for client's option(s). +type ClientOption func(c *Client) + +// CheckRedirect specifies the policy for handling redirects. +// If CheckRedirect is not nil, the client calls it before +// following an HTTP redirect. The arguments req and via are +// the upcoming request and the requests made already, oldest +// first. If CheckRedirect returns an error, the Client's Get +// method returns both the previous Response (with its Body +// closed) and CheckRedirect's error (wrapped in a url.Error) +// instead of issuing the Request req. +// As a special case, if CheckRedirect returns ErrUseLastResponse, +// then the most recent response is returned with its body +// unclosed, along with a nil error. +// +// If CheckRedirect is nil, the Client uses its default policy, +// which is to stop after 10 consecutive requests. +// CheckRedirect func(req *Request, via []*Request) error + +// WithTransport attaches transport with client. +// +// Transport specifies the mechanism by which individual +// HTTP requests are made. +// If nil, DefaultTransport is used. +func WithTransport(t http.RoundTripper) ClientOption { + return func(c *Client) { + c.c.Transport = t + } +} + +// WithTimeout specifies a time limit for requests made by this +// Client. The timeout includes connection time, any +// redirects, and reading the response body. The timer remains +// running after Get, Head, Post, or Do return and will +// interrupt reading of the response body. +func WithTimeout(timeout time.Duration) ClientOption { + return func(c *Client) { + c.c.Timeout = timeout + } +} + +// WithCookieJar specifies the cookie jar. +// +// The Jar is used to insert relevant cookies into every +// outbound Request and is updated with the cookie values +// of every inbound Response. The Jar is consulted for every +// redirect that the Client follows. +// +// If Jar is nil, cookies are only sent if they are explicitly +// set on the Request. +func WithCookieJar(jar http.CookieJar) ClientOption { + return func(c *Client) { + c.c.Jar = jar + } +} + +// WithCheckRedirect specifies the policy for handling redirects. +// +// If checkRedirect is not nil, the client calls it before +// following an HTTP redirect. The arguments req and via are +// the upcoming request and the requests made already, oldest +// first. If checkRedirect returns an error, the Client's Get +// method returns both the previous Response (with its Body +// closed) and checkRedirect's error (wrapped in a url.Error) +// instead of issuing the Request req. +// +// As a special case, if checkRedirect returns http.ErrUseLastResponse, +// then the most recent response is returned with its body +// unclosed, along with a nil error. +// +// If checkRedirect is nil, the Client uses its default policy, +// which is to stop after 10 consecutive requests. +func WithCheckRedirect(checkRedirect func(req *http.Request, via []*http.Request) error) ClientOption { + return func(c *Client) { + c.c.CheckRedirect = checkRedirect + } +} + +// WithResolver appends endpoints resolver to chain. +// +// Chain of resolvers are used to resolve endpoints' url. +// It's disabled when endpoint factory presents. +// +// Chain respects the order of ClientOption arguments. +// Thus, if client is built with: +// +// client := NewClient(WithResolver(r3), WithResolver(r1), WithResolver(r2)) +// // order of resolving will be +// r3 -> r1 -> r2 +func WithResolver(r Resolver) ClientOption { + return func(c *Client) { + c.resolvers = append(c.resolvers, r) + } +} + +// WithResolvers appends endpoints resolvers to chain. +func WithResolvers(r []Resolver) ClientOption { + return func(c *Client) { + c.resolvers = append(c.resolvers, r...) + } +} + +// WithHealthChecker attachs health check resolver. Interval indicates +// the duration between each check and timeout indicates for tcp dial timeout. +// +// Default: +// - interval: 500 milllis +// - timeout: 100 millis +func WithHealthChecker(interval, timeout time.Duration) ClientOption { + return func(c *Client) { + if c.healthChecker != nil { + c.healthChecker.(*healthChecker).stopWorkers() + } + c.healthChecker = newHealthChecker(interval, timeout) + } +} + +// WithLoadBalanceBuilder specifies load balance builder which is +// used to generate LB on demand. +// +// Default: PickfirstLB +func WithLoadBalanceBuilder(builder LBBuilder) ClientOption { + return func(c *Client) { + c.lbBuilder = builder + } +} + +// WithCircuitBreakerBuilder specifies circuit breaker builder which +// is used to generate CB on deman. +// +// Default using circuit breaker with settings: +// FailureRateThreshold = 0.8 +// MinimumRequestThreshold = 10 +// TrialRequestInterval = time.Duration(3 * time.Second) +// CircuitOpenWindow = time.Duration(10 * time.Second) +// CounterSlidingWindow = time.Duration(20 * time.Second) +// CounterUpdateInterval = time.Duration(1 * time.Second) +func WithCircuitBreakerBuilder(builder *cb.CircuitBreakerBuilder) ClientOption { + return func(c *Client) { + c.cbBuilder = builder + } +} + +// WithBackoff specifies retry-backoff. +// +// Default: +// exponential: +// - initialDelayMillis: 50 +// - maxDelayMillis: 5000 (5 seconds) +// - multipiler: 1.15 +// jitter: 0.1 +// limit: 3 (try 3 times) +func WithBackoff(b retry.Backoff) ClientOption { + return func(c *Client) { + c.backoff = b + } +} diff --git a/http-client/client_test.go b/http-client/client_test.go new file mode 100644 index 0000000..007f062 --- /dev/null +++ b/http-client/client_test.go @@ -0,0 +1,328 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "context" + "fmt" + "net/http" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +const ( + numReq = 10000 + concurrency = 8 + initTimeout = 500 * time.Millisecond + idleConnPerHost = 20 +) + +func init() { + runtime.GOMAXPROCS(numCPU << 3) +} + +func TestClientWithInvalidParams(t *testing.T) { + _, err := NewClient(100, nil) + if err == nil { + t.FailNow() + } + + client := &Client{} + resp := client.Do(nil) + if resp.err != ErrNoEndpoints { + t.FailNow() + } + + client = &Client{} + client.lb.Store(defaultLBBuilder().Build()) + resp = client.Do(nil) + if resp.err != ErrNoEndpoints { + t.FailNow() + } +} + +func TestClient(t *testing.T) { + testClient(t, func(endpoints Endpoints) (*Client, error) { + return NewClient(initTimeout, endpoints, + WithTransport(&http.Transport{ + MaxIdleConnsPerHost: idleConnPerHost, + }), + ) + }, doRequests) +} + +func TestClientWithFailedServer(t *testing.T) { + testClient(t, func(endpoints Endpoints) (*Client, error) { + return NewClient(initTimeout, endpoints, + WithTransport(&http.Transport{ + MaxIdleConnsPerHost: idleConnPerHost, + }), + ) + }, doRequestsWithServerFailure) +} + +func TestClientWithFailedServers(t *testing.T) { + testClient(t, func(endpoints Endpoints) (*Client, error) { + return NewClient(initTimeout, endpoints, + WithTransport(&http.Transport{ + MaxIdleConnsPerHost: idleConnPerHost, + }), + WithHealthChecker(2*time.Second, 100*time.Millisecond), + ) + }, doRequestsWithAllServersFailure) +} + +func TestClientWithRequestTimeout(t *testing.T) { + testClient(t, func(endpoints Endpoints) (*Client, error) { + return NewClient(initTimeout, endpoints, + WithTransport(&http.Transport{ + MaxIdleConnsPerHost: idleConnPerHost, + }), + ) + }, doRequestsWithTimeout) + + testClient(t, func(endpoints Endpoints) (*Client, error) { + return NewClient(initTimeout, endpoints, + WithTransport(&http.Transport{ + MaxIdleConnsPerHost: idleConnPerHost, + }), + ) + }, doRequestsWithTimeoutAndMockAction) +} + +func testClient(t *testing.T, f func(Endpoints) (*Client, error), exec func([]*http.Server, *Client)) { + addresses, servers := newServers(3) + defer stopServers(servers) + + // setup endpoints + endpoints, err := ParseFromURLs(addresses) + if err != nil { + t.FailNow() + } + + client, err := f(endpoints) + if err != nil { + t.FailNow() + } + if client == nil { + t.FailNow() + } + + start := time.Now() + exec(servers, client) + t.Log("Execution time:", time.Since(start).Seconds()) + + err = client.Close() + if err != nil { + t.FailNow() + } +} + +func doRequests(servers []*http.Server, client *Client) { + var wg sync.WaitGroup + wg.Add(concurrency) + + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for j := 0; j < numReq; j++ { + req, err := http.NewRequest(http.MethodGet, "/json", nil) + if err != nil { + panic(err) + } + + expect := make(map[string]string) + r := NewRequest(req, WithDecoder(JSON), WithExpect(&expect)) + + resp := client.Do(r) + if resp.StatusCode() != http.StatusOK { + panic(resp.Error()) + } + } + }() + } + wg.Wait() +} + +func doRequestsWithServerFailure(servers []*http.Server, client *Client) { + go func() { + time.Sleep(time.Second) + stopServer(servers[0]) + }() + + var wg sync.WaitGroup + wg.Add(concurrency) + + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for j := 0; j < numReq; j++ { + req, err := http.NewRequest(http.MethodGet, "/json", nil) + if err != nil { + panic(err) + } + + expect := make(map[string]string) + r := NewRequest(req, WithDecoder(JSON), WithExpect(&expect)) + + resp := client.Do(r) + if resp.StatusCode() != http.StatusOK { + panic(resp.Error()) + } + } + }() + } + wg.Wait() +} + +func doRequestsWithAllServersFailure(servers []*http.Server, client *Client) { + go func() { + time.Sleep(time.Second) + stopServers(servers) + }() + + var wg sync.WaitGroup + wg.Add(concurrency) + + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for j := 0; j < numReq; j++ { + req, _ := http.NewRequest(http.MethodGet, "/json", nil) + expect := make(map[string]string) + r := NewRequest(req, WithDecoder(JSON), WithExpect(&expect)) + resp := client.Do(r) + if resp.StatusCode() != http.StatusOK && resp.StatusCode() != -1 { + panic(resp.StatusCode()) + } + } + }() + } + wg.Wait() +} + +func doRequestsWithTimeout(servers []*http.Server, client *Client) { + var wg sync.WaitGroup + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for j := 0; j < numReq; j++ { + req, err := http.NewRequest(http.MethodGet, "/json", nil) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Nanosecond) + req = req.WithContext(ctx) + + expect := make(map[string]string) + r := NewRequest(req, WithDecoder(JSON), WithExpect(&expect)) + cancel() + + resp := client.Do(r) + if !resp.IsRequestCtxCanceledOrTimeout() { + panic(resp.Error()) + } + } + }() + } + wg.Wait() +} + +func doRequestsWithTimeoutAndMockAction(servers []*http.Server, client *Client) { + var wg sync.WaitGroup + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for j := 0; j < numReq; j++ { + req, err := http.NewRequest(http.MethodGet, "/json", nil) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Nanosecond) + req = req.WithContext(ctx) + + expect := make(map[string]string) + r := NewRequest(req, WithDecoder(JSON), WithExpect(&expect), OnRequestCtxCanceledOrTimeout(func() EndpointAction { + return None + })) + cancel() + + resp := client.Do(r) + if !resp.IsRequestCtxCanceledOrTimeout() { + panic(resp.Error()) + } + } + }() + } + wg.Wait() +} + +var ( + port int32 = 19909 +) + +func newServer() (addr string, server *http.Server) { + mux := http.NewServeMux() + mux.Handle("/json", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"hello":"world"}`)) + })) + + port := atomic.AddInt32(&port, 1) + addr = fmt.Sprintf(":%d", port) + server = &http.Server{ + Addr: addr, + Handler: mux, + } + addr = fmt.Sprintf("http://127.0.0.1:%d", port) + + go func() { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + panic(err) + } + }() + + time.Sleep(200 * time.Millisecond) + + return +} + +func newServers(num int) (addrs []string, servers []*http.Server) { + for i := 0; i < num; i++ { + addr, server := newServer() + addrs = append(addrs, addr) + servers = append(servers, server) + } + return +} + +func stopServers(servers []*http.Server) { + for i := range servers { + stopServer(servers[i]) + } +} + +func stopServer(server *http.Server) { + if err := server.Shutdown(context.Background()); err != nil { + panic(err) + } +} diff --git a/http-client/decoder.go b/http-client/decoder.go new file mode 100644 index 0000000..4b7abf8 --- /dev/null +++ b/http-client/decoder.go @@ -0,0 +1,22 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "io" +) + +// Decoder parses the data from r and stores the result in the value pointed to by "expected". +type Decoder func(r io.Reader, expected interface{}) (err error) diff --git a/http-client/decoder_json.go b/http-client/decoder_json.go new file mode 100644 index 0000000..55a2178 --- /dev/null +++ b/http-client/decoder_json.go @@ -0,0 +1,27 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "encoding/json" + "io" +) + +// JSON is builtin decoder for json encoded data. +func JSON(r io.Reader, expected interface{}) (err error) { + decoder := json.NewDecoder(r) + err = decoder.Decode(expected) + return +} diff --git a/http-client/decoder_test.go b/http-client/decoder_test.go new file mode 100644 index 0000000..68cb4dc --- /dev/null +++ b/http-client/decoder_test.go @@ -0,0 +1,89 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "bytes" + "testing" +) + +func TestDecoders(t *testing.T) { + // expect is write closer + req, resp := &Request{ + expect: &mockWriteCloser{Buffer: bytes.NewBuffer(make([]byte, 0, 50))}, + }, createMockResponse() + err := decode(req, resp) + if err != nil { + t.FailNow() + } + if !req.expect.(*mockWriteCloser).closeState { + t.FailNow() + } + + payload := req.expect.(*mockWriteCloser).Bytes() + if len(payload) != 100 { + t.FailNow() + } + for i := range payload { + if payload[i] != byte(i) { + t.FailNow() + } + } + + // expect is an object and need json decoder + v := make(map[string]string) + req, resp = &Request{ + expect: &v, + decoder: JSON, + }, createMockJSONResponse(map[string]string{"a": "B", "c": "D"}) + err = decode(req, resp) + if err != nil { + t.FailNow() + } + if !equalMapString(v, map[string]string{"a": "B", "c": "D"}) { + t.FailNow() + } + + // writer but error + req, resp = &Request{ + expect: &mockErrWriter{}, + }, createMockResponse() + if err = decode(req, resp); err == nil { + t.FailNow() + } + + // write ok, but close error + req, resp = &Request{ + expect: &mockWriteErrCloser{}, + }, createMockResponse() + if err = decode(req, resp); err == nil { + t.FailNow() + } +} + +func equalMapString(map1, map2 map[string]string) bool { + if len(map1) != len(map2) { + return false + } + + for k, v1 := range map1 { + v2, exist := map2[k] + if !exist || v1 != v2 { + return false + } + } + + return true +} diff --git a/http-client/endpoint.go b/http-client/endpoint.go new file mode 100644 index 0000000..64651e7 --- /dev/null +++ b/http-client/endpoint.go @@ -0,0 +1,190 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net" + "net/url" + "strconv" + "time" + + cb "go.linecorp.com/garr/circuit-breaker" +) + +// EndpointMetadata represents endpoint's metadata. +type EndpointMetadata struct { + Weight uint `json:"weight" yaml:"weight"` +} + +// Equal performs deep equal checking. +func (e *EndpointMetadata) Equal(other *EndpointMetadata) (eq bool) { + eq = other != nil && + e.Weight == other.Weight + return +} + +// RawEndpoint is composite of raw url alongs and its metadata. +type RawEndpoint struct { + RawURL string `json:"url" yaml:"url"` + Metadata EndpointMetadata `json:"meta" yaml:"meta"` +} + +// ToEndpoint converts to Endpoint. +func (r *RawEndpoint) ToEndpoint() (endp *Endpoint, err error) { + u, err := url.Parse(r.RawURL) + if err == nil { + endp = &Endpoint{ + URL: *u, + Metadata: r.Metadata, + } + err = endp.normalize() + } + return +} + +// Endpoint is composite of url.URL and its metadata. +type Endpoint struct { + url.URL + network string + breaker cb.CircuitBreaker + Metadata EndpointMetadata +} + +// Equal performs deep equal checking. +func (e *Endpoint) Equal(other *Endpoint) (eq bool) { + eq = other != nil && + e.URL.Host == other.URL.Host && + e.URL.Scheme == other.URL.Scheme && + e.URL.Opaque == other.URL.Opaque && + e.equalUser(other) && + e.Metadata.Equal(&other.Metadata) + return +} + +func (e *Endpoint) equalUser(other *Endpoint) (eq bool) { + if e.URL.User == nil { + eq = other.URL.User == nil + } else if eq = other.URL.User != nil && e.URL.User.Username() == other.URL.User.Username(); eq { + p1, s1 := e.URL.User.Password() + p2, s2 := other.URL.User.Password() + eq = s1 == s2 && p1 == p2 + } + return +} + +func (e *Endpoint) canRequest() bool { + return e.breaker.CanRequest() +} + +func (e *Endpoint) onConnectFailure() { + e.breaker.OnFailure() +} + +func (e *Endpoint) onConnectSuccess() { + e.breaker.OnSuccess() +} + +// setup circuit breaker +func (e *Endpoint) setupCB(builder *cb.CircuitBreakerBuilder) { + e.breaker, _ = builder.Build() +} + +func (e *Endpoint) normalize() (err error) { + network, p, err := lookupPortByScheme(e.Scheme) + if err == nil { + if e.Port() == "" { + e.Host = net.JoinHostPort(e.Host, strconv.Itoa(p)) + } + e.network = network + } + return +} + +// Dial endpoint. +func (e *Endpoint) Dial(timeout time.Duration) (success bool) { + var ( + conn net.Conn + err error + ) + + if timeout > 0 { + conn, err = net.DialTimeout(e.network, e.Host, timeout) + } else { + conn, err = net.Dial(e.network, e.Host) + } + + if err == nil { + if conn != nil { + _ = conn.Close() + } + success = true + } + + return +} + +// Endpoints represents a collection of endpoint(s). +type Endpoints []*Endpoint + +// Equal performs deep equal checking. +func (e Endpoints) Equal(other Endpoints) (eq bool) { + if len(e) == len(other) { + for i := range e { + if !e[i].Equal(other[i]) { + return + } + } + eq = true + } + return +} + +// Clone endpoints. +func (e Endpoints) Clone() Endpoints { + eps := make([]*Endpoint, len(e)) + copy(eps, e) + return eps +} + +func (e Endpoints) normalize() (err error) { + for i := range e { + if err = e[i].normalize(); err != nil { + return + } + } + return +} + +// Parse endpoint(s) from raw(s). +func Parse(raws []RawEndpoint) (eps Endpoints, err error) { + eps = make([]*Endpoint, len(raws)) + + for i := range raws { + if eps[i], err = raws[i].ToEndpoint(); err != nil { + return + } + } + + return +} + +// ParseFromURLs endpoint(s) from url(s). +func ParseFromURLs(urls []string) (eps Endpoints, err error) { + raws := make([]RawEndpoint, len(urls)) + for i := range urls { + raws[i].RawURL = urls[i] + } + return Parse(raws) +} diff --git a/http-client/endpoint_test.go b/http-client/endpoint_test.go new file mode 100644 index 0000000..d893b1a --- /dev/null +++ b/http-client/endpoint_test.go @@ -0,0 +1,134 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "testing" + "time" +) + +func TestParseEndpoints(t *testing.T) { + _, err := ParseFromURLs([]string{}) + if err != nil { + t.FailNow() + } + + _, err = ParseFromURLs([]string{"://github.com"}) + if err == nil { + t.FailNow() + } + + _, err = ParseFromURLs([]string{"phantom://github.com"}) + if err == nil { + t.FailNow() + } + + endpoints, err := ParseFromURLs([]string{"https://github.com"}) + if err != nil { + t.FailNow() + } + if len(endpoints) != 1 { + t.FailNow() + } + if endpoints[0].Scheme != "https" || + endpoints[0].Host != "github.com:443" || + endpoints[0].Port() != "443" { + t.FailNow() + } +} + +func TestEndpointsNormalization(t *testing.T) { + endpoints, err := ParseFromURLs([]string{"https://github.com", "https://google.com"}) + if err != nil { + t.FailNow() + } + if len(endpoints) != 2 { + t.FailNow() + } + + // inject wrong scheme + endpoints[0].Scheme = "phantom" + if endpoints.normalize() == nil { + t.FailNow() + } +} + +func TestEndpointsClone(t *testing.T) { + endpoints, _ := ParseFromURLs([]string{"https://github.com", "https://google.com"}) + cloned := endpoints.Clone() + if !endpoints.Equal(cloned) { + t.FailNow() + } +} + +func TestEndpointsEqual(t *testing.T) { + endpoints1, _ := ParseFromURLs([]string{"https://github.com", "https://google.com"}) + endpoints2, _ := ParseFromURLs([]string{"https://linxGnu@github.com", "https://google.com"}) + if endpoints1.Equal(endpoints2) { + t.FailNow() + } + + endpoints1, _ = ParseFromURLs([]string{"https://test@github.com", "https://google.com"}) + endpoints2, _ = ParseFromURLs([]string{"https://linxGnu@github.com", "https://google.com"}) + if endpoints1.Equal(endpoints2) { + t.FailNow() + } + + endpoints1, _ = ParseFromURLs([]string{"https://test@github.com", "https://google.com"}) + endpoints2, _ = ParseFromURLs([]string{"https://github.com", "https://google.com"}) + if endpoints1.Equal(endpoints2) { + t.FailNow() + } + + endpoints1, _ = ParseFromURLs([]string{"https://test@github.com", "https://google.com"}) + endpoints2, _ = ParseFromURLs([]string{"https://test@github.com", "https://google.com"}) + if !endpoints1.Equal(endpoints2) { + t.FailNow() + } +} + +func TestEndpointDial(t *testing.T) { + dialEndpoint := func(rawURL string, validCase bool) { + r := &RawEndpoint{RawURL: rawURL} + e, err := r.ToEndpoint() + if err != nil { + t.FailNow() + } + if e.Dial(0) != validCase { + t.FailNow() + } + if e.Dial(100*time.Millisecond) != validCase { + t.FailNow() + } + } + + // valid cases + valids := []string{ + "https://github.com", + "https://google.com", + } + for i := range valids { + dialEndpoint(valids[i], true) + } + + // invalid cases + invalids := []string{ + "https://google1.com", + "http://127.0.0.1:9578", + } + for i := range invalids { + dialEndpoint(invalids[i], false) + } +} diff --git a/http-client/errors.go b/http-client/errors.go new file mode 100644 index 0000000..ee66164 --- /dev/null +++ b/http-client/errors.go @@ -0,0 +1,89 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "fmt" + "net/url" +) + +var ( + // ErrNoEndpoints indicates no endpoints avaiable. + ErrNoEndpoints = fmt.Errorf("There is no endpoints avaiable") + + // ErrEndpointsUnavailable indicates endpoints are unavailable. + ErrEndpointsUnavailable = fmt.Errorf("All endpoints are unavailable or all circuit-breakers of endpoints are opened") +) + +type errorCategory int + +const ( + // indicates no error. + errNone errorCategory = iota + + // indicates connection error category, i.e dial, unstable/terminal state, etc. + errConnection + + // indicates decoding error category. + errDecoding + + // indicates transform error category. + errTransform + + // indicates request context canceled explicitly/timeout. + errRequestCtxCanceledOrTimeout +) + +// wraps over error +type errorWrap struct { + category errorCategory + err error +} + +func (e *errorWrap) Error() string { + return e.err.Error() +} + +func (e *errorWrap) Unwrap() error { + return e.err +} + +func connectionError(url *url.URL, err error) error { + return &errorWrap{ + category: errConnection, + err: fmt.Errorf("Connection to host:[%s] got error:[%w]", url.Host, err), + } +} + +func decodingError(url *url.URL, err error) error { + return &errorWrap{ + category: errDecoding, + err: fmt.Errorf("Decoding of response body from host:[%s] got error:[%w]", url.Host, err), + } +} + +func transformError(url *url.URL, err error) error { + return &errorWrap{ + category: errTransform, + err: fmt.Errorf("Transformation of response body from host:[%s] got error:[%w]", url.Host, err), + } +} + +func requestCtxCanceledOrTimeout(url *url.URL, err error) error { + return &errorWrap{ + category: errRequestCtxCanceledOrTimeout, + err: fmt.Errorf("Request context canceled or timeout. Detail: Host:[%s] Error:[%w]", url.Host, err), + } +} diff --git a/http-client/errors_test.go b/http-client/errors_test.go new file mode 100644 index 0000000..a998677 --- /dev/null +++ b/http-client/errors_test.go @@ -0,0 +1,56 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "errors" + "fmt" + "net/url" + "testing" +) + +func TestErrors(t *testing.T) { + u, _ := url.Parse("https://google.com:8091") + + e := fmt.Errorf("Decoding error") + v := decodingError(u, e) + if !errors.Is(v.(*errorWrap).err, e) { + t.FailNow() + } + if v.(*errorWrap).category != errDecoding { + t.FailNow() + } + + e = fmt.Errorf("Transform error") + v = transformError(u, e) + if !errors.Is(v.(*errorWrap).err, e) { + t.FailNow() + } + if v.(*errorWrap).category != errTransform { + t.FailNow() + } + + e = fmt.Errorf("Fake Connection error") + v = connectionError(u, e) + if !errors.Is(v.(*errorWrap).err, e) { + t.FailNow() + } + if v.(*errorWrap).category != errConnection { + t.FailNow() + } + if v.Error() != "Connection to host:[google.com:8091] got error:[Fake Connection error]" { + t.FailNow() + } +} diff --git a/http-client/request.go b/http-client/request.go new file mode 100644 index 0000000..3eeb8a7 --- /dev/null +++ b/http-client/request.go @@ -0,0 +1,55 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import "net/http" + +// Request instruments http.Request with additional properties/hooks +// that help performing RoundTrip action. +type Request struct { + // original http request + r *http.Request + + // transformations chain for response + transforms []Transformer + + // response body decoder + decoder Decoder + + // action on received response header + onRespHeader func(statusCode int, header http.Header) EndpointAction + + // action when request context canceled or timeout + onRequestCtxCanceledOrTimeout func() EndpointAction + + // expect output + expect interface{} +} + +// NewRequest creates new Request. +func NewRequest(r *http.Request, opts ...RequestOption) *Request { + v := &Request{ + r: r, + } + + // make empty to trigger using url.URL + r.Host = "" + + for i := range opts { + opts[i](v) + } + + return v +} diff --git a/http-client/request_options.go b/http-client/request_options.go new file mode 100644 index 0000000..b5838af --- /dev/null +++ b/http-client/request_options.go @@ -0,0 +1,79 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import "net/http" + +// RequestOption is setter for request's option. +type RequestOption func(r *Request) + +// WithTransform adds a transformation for received response. +// Response would be transformed in order of RequestOption arguments. +// +// Default: empty transformers +func WithTransform(t Transformer) RequestOption { + return func(r *Request) { + r.transforms = append(r.transforms, t) + } +} + +// WithTransforms adds transformation(s) for received response. +// Response would be transformed in order of RequestOption arguments. +// +// Default: empty transformers +func WithTransforms(t []Transformer) RequestOption { + return func(r *Request) { + r.transforms = append(r.transforms, t...) + } +} + +// WithDecoder attaches a decoder for received response. +// +// Default: nil +func WithDecoder(d Decoder) RequestOption { + return func(r *Request) { + r.decoder = d + } +} + +// WithExpect indicates expectation of decoded response data. +// This option is not mandatory. +// +// Default: nil +func WithExpect(expect interface{}) RequestOption { + return func(r *Request) { + r.expect = expect + } +} + +// OnResponseHeader specifies action to take according to received response header. +// This option is not mandatory. +// +// Default: nil +func OnResponseHeader(f func(statusCode int, header http.Header) EndpointAction) RequestOption { + return func(r *Request) { + r.onRespHeader = f + } +} + +// OnRequestCtxCanceledOrTimeout specifies action to take in case request context is canceled +// or timeout. +// +// Default: nil +func OnRequestCtxCanceledOrTimeout(f func() EndpointAction) RequestOption { + return func(r *Request) { + r.onRequestCtxCanceledOrTimeout = f + } +} diff --git a/http-client/request_test.go b/http-client/request_test.go new file mode 100644 index 0000000..13a38fd --- /dev/null +++ b/http-client/request_test.go @@ -0,0 +1,63 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net/http" + "testing" +) + +func TestNewRequest(t *testing.T) { + rawReq, err := http.NewRequest(http.MethodDelete, "http://host2:9999/query", nil) + if err != nil { + t.FailNow() + } + + expect := make(map[int]int) + req := NewRequest(rawReq, + WithTransform(Limiter(128<<10)), + WithDecoder(JSON), + WithTransforms([]Transformer{Limiter(128 << 8)}), + WithExpect(&expect), + OnResponseHeader(OnStatus5xx), + ) + if err != nil { + t.FailNow() + } + if req == nil { + t.FailNow() + } + if rawReq.Host != "" { + t.FailNow() + } + if &expect != req.expect { + t.FailNow() + } + if req.onRespHeader == nil { + t.FailNow() + } + + if len(req.transforms) != 2 { + t.FailNow() + } + l, ok := req.transforms[0].(*limiter) + if !ok || l.n != 128<<10 { + t.FailNow() + } + l, ok = req.transforms[1].(*limiter) + if !ok || l.n != 128<<8 { + t.FailNow() + } +} diff --git a/http-client/resolver.go b/http-client/resolver.go new file mode 100644 index 0000000..7c71fc6 --- /dev/null +++ b/http-client/resolver.go @@ -0,0 +1,22 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +// Resolver is resolver for endpoints. +type Resolver interface { + // Resolve endpoint(s) and notify next resolver on chain. + // Input endpoint(s) must not be changed. + Resolve(in <-chan Endpoints, out chan<- Endpoints) +} diff --git a/http-client/resolver_chain.go b/http-client/resolver_chain.go new file mode 100644 index 0000000..dd17920 --- /dev/null +++ b/http-client/resolver_chain.go @@ -0,0 +1,90 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "sync" + "sync/atomic" + "time" +) + +// runtime for chain of resolver(s) +type resolverChain struct { + wg *sync.WaitGroup + nResolvers int + resolvers []Resolver + pipes []chan Endpoints + ready int32 +} + +func newResolverChain(resolvers []Resolver) (chain resolverChain) { + // setup pipeline(s) + n := len(resolvers) + pipes := make([]chan Endpoints, n+1) + for i := range pipes { + pipes[i] = make(chan Endpoints, 1) + } + + // setup chain + chain = resolverChain{ + wg: &sync.WaitGroup{}, + nResolvers: n, + resolvers: resolvers, + pipes: pipes, + } + + // run resolver(s) + chain.wg.Add(n) + for i, r := range resolvers { + go func(r Resolver, in, out chan Endpoints) { + r.Resolve(in, out) + close(out) + chain.wg.Done() + }(r, pipes[i], pipes[i+1]) + } + + return +} + +func (r *resolverChain) close() { + if r.nResolvers > 0 { + close(r.pipes[0]) + r.wg.Wait() + } +} + +func (r *resolverChain) push(endpoints Endpoints) { + r.pipes[0] <- endpoints +} + +func (r *resolverChain) wait(timeout time.Duration) (ready bool) { + if atomic.CompareAndSwapInt32(&r.ready, 0, 1) { + tm := time.NewTimer(timeout) + + select { + case <-r.pipes[r.nResolvers]: + tm.Stop() + atomic.StoreInt32(&r.ready, 2) + ready = true + + case <-tm.C: + atomic.StoreInt32(&r.ready, 0) + } + + } else { + ready = atomic.LoadInt32(&r.ready) == 2 + } + return +} diff --git a/http-client/resolver_chain_test.go b/http-client/resolver_chain_test.go new file mode 100644 index 0000000..c062c23 --- /dev/null +++ b/http-client/resolver_chain_test.go @@ -0,0 +1,55 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "testing" + "time" +) + +type noopResolver struct{} + +func (l *noopResolver) Resolve(in <-chan Endpoints, out chan<- Endpoints) { + for v := range in { + time.Sleep(300 * time.Millisecond) + out <- v + } +} + +func TestResolverChainWait(t *testing.T) { + chain := newResolverChain([]Resolver{&noopResolver{}}) + chain.push(nil) + + readies := make(chan bool) + go chainWait(chain, 50*time.Millisecond, readies) + go chainWait(chain, 50*time.Millisecond, readies) + go chainWait(chain, 50*time.Millisecond, readies) + + time.Sleep(300 * time.Millisecond) + if !chain.wait(50 * time.Millisecond) { + t.FailNow() + } + for i := 0; i < 3; i++ { + if <-readies { + t.FailNow() + } + } + close(readies) + chain.close() +} + +func chainWait(chain resolverChain, timeout time.Duration, results chan bool) { + results <- chain.wait(timeout) +} diff --git a/http-client/resolver_healthchecker.go b/http-client/resolver_healthchecker.go new file mode 100644 index 0000000..1b8f4d1 --- /dev/null +++ b/http-client/resolver_healthchecker.go @@ -0,0 +1,117 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "context" + "runtime" + "time" + + workerpool "go.linecorp.com/garr/worker-pool" +) + +var ( + numCPU = runtime.NumCPU() +) + +type healthChecker struct { + interval time.Duration + timeout time.Duration + workers *workerpool.Pool +} + +func newHealthChecker(interval, timeout time.Duration) Resolver { + if interval <= 0 { + if interval = timeout << 1; interval <= 0 { + interval = 200 * time.Millisecond + } + } + + h := &healthChecker{ + interval: interval, + timeout: timeout, + workers: workerpool.NewPool(context.Background(), workerpool.Option{ + NumberWorker: numCPU, + }), + } + h.workers.Start() + + return h +} + +func (h *healthChecker) stopWorkers() { + h.workers.Stop() +} + +func doHealthCheck(index int, endpoint *Endpoint, timeout time.Duration, out chan int) *workerpool.Task { + return workerpool.NewTask(context.Background(), func(context.Context) (interface{}, error) { + if success := endpoint.Dial(timeout); success { + out <- index + } else { + out <- -1 + } + return nil, nil + }) +} + +func (h *healthChecker) Resolve(in <-chan Endpoints, out chan<- Endpoints) { + var ( + endpoints Endpoints + ch = make(chan int, 8) + ) + + do := func(endpoints Endpoints) { + if n := len(endpoints); n > 0 { + resolved := make([]*Endpoint, 0, n) + + for i := range endpoints { + h.workers.Do(doHealthCheck(i, endpoints[i], h.timeout, ch)) + } + + for range endpoints { + if ind := <-ch; ind >= 0 { + resolved = append(resolved, endpoints[ind]) + } + } + + // notify next resolver + if len(resolved) > 0 { + out <- resolved + } + } + } + + // setup ticker + ticker := time.NewTicker(h.interval) + + for { + select { + case eps, ok := <-in: + if !ok { + ticker.Stop() + h.stopWorkers() + return + } + + if len(eps) > 0 && eps.normalize() == nil { + endpoints = eps + do(endpoints) + } + + case <-ticker.C: + do(endpoints) + } + } +} diff --git a/http-client/resolver_healthchecker_test.go b/http-client/resolver_healthchecker_test.go new file mode 100644 index 0000000..5b7e795 --- /dev/null +++ b/http-client/resolver_healthchecker_test.go @@ -0,0 +1,63 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "testing" + "time" +) + +func TestHealthChecker(t *testing.T) { + h := newHealthChecker(200*time.Millisecond, 500*time.Millisecond) + testHealthChecker(t, h, 3) +} + +func TestHealthCheckerInvalidInput(t *testing.T) { + h := newHealthChecker(-1, 100).(*healthChecker) + if h.interval != 200 { + t.FailNow() + } + h.stopWorkers() + + h = newHealthChecker(-1, -2).(*healthChecker) + if h.interval != 200*time.Millisecond { + t.FailNow() + } + h.stopWorkers() +} + +func TestHealthCheckerInfiniteTimeout(t *testing.T) { + h := newHealthChecker(100*time.Millisecond, 0) + testHealthChecker(t, h, 3) +} + +func testHealthChecker(t *testing.T, h Resolver, expectNumResolvedEndpoints int) { + // initialize input and output chan + in, out := make(chan Endpoints, 1), make(chan Endpoints, 1) + + // start resolving + go h.Resolve(in, out) + + in <- validEndpoints() + v := <-out + if v == nil { + t.FailNow() + } + if len(v) != expectNumResolvedEndpoints { + t.FailNow() + } + + close(in) // to notify health checker +} diff --git a/http-client/response.go b/http-client/response.go new file mode 100644 index 0000000..08327f1 --- /dev/null +++ b/http-client/response.go @@ -0,0 +1,105 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net/http" +) + +// Response instruments http.Response with features. +type Response struct { + req *Request + _resp *http.Response + data interface{} + err error +} + +func (r *Response) reset() { + r.req = nil + r._resp = nil + r.data = nil + r.err = nil +} + +// Raw returns received raw http.Response. +// +// Response body is always instrumented by the client +// and processed through transformers and decoder defined +// with belonged request. +func (r *Response) Raw() *http.Response { + return r._resp +} + +// Request returns Request that this response belongs to. +func (r *Response) Request() *Request { + return r.req +} + +// StatusCode returns response status code (if have). +// On connection failure, return -1. +func (r *Response) StatusCode() (code int) { + if r._resp != nil { + code = r._resp.StatusCode + } else { + code = -1 + } + return +} + +// Status returns response status (if have). +// On connection failure, return empty. +func (r *Response) Status() (status string) { + if r._resp != nil { + status = r._resp.Status + } + return +} + +// Data returns (parsed) response data. +func (r *Response) Data() interface{} { + return r.data +} + +func (r *Response) errorCategory() errorCategory { + if ew, ok := r.err.(*errorWrap); ok { + return ew.category + } + return errNone +} + +// IsDecodingError indicates decoding error. +func (r *Response) IsDecodingError() (v bool) { + return r.errorCategory() == errDecoding +} + +// IsConnectionError indicates connection error. +func (r *Response) IsConnectionError() bool { + return r.errorCategory() == errConnection +} + +// IsTransformError indicates transformation error. +func (r *Response) IsTransformError() bool { + return r.errorCategory() == errTransform +} + +// IsRequestCtxCanceledOrTimeout indicates request context canceled explicitly or timeout occured. +func (r *Response) IsRequestCtxCanceledOrTimeout() bool { + return r.errorCategory() == errRequestCtxCanceledOrTimeout +} + +// Error returns response error (in detail). +func (r *Response) Error() error { + return r.err +} diff --git a/http-client/response_test.go b/http-client/response_test.go new file mode 100644 index 0000000..10673f0 --- /dev/null +++ b/http-client/response_test.go @@ -0,0 +1,146 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "errors" + "fmt" + "net/http" + "testing" +) + +func TestResponseReset(t *testing.T) { + r := &Response{ + req: &Request{}, + _resp: &http.Response{}, + data: 123, + err: fmt.Errorf("Fake error"), + } + r.reset() + if r.req != nil { + t.FailNow() + } + if r._resp != nil { + t.FailNow() + } + if r.data != nil { + t.FailNow() + } + if r.err != nil { + t.FailNow() + } +} + +func TestResponseMethods(t *testing.T) { + req := &Request{} + _resp := &http.Response{StatusCode: 404, Status: "fake"} + + r := &Response{} + if r.StatusCode() != -1 { + t.FailNow() + } + + r = &Response{ + req: req, + _resp: _resp, + data: 123, + err: fmt.Errorf("Mock error"), + } + if r.Raw() != _resp { + t.FailNow() + } + if r.Request() != req { + t.FailNow() + } + if r.StatusCode() != 404 { + t.FailNow() + } + if r.Status() != "fake" { + t.FailNow() + } + if r.Data().(int) != 123 { + t.FailNow() + } + if r.errorCategory() != errNone { + t.FailNow() + } + if r.Error().Error() != "Mock error" { + t.FailNow() + } + if r.IsConnectionError() != false { + t.FailNow() + } + if r.IsDecodingError() != false { + t.FailNow() + } + if r.IsTransformError() != false { + t.FailNow() + } + + // inject decoding error + r.err = &errorWrap{ + category: errDecoding, + err: fmt.Errorf("Decoding ERROR"), + } + if r.IsConnectionError() != false { + t.FailNow() + } + if r.IsDecodingError() != true { + t.FailNow() + } + if r.IsTransformError() != false { + t.FailNow() + } + if r.Error().Error() != "Decoding ERROR" { + t.FailNow() + } + + // inject connection error + r.err = &errorWrap{ + category: errConnection, + err: fmt.Errorf("Connection ERROR"), + } + if r.IsConnectionError() != true { + t.FailNow() + } + if r.IsDecodingError() != false { + t.FailNow() + } + if r.IsTransformError() != false { + t.FailNow() + } + if r.Error().Error() != "Connection ERROR" { + t.FailNow() + } + + // inject transform error + er := fmt.Errorf("Transform ERROR") + r.err = &errorWrap{ + category: errTransform, + err: er, + } + if r.IsConnectionError() != false { + t.FailNow() + } + if r.IsDecodingError() != false { + t.FailNow() + } + if r.IsTransformError() != true { + t.FailNow() + } + if !errors.Is(r.Error(), er) { + t.FailNow() + } +} diff --git a/http-client/transformer.go b/http-client/transformer.go new file mode 100644 index 0000000..bccfc82 --- /dev/null +++ b/http-client/transformer.go @@ -0,0 +1,37 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net/http" +) + +// Transformer represents a transformation of response to new one. +type Transformer interface { + Transform(*http.Response) (*http.Response, error) +} + +type transformer struct { + exec func(*http.Response) (*http.Response, error) +} + +func (t *transformer) Transform(resp *http.Response) (*http.Response, error) { + return t.exec(resp) +} + +// NewTransformer creates new transformer from predefined action. +func NewTransformer(exec func(*http.Response) (*http.Response, error)) Transformer { + return &transformer{exec: exec} +} diff --git a/http-client/transformer_limiter.go b/http-client/transformer_limiter.go new file mode 100644 index 0000000..a2ae7d0 --- /dev/null +++ b/http-client/transformer_limiter.go @@ -0,0 +1,41 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" +) + +// Limiter returns transformer which limit response body. +func Limiter(n int64) Transformer { + return &limiter{n: n} +} + +type limiter struct { + n int64 +} + +func (l *limiter) Transform(resp *http.Response) (transformed *http.Response, err error) { + if l.n >= resp.ContentLength { + resp.Body = ioutil.NopCloser(io.LimitReader(resp.Body, l.n)) + transformed = resp + } else { + err = fmt.Errorf("Response body limit exceed. Limit:%d Actual:%d", l.n, resp.ContentLength) + } + return +} diff --git a/http-client/transformer_limiter_test.go b/http-client/transformer_limiter_test.go new file mode 100644 index 0000000..ded5291 --- /dev/null +++ b/http-client/transformer_limiter_test.go @@ -0,0 +1,44 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "net/http" + "testing" +) + +func TestLimiter(t *testing.T) { + l := Limiter(1024) + + resp, err := l.Transform(&http.Response{ + ContentLength: 1024, + }) + if err != nil { + t.FailNow() + } + if resp.Body == nil { + t.FailNow() + } + + resp, err = l.Transform(&http.Response{ + ContentLength: 4096, + }) + if err == nil { + t.FailNow() + } + if resp != nil { + t.FailNow() + } +} diff --git a/http-client/transformer_test.go b/http-client/transformer_test.go new file mode 100644 index 0000000..163832e --- /dev/null +++ b/http-client/transformer_test.go @@ -0,0 +1,54 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "bytes" + "compress/gzip" + "io/ioutil" + "net/http" + "testing" +) + +func TestTransformer(t *testing.T) { + v := NewTransformer(func(h *http.Response) (*http.Response, error) { + rd, err := gzip.NewReader(h.Body) + if err != nil { + return nil, err + } + h.Body = ioutil.NopCloser(rd) + return h, nil + }) + + resp, err := v.Transform(&http.Response{ + Body: ioutil.NopCloser(bytes.NewReader(createGzippedPayload())), + }) + if err != nil { + t.FailNow() + } + + payload, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.FailNow() + } + if len(payload) != 10000 { + t.FailNow() + } + for i := range payload { + if payload[i] != byte(i) { + t.FailNow() + } + } +} diff --git a/http-client/util.go b/http-client/util.go new file mode 100644 index 0000000..8876cfb --- /dev/null +++ b/http-client/util.go @@ -0,0 +1,132 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "path" + "time" + + cb "go.linecorp.com/garr/circuit-breaker" + "go.linecorp.com/garr/retry" +) + +var ( + knownNetworks = []string{"tcp", "udp", ""} +) + +func lookupPortByScheme(scheme string) (network string, port int, err error) { + for i := range knownNetworks { + if port, err = net.LookupPort(knownNetworks[i], scheme); err == nil { + network = knownNetworks[i] + return + } + } + return +} + +func injectTarget(endpoint *Endpoint, req *Request) (originalURL *url.URL) { + u := req.r.URL + + originalURL = &url.URL{ + Scheme: u.Scheme, + Opaque: u.Opaque, + User: u.User, + Host: u.Host, + Path: u.Path, + } + + u.Scheme = endpoint.Scheme + u.Opaque = endpoint.Opaque + u.User = endpoint.User + u.Host = endpoint.Host + u.Path = path.Join(endpoint.Path, u.Path) + + return +} + +func revert(req *Request, originalURL *url.URL) { + u := req.r.URL + + u.Scheme = originalURL.Scheme + u.Opaque = originalURL.Opaque + u.User = originalURL.User + u.Host = originalURL.Host + u.Path = originalURL.Path +} + +func decode(req *Request, resp *http.Response) (err error) { + if req.expect != nil { + if w, ok := req.expect.(io.Writer); ok { + if _, err = io.Copy(w, resp.Body); err == nil { + if closer, ok := req.expect.(io.Closer); ok { + err = closer.Close() + } + } + } else if req.decoder != nil { + err = req.decoder(resp.Body, req.expect) + } + } + return +} + +func drainAndClose(r io.ReadCloser) { + _, _ = io.Copy(ioutil.Discard, r) + _ = r.Close() +} + +// default circuit breaker settings: +// defaultFailureRateThreshold = 0.8 +// defaultMinimumRequestThreshold = 10 +// defaultTrialRequestInterval = time.Duration(3 * time.Second) +// defaultCircuitOpenWindow = time.Duration(10 * time.Second) +// defaultCounterSlidingWindow = time.Duration(20 * time.Second) +// defaultCounterUpdateInterval = time.Duration(1 * time.Second) +// +// See also: https://line.github.io/armeria/client-circuit-breaker.html +func defaultCircuitBreakerBuilder() *cb.CircuitBreakerBuilder { + return cb.NewCircuitBreakerBuilder() +} + +// default backoff, using: +// +// exponential: +// - initialDelayMillis: 50 +// - maxDelayMillis: 5000 (5 seconds) +// - multipiler: 1.15 +// jitter: 0.1 +// limit: 3 (try 3 times) +// +// See also: https://line.github.io/armeria/client-retry.html#backoff +func defaultBackoff() (b retry.Backoff) { + baseBackoff, _ := retry.NewExponentialBackoff(50, 5000, 1.15) + b, _ = retry.NewBackoffBuilder(). + BaseBackoff(baseBackoff). + WithJitter(0.1). + WithLimit(3). + Build() + return +} + +// default health check resolver: +// - interval: 500 Millis +// - timeout: 100 Millis +func defaultHealthChecker() Resolver { + return newHealthChecker(500*time.Millisecond, 100*time.Millisecond) +} diff --git a/http-client/util_test.go b/http-client/util_test.go new file mode 100644 index 0000000..e876539 --- /dev/null +++ b/http-client/util_test.go @@ -0,0 +1,205 @@ +// Copyright 2022 LINE Corporation +// +// LINE Corporation licenses this file to you under the Apache License, +// version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at: +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package httpclient + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "testing" +) + +type mockReadCloser struct { + *bytes.Buffer + closeState bool +} + +func (m *mockReadCloser) Close() error { + m.closeState = true + return nil +} + +type mockWriteCloser struct { + *bytes.Buffer + closeState bool +} + +func (m *mockWriteCloser) Close() error { + m.closeState = true + return nil +} + +type mockErrWriter struct { +} + +func (m *mockErrWriter) Write(p []byte) (n int, err error) { + return 0, fmt.Errorf("Fake error") +} + +type mockWriteErrCloser struct { +} + +func (m *mockWriteErrCloser) Write(p []byte) (n int, err error) { + return len(p), nil +} + +func (m *mockWriteErrCloser) Close() error { + return fmt.Errorf("Fake error") +} + +func createMockResponse() *http.Response { + return &http.Response{ + Body: ioutil.NopCloser(bytes.NewReader(createBytes())), + } +} + +func createMockJSONResponse(payload interface{}) *http.Response { + b, _ := json.Marshal(payload) + return &http.Response{ + Body: ioutil.NopCloser(bytes.NewReader(b)), + } +} + +func createBytes() []byte { + v := make([]byte, 100) + for i := range v { + v[i] = byte(i) + } + return v +} + +func createGzippedPayload() []byte { + v := make([]byte, 10000) + for i := range v { + v[i] = byte(i) + } + + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + _, _ = gz.Write(v) + _ = gz.Close() + + return buf.Bytes() +} + +func TestDefaultCB(t *testing.T) { + if b := defaultCircuitBreakerBuilder(); b == nil { + t.FailNow() + } +} + +func TestDefaultBackoff(t *testing.T) { + if b := defaultBackoff(); b == nil { + t.FailNow() + } +} + +func TestDrainAndClose(t *testing.T) { + r := &mockReadCloser{ + Buffer: bytes.NewBuffer(make([]byte, 100)), + } + + drainAndClose(r) + + if r.closeState != true { + t.FailNow() + } + if r.Buffer.Len() != 0 { + t.FailNow() + } +} + +func TestInjectAndRevert(t *testing.T) { + // create an endpoint + rawEndpoint := &RawEndpoint{RawURL: "https://host1:9999/path"} + endpoint, err := rawEndpoint.ToEndpoint() + if err != nil { + t.FailNow() + } + if endpoint.Path != "/path" { + t.FailNow() + } + + // create request + rawReq, err := http.NewRequest(http.MethodDelete, "http://host2:9999/query", nil) + if err != nil { + t.FailNow() + } + + // try to injection + req := NewRequest(rawReq) + original := injectTarget(endpoint, req) + if rawReq.URL.Scheme != "https" || + rawReq.URL.Host != "host1:9999" || + rawReq.URL.Path != "/path/query" { + t.FailNow() + } + + // try to revert + revert(req, original) + if rawReq.URL.Scheme != "http" || + rawReq.URL.Host != "host2:9999" || + rawReq.URL.Path != "/query" { + t.FailNow() + } +} + +func TestLookupPortByScheme(t *testing.T) { + network, p, err := lookupPortByScheme("http") + if err != nil || p != 80 || network != "tcp" { + t.FailNow() + } + + network, p, err = lookupPortByScheme("https") + if err != nil || p != 443 || network != "tcp" { + t.FailNow() + } + + network, p, err = lookupPortByScheme("ftp") + if err != nil || p != 21 || network != "tcp" { + t.FailNow() + } + + network, p, err = lookupPortByScheme("ssh") + if err != nil || p != 22 || network != "tcp" { + t.FailNow() + } + + network, p, err = lookupPortByScheme("ftps") + if err != nil || p != 990 || network != "tcp" { + t.FailNow() + } + + network, p, err = lookupPortByScheme("unknown") + if err == nil || p != 0 || network != "" { + t.FailNow() + } +} + +func validEndpoints() Endpoints { + valids := []string{ + "https://github.com", + "https://google.com", + "https://golang.org", + } + endpoints, err := ParseFromURLs(valids) + if err != nil { + panic(err) + } + return endpoints +}