First commit of smarter-device-manager

This commit is contained in:
Alexandre Ferreira
2019-11-22 10:52:49 -06:00
commit 6a04d69326
539 changed files with 357090 additions and 0 deletions

View File

@@ -0,0 +1,143 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
"sync"
"time"
)
const (
// bdpLimit is the maximum value the flow control windows
// will be increased to.
bdpLimit = (1 << 20) * 4
// alpha is a constant factor used to keep a moving average
// of RTTs.
alpha = 0.9
// If the current bdp sample is greater than or equal to
// our beta * our estimated bdp and the current bandwidth
// sample is the maximum bandwidth observed so far, we
// increase our bbp estimate by a factor of gamma.
beta = 0.66
// To put our bdp to be smaller than or equal to twice the real BDP,
// we should multiply our current sample with 4/3, however to round things out
// we use 2 as the multiplication factor.
gamma = 2
)
var (
// Adding arbitrary data to ping so that its ack can be
// identified.
// Easter-egg: what does the ping message say?
bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
)
type bdpEstimator struct {
// sentAt is the time when the ping was sent.
sentAt time.Time
mu sync.Mutex
// bdp is the current bdp estimate.
bdp uint32
// sample is the number of bytes received in one measurement cycle.
sample uint32
// bwMax is the maximum bandwidth noted so far (bytes/sec).
bwMax float64
// bool to keep track of the beginning of a new measurement cycle.
isSent bool
// Callback to update the window sizes.
updateFlowControl func(n uint32)
// sampleCount is the number of samples taken so far.
sampleCount uint64
// round trip time (seconds)
rtt float64
}
// timesnap registers the time bdp ping was sent out so that
// network rtt can be calculated when its ack is received.
// It is called (by controller) when the bdpPing is
// being written on the wire.
func (b *bdpEstimator) timesnap(d [8]byte) {
if bdpPing.data != d {
return
}
b.sentAt = time.Now()
}
// add adds bytes to the current sample for calculating bdp.
// It returns true only if a ping must be sent. This can be used
// by the caller (handleData) to make decision about batching
// a window update with it.
func (b *bdpEstimator) add(n uint32) bool {
b.mu.Lock()
defer b.mu.Unlock()
if b.bdp == bdpLimit {
return false
}
if !b.isSent {
b.isSent = true
b.sample = n
b.sentAt = time.Time{}
b.sampleCount++
return true
}
b.sample += n
return false
}
// calculate is called when an ack for a bdp ping is received.
// Here we calculate the current bdp and bandwidth sample and
// decide if the flow control windows should go up.
func (b *bdpEstimator) calculate(d [8]byte) {
// Check if the ping acked for was the bdp ping.
if bdpPing.data != d {
return
}
b.mu.Lock()
rttSample := time.Since(b.sentAt).Seconds()
if b.sampleCount < 10 {
// Bootstrap rtt with an average of first 10 rtt samples.
b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
} else {
// Heed to the recent past more.
b.rtt += (rttSample - b.rtt) * float64(alpha)
}
b.isSent = false
// The number of bytes accumulated so far in the sample is smaller
// than or equal to 1.5 times the real BDP on a saturated connection.
bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
if bwCurrent > b.bwMax {
b.bwMax = bwCurrent
}
// If the current sample (which is smaller than or equal to the 1.5 times the real BDP) is
// greater than or equal to 2/3rd our perceived bdp AND this is the maximum bandwidth seen so far, we
// should update our perception of the network BDP.
if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit {
sampleFloat := float64(b.sample)
b.bdp = uint32(gamma * sampleFloat)
if b.bdp > bdpLimit {
b.bdp = bdpLimit
}
bdp := b.bdp
b.mu.Unlock()
b.updateFlowControl(bdp)
return
}
b.mu.Unlock()
}

311
vendor/google.golang.org/grpc/transport/control.go generated vendored Normal file
View File

@@ -0,0 +1,311 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
infinity = time.Duration(math.MaxInt64)
defaultClientKeepaliveTime = infinity
defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
defaultMaxConnectionIdle = infinity
defaultMaxConnectionAge = infinity
defaultMaxConnectionAgeGrace = infinity
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
// max window limit set by HTTP2 Specs.
maxWindowSize = math.MaxInt32
// defaultLocalSendQuota sets is default value for number of data
// bytes that each stream can schedule before some of it being
// flushed out.
defaultLocalSendQuota = 64 * 1024
)
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
endStream bool
}
func (*headerFrame) item() {}
type continuationFrame struct {
streamID uint32
endHeaders bool
headerBlockFragment []byte
}
type dataFrame struct {
streamID uint32
endStream bool
d []byte
f func()
}
func (*dataFrame) item() {}
func (*continuationFrame) item() {}
type windowUpdate struct {
streamID uint32
increment uint32
}
func (*windowUpdate) item() {}
type settings struct {
ack bool
ss []http2.Setting
}
func (*settings) item() {}
type resetStream struct {
streamID uint32
code http2.ErrCode
}
func (*resetStream) item() {}
type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
}
func (*goAway) item() {}
type flushIO struct {
}
func (*flushIO) item() {}
type ping struct {
ack bool
data [8]byte
}
func (*ping) item() {}
// quotaPool is a pool which accumulates the quota and sends it to acquire()
// when it is available.
type quotaPool struct {
c chan int
mu sync.Mutex
version uint32
quota int
}
// newQuotaPool creates a quotaPool which has quota q available to consume.
func newQuotaPool(q int) *quotaPool {
qb := &quotaPool{
c: make(chan int, 1),
}
if q > 0 {
qb.c <- q
} else {
qb.quota = q
}
return qb
}
// add cancels the pending quota sent on acquired, incremented by v and sends
// it back on acquire.
func (qb *quotaPool) add(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
qb.lockedAdd(v)
}
func (qb *quotaPool) lockedAdd(v int) {
select {
case n := <-qb.c:
qb.quota += n
default:
}
qb.quota += v
if qb.quota <= 0 {
return
}
// After the pool has been created, this is the only place that sends on
// the channel. Since mu is held at this point and any quota that was sent
// on the channel has been retrieved, we know that this code will always
// place any positive quota value on the channel.
select {
case qb.c <- qb.quota:
qb.quota = 0
default:
}
}
func (qb *quotaPool) addAndUpdate(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
qb.lockedAdd(v)
// Update the version only after having added to the quota
// so that if acquireWithVesrion sees the new vesrion it is
// guaranteed to have seen the updated quota.
// Also, still keep this inside of the lock, so that when
// compareAndExecute is processing, this function doesn't
// get executed partially (quota gets updated but the version
// doesn't).
atomic.AddUint32(&(qb.version), 1)
}
func (qb *quotaPool) acquireWithVersion() (<-chan int, uint32) {
return qb.c, atomic.LoadUint32(&(qb.version))
}
func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
qb.mu.Lock()
defer qb.mu.Unlock()
if version == atomic.LoadUint32(&(qb.version)) {
success()
return true
}
failure()
return false
}
// acquire returns the channel on which available quota amounts are sent.
func (qb *quotaPool) acquire() <-chan int {
return qb.c
}
// inFlow deals with inbound flow control
type inFlow struct {
mu sync.Mutex
// The inbound flow control limit for pending data.
limit uint32
// pendingData is the overall data which have been received but not been
// consumed by applications.
pendingData uint32
// The amount of data the application has consumed but grpc has not sent
// window update for them. Used to reduce window update frequency.
pendingUpdate uint32
// delta is the extra window update given by receiver when an application
// is reading data bigger in size than the inFlow limit.
delta uint32
}
// newLimit updates the inflow window to a new value n.
// It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) uint32 {
f.mu.Lock()
defer f.mu.Unlock()
d := n - f.limit
f.limit = n
return d
}
func (f *inFlow) maybeAdjust(n uint32) uint32 {
if n > uint32(math.MaxInt32) {
n = uint32(math.MaxInt32)
}
f.mu.Lock()
defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
// estUntransmittedData is the maximum number of bytes the sends might not have put
// on the wire yet. A value of 0 or less means that we have already received all or
// more bytes than the application is requesting to read.
estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
// This implies that unless we send a window update, the sender won't be able to send all the bytes
// for this message. Therefore we must send an update over the limit since there's an active read
// request from the application.
if estUntransmittedData > estSenderQuota {
// Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
if f.limit+n > maxWindowSize {
f.delta = maxWindowSize - f.limit
} else {
// Send a window update for the whole message and not just the difference between
// estUntransmittedData and estSenderQuota. This will be helpful in case the message
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
return f.delta
}
return 0
}
// onData is invoked when some data frame is received. It updates pendingData.
func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
defer f.mu.Unlock()
f.pendingData += n
if f.pendingData+f.pendingUpdate > f.limit+f.delta {
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
}
return nil
}
// onRead is invoked when the application reads the data. It returns the window size
// to be sent to the peer.
func (f *inFlow) onRead(n uint32) uint32 {
f.mu.Lock()
defer f.mu.Unlock()
if f.pendingData == 0 {
return 0
}
f.pendingData -= n
if n > f.delta {
n -= f.delta
f.delta = 0
} else {
f.delta -= n
n = 0
}
f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 {
wu := f.pendingUpdate
f.pendingUpdate = 0
return wu
}
return 0
}
func (f *inFlow) resetPendingUpdate() uint32 {
f.mu.Lock()
defer f.mu.Unlock()
n := f.pendingUpdate
f.pendingUpdate = 0
return n
}

View File

@@ -0,0 +1,413 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// This file is the implementation of a gRPC server using HTTP/2 which
// uses the standard Go http2 Server implementation (via the
// http.Handler interface), rather than speaking low-level HTTP/2
// frames itself. It is the implementation of *grpc.Server.ServeHTTP.
package transport
import (
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
}
if r.Method != "POST" {
return nil, errors.New("invalid gRPC request method")
}
if !validContentType(r.Header.Get("Content-Type")) {
return nil, errors.New("invalid gRPC request content-type")
}
if _, ok := w.(http.Flusher); !ok {
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
}
if _, ok := w.(http.CloseNotifier); !ok {
return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier")
}
st := &serverHandlerTransport{
rw: w,
req: r,
closedCh: make(chan struct{}),
writes: make(chan func()),
}
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
if err != nil {
return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
}
st.timeoutSet = true
st.timeout = to
}
var metakv []string
if r.Host != "" {
metakv = append(metakv, ":authority", r.Host)
}
for k, vv := range r.Header {
k = strings.ToLower(k)
if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) {
continue
}
for _, v := range vv {
v, err := decodeMetadataHeader(k, v)
if err != nil {
return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err)
}
metakv = append(metakv, k, v)
}
}
st.headerMD = metadata.Pairs(metakv...)
return st, nil
}
// serverHandlerTransport is an implementation of ServerTransport
// which replies to exactly one gRPC request (exactly one HTTP request),
// using the net/http.Handler interface. This http.Handler is guaranteed
// at this point to be speaking over HTTP/2, so it's able to speak valid
// gRPC.
type serverHandlerTransport struct {
rw http.ResponseWriter
req *http.Request
timeoutSet bool
timeout time.Duration
didCommonHeaders bool
headerMD metadata.MD
closeOnce sync.Once
closedCh chan struct{} // closed on Close
// writes is a channel of code to run serialized in the
// ServeHTTP (HandleStreams) goroutine. The channel is closed
// when WriteStatus is called.
writes chan func()
// block concurrent WriteStatus calls
// e.g. grpc/(*serverStream).SendMsg/RecvMsg
writeStatusMu sync.Mutex
}
func (ht *serverHandlerTransport) Close() error {
ht.closeOnce.Do(ht.closeCloseChanOnce)
return nil
}
func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
// the empty string if unknown.
type strAddr string
func (a strAddr) Network() string {
if a != "" {
// Per the documentation on net/http.Request.RemoteAddr, if this is
// set, it's set to the IP:port of the peer (hence, TCP):
// https://golang.org/pkg/net/http/#Request
//
// If we want to support Unix sockets later, we can
// add our own grpc-specific convention within the
// grpc codebase to set RemoteAddr to a different
// format, or probably better: we can attach it to the
// context and use that from serverHandlerTransport.RemoteAddr.
return "tcp"
}
return ""
}
func (a strAddr) String() string { return string(a) }
// do runs fn in the ServeHTTP goroutine.
func (ht *serverHandlerTransport) do(fn func()) error {
// Avoid a panic writing to closed channel. Imperfect but maybe good enough.
select {
case <-ht.closedCh:
return ErrConnClosing
default:
select {
case ht.writes <- fn:
return nil
case <-ht.closedCh:
return ErrConnClosing
}
}
}
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
ht.writeStatusMu.Lock()
defer ht.writeStatusMu.Unlock()
err := ht.do(func() {
ht.writeCommonHeaders(s)
// And flush, in case no header or body has been sent yet.
// This forces a separation of headers and trailers if this is the
// first call (for example, in end2end tests's TestNoService).
ht.rw.(http.Flusher).Flush()
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
if m := st.Message(); m != "" {
h.Set("Grpc-Message", encodeGrpcMessage(m))
}
if p := st.Proto(); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
panic(err)
}
h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
}
if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
// http2 ResponseWriter mechanism to send undeclared Trailers after
// the headers have possibly been written.
h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v))
}
}
}
})
if err == nil { // transport has not been closed
ht.Close()
close(ht.writes)
}
return err
}
// writeCommonHeaders sets common headers on the first write
// call (Write, WriteHeader, or WriteStatus).
func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
if ht.didCommonHeaders {
return
}
ht.didCommonHeaders = true
h := ht.rw.Header()
h["Date"] = nil // suppress Date to make tests happy; TODO: restore
h.Set("Content-Type", "application/grpc")
// Predeclare trailers we'll set later in WriteStatus (after the body).
// This is a SHOULD in the HTTP RFC, and the way you add (known)
// Trailers per the net/http.ResponseWriter contract.
// See https://golang.org/pkg/net/http/#ResponseWriter
// and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
h.Add("Trailer", "Grpc-Status")
h.Add("Trailer", "Grpc-Message")
h.Add("Trailer", "Grpc-Status-Details-Bin")
if s.sendCompress != "" {
h.Set("Grpc-Encoding", s.sendCompress)
}
}
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
return ht.do(func() {
ht.writeCommonHeaders(s)
ht.rw.Write(hdr)
ht.rw.Write(data)
if !opts.Delay {
ht.rw.(http.Flusher).Flush()
}
})
}
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
return ht.do(func() {
ht.writeCommonHeaders(s)
h := ht.rw.Header()
for k, vv := range md {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
v = encodeMetadataHeader(k, v)
h.Add(k, v)
}
}
ht.rw.WriteHeader(200)
ht.rw.(http.Flusher).Flush()
})
}
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var ctx context.Context
var cancel context.CancelFunc
if ht.timeoutSet {
ctx, cancel = context.WithTimeout(context.Background(), ht.timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
// requestOver is closed when either the request's context is done
// or the status has been written via WriteStatus.
requestOver := make(chan struct{})
// clientGone receives a single value if peer is gone, either
// because the underlying connection is dead or because the
// peer sends an http2 RST_STREAM.
clientGone := ht.rw.(http.CloseNotifier).CloseNotify()
go func() {
select {
case <-requestOver:
return
case <-ht.closedCh:
case <-clientGone:
}
cancel()
}()
req := ht.req
s := &Stream{
id: 0, // irrelevant
requestRead: func(int) {},
cancel: cancel,
buf: newRecvBuffer(),
st: ht,
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
}
pr := &peer.Peer{
Addr: ht.RemoteAddr(),
}
if req.TLS != nil {
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr)
s.ctx = newContextWithStream(ctx, s)
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, recv: s.buf},
windowHandler: func(int) {},
}
// readerDone is closed when the Body.Read-ing goroutine exits.
readerDone := make(chan struct{})
go func() {
defer close(readerDone)
// TODO: minimize garbage, optimize recvBuffer code/ownership
const readSize = 8196
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
s.buf.put(recvMsg{data: buf[:n:n]})
buf = buf[n:]
}
if err != nil {
s.buf.put(recvMsg{err: mapRecvMsgError(err)})
return
}
if len(buf) == 0 {
buf = make([]byte, readSize)
}
}
}()
// startStream is provided by the *grpc.Server's serveStreams.
// It starts a goroutine serving s and exits immediately.
// The goroutine that is started is the one that then calls
// into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
startStream(s)
ht.runStream()
close(requestOver)
// Wait for reading goroutine to finish.
req.Body.Close()
<-readerDone
}
func (ht *serverHandlerTransport) runStream() {
for {
select {
case fn, ok := <-ht.writes:
if !ok {
return
}
fn()
case <-ht.closedCh:
return
}
}
}
func (ht *serverHandlerTransport) Drain() {
panic("Drain() is not implemented")
}
// mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be:
// * io.EOF
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
// * of type transport.StreamError
func mapRecvMsgError(err error) error {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return err
}
if se, ok := err.(http2.StreamError); ok {
if code, ok := http2ErrConvTab[se.Code]; ok {
return StreamError{
Code: code,
Desc: se.Error(),
}
}
}
return connectionErrorf(true, err, err.Error())
}

1322
vendor/google.golang.org/grpc/transport/http2_client.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

1178
vendor/google.golang.org/grpc/transport/http2_server.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

489
vendor/google.golang.org/grpc/transport/http_util.go generated vendored Normal file
View File

@@ -0,0 +1,489 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
http2InitHeaderTableSize = 4096
// http2IOBufSize specifies the buffer size for sending frames.
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)
var (
clientPreface = []byte(http2.ClientPreface)
http2ErrConvTab = map[http2.ErrCode]codes.Code{
http2.ErrCodeNo: codes.Internal,
http2.ErrCodeProtocol: codes.Internal,
http2.ErrCodeInternal: codes.Internal,
http2.ErrCodeFlowControl: codes.ResourceExhausted,
http2.ErrCodeSettingsTimeout: codes.Internal,
http2.ErrCodeStreamClosed: codes.Internal,
http2.ErrCodeFrameSize: codes.Internal,
http2.ErrCodeRefusedStream: codes.Unavailable,
http2.ErrCodeCancel: codes.Canceled,
http2.ErrCodeCompression: codes.Internal,
http2.ErrCodeConnect: codes.Internal,
http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted,
http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
http2.ErrCodeHTTP11Required: codes.FailedPrecondition,
}
statusCodeConvTab = map[codes.Code]http2.ErrCode{
codes.Internal: http2.ErrCodeInternal,
codes.Canceled: http2.ErrCodeCancel,
codes.Unavailable: http2.ErrCodeRefusedStream,
codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
codes.PermissionDenied: http2.ErrCodeInadequateSecurity,
}
httpStatusConvTab = map[int]codes.Code{
// 400 Bad Request - INTERNAL.
http.StatusBadRequest: codes.Internal,
// 401 Unauthorized - UNAUTHENTICATED.
http.StatusUnauthorized: codes.Unauthenticated,
// 403 Forbidden - PERMISSION_DENIED.
http.StatusForbidden: codes.PermissionDenied,
// 404 Not Found - UNIMPLEMENTED.
http.StatusNotFound: codes.Unimplemented,
// 429 Too Many Requests - UNAVAILABLE.
http.StatusTooManyRequests: codes.Unavailable,
// 502 Bad Gateway - UNAVAILABLE.
http.StatusBadGateway: codes.Unavailable,
// 503 Service Unavailable - UNAVAILABLE.
http.StatusServiceUnavailable: codes.Unavailable,
// 504 Gateway timeout - UNAVAILABLE.
http.StatusGatewayTimeout: codes.Unavailable,
}
)
// Records the states during HPACK decoding. Must be reset once the
// decoding of the entire headers are finished.
type decodeState struct {
encoding string
// statusGen caches the stream status received from the trailer the server
// sent. Client side only. Do not access directly. After all trailers are
// parsed, use the status method to retrieve the status.
statusGen *status.Status
// rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
// intended for direct access outside of parsing.
rawStatusCode *int
rawStatusMsg string
httpStatus *int
// Server side only fields.
timeoutSet bool
timeout time.Duration
method string
// key-value metadata map from the peer.
mdata map[string][]string
statsTags []byte
statsTrace []byte
}
// isReservedHeader checks whether hdr belongs to HTTP2 headers
// reserved by gRPC protocol. Any other headers are classified as the
// user-specified metadata.
func isReservedHeader(hdr string) bool {
if hdr != "" && hdr[0] == ':' {
return true
}
switch hdr {
case "content-type",
"grpc-message-type",
"grpc-encoding",
"grpc-message",
"grpc-status",
"grpc-timeout",
"grpc-status-details-bin",
"te":
return true
default:
return false
}
}
// isWhitelistedPseudoHeader checks whether hdr belongs to HTTP2 pseudoheaders
// that should be propagated into metadata visible to users.
func isWhitelistedPseudoHeader(hdr string) bool {
switch hdr {
case ":authority":
return true
default:
return false
}
}
func validContentType(t string) bool {
e := "application/grpc"
if !strings.HasPrefix(t, e) {
return false
}
// Support variations on the content-type
// (e.g. "application/grpc+blah", "application/grpc;blah").
if len(t) > len(e) && t[len(e)] != '+' && t[len(e)] != ';' {
return false
}
return true
}
func (d *decodeState) status() *status.Status {
if d.statusGen == nil {
// No status-details were provided; generate status using code/msg.
d.statusGen = status.New(codes.Code(int32(*(d.rawStatusCode))), d.rawStatusMsg)
}
return d.statusGen
}
const binHdrSuffix = "-bin"
func encodeBinHeader(v []byte) string {
return base64.RawStdEncoding.EncodeToString(v)
}
func decodeBinHeader(v string) ([]byte, error) {
if len(v)%4 == 0 {
// Input was padded, or padding was not necessary.
return base64.StdEncoding.DecodeString(v)
}
return base64.RawStdEncoding.DecodeString(v)
}
func encodeMetadataHeader(k, v string) string {
if strings.HasSuffix(k, binHdrSuffix) {
return encodeBinHeader(([]byte)(v))
}
return v
}
func decodeMetadataHeader(k, v string) (string, error) {
if strings.HasSuffix(k, binHdrSuffix) {
b, err := decodeBinHeader(v)
return string(b), err
}
return v, nil
}
func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error {
for _, hf := range frame.Fields {
if err := d.processHeaderField(hf); err != nil {
return err
}
}
// If grpc status exists, no need to check further.
if d.rawStatusCode != nil || d.statusGen != nil {
return nil
}
// If grpc status doesn't exist and http status doesn't exist,
// then it's a malformed header.
if d.httpStatus == nil {
return streamErrorf(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
}
if *(d.httpStatus) != http.StatusOK {
code, ok := httpStatusConvTab[*(d.httpStatus)]
if !ok {
code = codes.Unknown
}
return streamErrorf(code, http.StatusText(*(d.httpStatus)))
}
// gRPC status doesn't exist and http status is OK.
// Set rawStatusCode to be unknown and return nil error.
// So that, if the stream has ended this Unknown status
// will be propogated to the user.
// Otherwise, it will be ignored. In which case, status from
// a later trailer, that has StreamEnded flag set, is propogated.
code := int(codes.Unknown)
d.rawStatusCode = &code
return nil
}
func (d *decodeState) addMetadata(k, v string) {
if d.mdata == nil {
d.mdata = make(map[string][]string)
}
d.mdata[k] = append(d.mdata[k], v)
}
func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
switch f.Name {
case "content-type":
if !validContentType(f.Value) {
return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
}
case "grpc-encoding":
d.encoding = f.Value
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
}
d.rawStatusCode = &code
case "grpc-message":
d.rawStatusMsg = decodeGrpcMessage(f.Value)
case "grpc-status-details-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
}
s := &spb.Status{}
if err := proto.Unmarshal(v, s); err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
}
d.statusGen = status.FromProto(s)
case "grpc-timeout":
d.timeoutSet = true
var err error
if d.timeout, err = decodeTimeout(f.Value); err != nil {
return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
}
case ":path":
d.method = f.Value
case ":status":
code, err := strconv.Atoi(f.Value)
if err != nil {
return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err)
}
d.httpStatus = &code
case "grpc-tags-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
}
d.statsTags = v
d.addMetadata(f.Name, string(v))
case "grpc-trace-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
}
d.statsTrace = v
d.addMetadata(f.Name, string(v))
default:
if isReservedHeader(f.Name) && !isWhitelistedPseudoHeader(f.Name) {
break
}
v, err := decodeMetadataHeader(f.Name, f.Value)
if err != nil {
errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
return nil
}
d.addMetadata(f.Name, string(v))
}
return nil
}
type timeoutUnit uint8
const (
hour timeoutUnit = 'H'
minute timeoutUnit = 'M'
second timeoutUnit = 'S'
millisecond timeoutUnit = 'm'
microsecond timeoutUnit = 'u'
nanosecond timeoutUnit = 'n'
)
func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
switch u {
case hour:
return time.Hour, true
case minute:
return time.Minute, true
case second:
return time.Second, true
case millisecond:
return time.Millisecond, true
case microsecond:
return time.Microsecond, true
case nanosecond:
return time.Nanosecond, true
default:
}
return
}
const maxTimeoutValue int64 = 100000000 - 1
// div does integer division and round-up the result. Note that this is
// equivalent to (d+r-1)/r but has less chance to overflow.
func div(d, r time.Duration) int64 {
if m := d % r; m > 0 {
return int64(d/r + 1)
}
return int64(d / r)
}
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
func encodeTimeout(t time.Duration) string {
if t <= 0 {
return "0n"
}
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
if d := div(t, time.Microsecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "u"
}
if d := div(t, time.Millisecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "m"
}
if d := div(t, time.Second); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "S"
}
if d := div(t, time.Minute); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "M"
}
// Note that maxTimeoutValue * time.Hour > MaxInt64.
return strconv.FormatInt(div(t, time.Hour), 10) + "H"
}
func decodeTimeout(s string) (time.Duration, error) {
size := len(s)
if size < 2 {
return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
}
unit := timeoutUnit(s[size-1])
d, ok := timeoutUnitToDuration(unit)
if !ok {
return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
}
t, err := strconv.ParseInt(s[:size-1], 10, 64)
if err != nil {
return 0, err
}
return d * time.Duration(t), nil
}
const (
spaceByte = ' '
tildaByte = '~'
percentByte = '%'
)
// encodeGrpcMessage is used to encode status code in header field
// "grpc-message".
// It checks to see if each individual byte in msg is an
// allowable byte, and then either percent encoding or passing it through.
// When percent encoding, the byte is converted into hexadecimal notation
// with a '%' prepended.
func encodeGrpcMessage(msg string) string {
if msg == "" {
return ""
}
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if !(c >= spaceByte && c < tildaByte && c != percentByte) {
return encodeGrpcMessageUnchecked(msg)
}
}
return msg
}
func encodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c >= spaceByte && c < tildaByte && c != percentByte {
buf.WriteByte(c)
} else {
buf.WriteString(fmt.Sprintf("%%%02X", c))
}
}
return buf.String()
}
// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
func decodeGrpcMessage(msg string) string {
if msg == "" {
return ""
}
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
if msg[i] == percentByte && i+2 < lenMsg {
return decodeGrpcMessageUnchecked(msg)
}
}
return msg
}
func decodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
} else {
buf.WriteByte(byte(parsed))
i += 2
}
} else {
buf.WriteByte(c)
}
}
return buf.String()
}
type framer struct {
numWriters int32
reader io.Reader
writer *bufio.Writer
fr *http2.Framer
}
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
f := &framer{
reader: bufio.NewReaderSize(conn, readBufferSize),
writer: bufio.NewWriterSize(conn, writeBufferSize),
}
f.fr = http2.NewFramer(f.writer, f.reader)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}

50
vendor/google.golang.org/grpc/transport/log.go generated vendored Normal file
View File

@@ -0,0 +1,50 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// This file contains wrappers for grpclog functions.
// The transport package only logs to verbose level 2 by default.
package transport
import "google.golang.org/grpc/grpclog"
const logLevel = 2
func infof(format string, args ...interface{}) {
if grpclog.V(logLevel) {
grpclog.Infof(format, args...)
}
}
func warningf(format string, args ...interface{}) {
if grpclog.V(logLevel) {
grpclog.Warningf(format, args...)
}
}
func errorf(format string, args ...interface{}) {
if grpclog.V(logLevel) {
grpclog.Errorf(format, args...)
}
}
func fatalf(format string, args ...interface{}) {
if grpclog.V(logLevel) {
grpclog.Fatalf(format, args...)
}
}

777
vendor/google.golang.org/grpc/transport/transport.go generated vendored Normal file
View File

@@ -0,0 +1,777 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package transport defines and implements message oriented communication
// channel to complete various transactions (e.g., an RPC).
package transport // import "google.golang.org/grpc/transport"
import (
stdctx "context"
"fmt"
"io"
"net"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
)
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
data []byte
// nil: received some data
// io.EOF: stream is completed. data is nil.
// other non-nil error: transport failure. data is nil.
err error
}
// recvBuffer is an unbounded channel of recvMsg structs.
// Note recvBuffer differs from controlBuffer only in that recvBuffer
// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
// recvBuffer is written to much more often than
// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
type recvBuffer struct {
c chan recvMsg
mu sync.Mutex
backlog []recvMsg
}
func newRecvBuffer() *recvBuffer {
b := &recvBuffer{
c: make(chan recvMsg, 1),
}
return b
}
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
if len(b.backlog) == 0 {
select {
case b.c <- r:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r)
b.mu.Unlock()
}
func (b *recvBuffer) load() {
b.mu.Lock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = recvMsg{}
b.backlog = b.backlog[1:]
default:
}
}
b.mu.Unlock()
}
// get returns the channel that receives a recvMsg in the buffer.
//
// Upon receipt of a recvMsg, the caller should call load to send another
// recvMsg onto the channel if there is any.
func (b *recvBuffer) get() <-chan recvMsg {
return b.c
}
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
ctx context.Context
goAway chan struct{}
recv *recvBuffer
last []byte // Stores the remaining data in the previous calls.
err error
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
// read additional data from recv. It blocks if there no additional data available
// in recv. If Read returns any non-nil error, it will continue to return that error.
func (r *recvBufferReader) Read(p []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
n, r.err = r.read(p)
return n, r.err
}
func (r *recvBufferReader) read(p []byte) (n int, err error) {
if r.last != nil && len(r.last) > 0 {
// Read remaining data left in last call.
copied := copy(p, r.last)
r.last = r.last[copied:]
return copied, nil
}
select {
case <-r.ctx.Done():
return 0, ContextErr(r.ctx.Err())
case <-r.goAway:
return 0, ErrStreamDrain
case m := <-r.recv.get():
r.recv.load()
if m.err != nil {
return 0, m.err
}
copied := copy(p, m.data)
r.last = m.data[copied:]
return copied, nil
}
}
// All items in an out of a controlBuffer should be the same type.
type item interface {
item()
}
// controlBuffer is an unbounded channel of item.
type controlBuffer struct {
c chan item
mu sync.Mutex
backlog []item
}
func newControlBuffer() *controlBuffer {
b := &controlBuffer{
c: make(chan item, 1),
}
return b
}
func (b *controlBuffer) put(r item) {
b.mu.Lock()
if len(b.backlog) == 0 {
select {
case b.c <- r:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r)
b.mu.Unlock()
}
func (b *controlBuffer) load() {
b.mu.Lock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
default:
}
}
b.mu.Unlock()
}
// get returns the channel that receives an item in the buffer.
//
// Upon receipt of an item, the caller should call load to send another
// item onto the channel if there is any.
func (b *controlBuffer) get() <-chan item {
return b.c
}
type streamState uint8
const (
streamActive streamState = iota
streamWriteDone // EndStream sent
streamReadDone // EndStream received
streamDone // the entire stream is finished.
)
// Stream represents an RPC in the transport layer.
type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
cancel context.CancelFunc
// done is closed when the final status arrives.
done chan struct{}
// goAway is closed when the server sent GoAways signal before this stream was initiated.
goAway chan struct{}
// method records the associated RPC method of the stream.
method string
recvCompress string
sendCompress string
buf *recvBuffer
trReader io.Reader
fc *inFlow
recvQuota uint32
// TODO: Remote this unused variable.
// The accumulated inbound quota pending for window update.
updateQuota uint32
// Callback to state application's intentions to read data. This
// is used to adjust flow control, if need be.
requestRead func(int)
sendQuotaPool *quotaPool
localSendQuota *quotaPool
// Close headerChan to indicate the end of reception of header metadata.
headerChan chan struct{}
// header caches the received header metadata.
header metadata.MD
// The key-value map of trailer metadata.
trailer metadata.MD
mu sync.RWMutex // guard the following
// headerOK becomes true from the first header is about to send.
headerOk bool
state streamState
// true iff headerChan is closed. Used to avoid closing headerChan
// multiple times.
headerDone bool
// the status error received from the server.
status *status.Status
// rstStream indicates whether a RST_STREAM frame needs to be sent
// to the server to signify that this stream is closing.
rstStream bool
// rstError is the error that needs to be sent along with the RST_STREAM frame.
rstError http2.ErrCode
// bytesSent and bytesReceived indicates whether any bytes have been sent or
// received on this stream.
bytesSent bool
bytesReceived bool
}
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func (s *Stream) RecvCompress() string {
return s.recvCompress
}
// SetSendCompress sets the compression algorithm to the stream.
func (s *Stream) SetSendCompress(str string) {
s.sendCompress = str
}
// Done returns a chanel which is closed when it receives the final status
// from the server.
func (s *Stream) Done() <-chan struct{} {
return s.done
}
// GoAway returns a channel which is closed when the server sent GoAways signal
// before this stream was initiated.
func (s *Stream) GoAway() <-chan struct{} {
return s.goAway
}
// Header acquires the key-value pairs of header metadata once it
// is available. It blocks until i) the metadata is ready or ii) there is no
// header metadata or iii) the stream is canceled/expired.
func (s *Stream) Header() (metadata.MD, error) {
var err error
select {
case <-s.ctx.Done():
err = ContextErr(s.ctx.Err())
case <-s.goAway:
err = ErrStreamDrain
case <-s.headerChan:
return s.header.Copy(), nil
}
// Even if the stream is closed, header is returned if available.
select {
case <-s.headerChan:
return s.header.Copy(), nil
default:
}
return nil, err
}
// Trailer returns the cached trailer metedata. Note that if it is not called
// after the entire stream is done, it could return an empty MD. Client
// side only.
func (s *Stream) Trailer() metadata.MD {
s.mu.RLock()
c := s.trailer.Copy()
s.mu.RUnlock()
return c
}
// ServerTransport returns the underlying ServerTransport for the stream.
// The client side stream always returns nil.
func (s *Stream) ServerTransport() ServerTransport {
return s.st
}
// Context returns the context of the stream.
func (s *Stream) Context() context.Context {
return s.ctx
}
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
}
// Status returns the status received from the server.
func (s *Stream) Status() *status.Status {
return s.status
}
// SetHeader sets the header metadata. This can be called multiple times.
// Server side only.
func (s *Stream) SetHeader(md metadata.MD) error {
s.mu.Lock()
if s.headerOk || s.state == streamDone {
s.mu.Unlock()
return ErrIllegalHeaderWrite
}
if md.Len() == 0 {
s.mu.Unlock()
return nil
}
s.header = metadata.Join(s.header, md)
s.mu.Unlock()
return nil
}
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can be called multiple times. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
s.mu.Lock()
s.trailer = metadata.Join(s.trailer, md)
s.mu.Unlock()
return nil
}
func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}
// Read reads all p bytes from the wire for this stream.
func (s *Stream) Read(p []byte) (n int, err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.(*transportReader).er; er != nil {
return 0, er
}
s.requestRead(len(p))
return io.ReadFull(s.trReader, p)
}
// tranportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
reader io.Reader
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
windowHandler func(int)
er error
}
func (t *transportReader) Read(p []byte) (n int, err error) {
n, err = t.reader.Read(p)
if err != nil {
t.er = err
return
}
t.windowHandler(n)
return
}
// finish sets the stream's state and status, and closes the done channel.
// s.mu must be held by the caller. st must always be non-nil.
func (s *Stream) finish(st *status.Status) {
s.status = st
s.state = streamDone
close(s.done)
}
// BytesSent indicates whether any bytes have been sent on this stream.
func (s *Stream) BytesSent() bool {
s.mu.Lock()
bs := s.bytesSent
s.mu.Unlock()
return bs
}
// BytesReceived indicates whether any bytes have been received on this stream.
func (s *Stream) BytesReceived() bool {
s.mu.Lock()
br := s.bytesReceived
s.mu.Unlock()
return br
}
// GoString is implemented by Stream so context.String() won't
// race when printing %#v.
func (s *Stream) GoString() string {
return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
// The key to save transport.Stream in the context.
type streamKey struct{}
// newContextWithStream creates a new context from ctx and attaches stream
// to it.
func newContextWithStream(ctx context.Context, stream *Stream) context.Context {
return context.WithValue(ctx, streamKey{}, stream)
}
// StreamFromContext returns the stream saved in ctx.
func StreamFromContext(ctx context.Context) (s *Stream, ok bool) {
s, ok = ctx.Value(streamKey{}).(*Stream)
return
}
// state of transport
type transportState int
const (
reachable transportState = iota
closing
draining
)
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
return newHTTP2Server(conn, config)
}
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Authority is the :authority pseudo-header to use. This field has no effect if
// TransportCredentials is set.
Authority string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
FailOnNonTempDialError bool
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
// InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection.
InitialConnWindowSize int32
// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
}
// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
Addr string
Metadata interface{}
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, timeout time.Duration) (ClientTransport, error) {
return newHTTP2Client(ctx, target, opts, timeout)
}
// Options provides additional hints and information for message
// transmission.
type Options struct {
// Last indicates whether this write is the last piece for
// this stream.
Last bool
// Delay is a hint to the transport implementation for whether
// the data could be buffered for a batching write. The
// transport implementation may ignore the hint.
Delay bool
}
// CallHdr carries the information of a particular RPC.
type CallHdr struct {
// Host specifies the peer's host.
Host string
// Method specifies the operation to perform.
Method string
// RecvCompress specifies the compression algorithm applied on
// inbound messages.
RecvCompress string
// SendCompress specifies the compression algorithm applied on
// outbound message.
SendCompress string
// Creds specifies credentials.PerRPCCredentials for a call.
Creds credentials.PerRPCCredentials
// Flush indicates whether a new stream command should be sent
// to the peer without waiting for the first data. This is
// only a hint.
// If it's true, the transport may modify the flush decision
// for performance purposes.
// If it's false, new stream will never be flushed.
Flush bool
}
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
Close() error
// GracefulClose starts to tear down the transport. It stops accepting
// new RPCs and wait the completion of the pending RPCs.
GracefulClose() error
// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
// NewStream creates a Stream for an RPC.
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
// CloseStream clears the footprint of a stream when the stream is
// not needed any more. The err indicates the error incurred when
// CloseStream is called. Must be called when a stream is finished
// unless the associated transport is closing.
CloseStream(stream *Stream, err error)
// Error returns a channel that is closed when some I/O error
// happens. Typically the caller should have a goroutine to monitor
// this in order to take action (e.g., close the current transport
// and create a new one) in error case. It should not return nil
// once the transport is initiated.
Error() <-chan struct{}
// GoAway returns a channel that is closed when ClientTransport
// receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2).
GoAway() <-chan struct{}
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
}
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
HandleStreams(func(*Stream), func(context.Context, string) context.Context)
// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.
WriteHeader(s *Stream, md metadata.MD) error
// Write sends the data for the given stream.
// Write may not be called on all streams.
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
// WriteStatus sends the status of a stream to the client. WriteStatus is
// the final call made on a stream and always occurs.
WriteStatus(s *Stream, st *status.Status) error
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close() error
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain()
}
// streamErrorf creates an StreamError with the specified error code and description.
func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
return StreamError{
Code: c,
Desc: fmt.Sprintf(format, a...),
}
}
// connectionErrorf creates an ConnectionError with the specified error description.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(format, a...),
temp: temp,
err: e,
}
}
// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
Desc string
temp bool
err error
}
func (e ConnectionError) Error() string {
return fmt.Sprintf("connection error: desc = %q", e.Desc)
}
// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
return e.temp
}
// Origin returns the original error of this connection error.
func (e ConnectionError) Origin() error {
// Never return nil error here.
// If the original error is nil, return itself.
if e.err == nil {
return e
}
return e.err
}
var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
// ErrStreamDrain indicates that the stream is rejected by the server because
// the server stops accepting new RPCs.
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
)
// TODO: See if we can replace StreamError with status package errors.
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
Code codes.Code
Desc string
}
func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
// wait blocks until it can receive from one of the provided contexts or channels
func wait(ctx, tctx context.Context, done, goAway <-chan struct{}, proceed <-chan int) (int, error) {
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
case <-done:
return 0, io.EOF
case <-goAway:
return 0, ErrStreamDrain
case <-tctx.Done():
return 0, ErrConnClosing
case i := <-proceed:
return i, nil
}
}
// ContextErr converts the error from context package into a StreamError.
func ContextErr(err error) StreamError {
switch err {
case context.DeadlineExceeded, stdctx.DeadlineExceeded:
return streamErrorf(codes.DeadlineExceeded, "%v", err)
case context.Canceled, stdctx.Canceled:
return streamErrorf(codes.Canceled, "%v", err)
}
return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
}
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8
const (
// Invalid indicates that no GoAway frame is received.
Invalid GoAwayReason = 0
// NoReason is the default value when GoAway frame is received.
NoReason GoAwayReason = 1
// TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
// was received and that the debug data said "too_many_pings".
TooManyPings GoAwayReason = 2
)
// loopyWriter is run in a separate go routine. It is the single code path that will
// write data on wire.
func loopyWriter(ctx context.Context, cbuf *controlBuffer, handler func(item) error) {
for {
select {
case i := <-cbuf.get():
cbuf.load()
if err := handler(i); err != nil {
return
}
case <-ctx.Done():
return
}
hasData:
for {
select {
case i := <-cbuf.get():
cbuf.load()
if err := handler(i); err != nil {
return
}
case <-ctx.Done():
return
default:
if err := handler(&flushIO{}); err != nil {
return
}
break hasData
}
}
}
}