mirror of
https://github.com/taigrr/wasm-experiments
synced 2025-01-18 04:03:21 -08:00
Use custom for of grpc-go and unify client and server
This commit is contained in:
1039
vendor/google.golang.org/grpc/transport/client_transport.go
generated
vendored
Normal file
1039
vendor/google.golang.org/grpc/transport/client_transport.go
generated
vendored
Normal file
File diff suppressed because it is too large
Load Diff
266
vendor/google.golang.org/grpc/transport/client_transport_js.go
generated
vendored
Normal file
266
vendor/google.golang.org/grpc/transport/client_transport_js.go
generated
vendored
Normal file
@@ -0,0 +1,266 @@
|
||||
// +build js,wasm
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// and starts to receive messages on it. Non-nil error returns if construction
|
||||
// fails.
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
|
||||
t := &http2Client{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
md: addr.Metadata,
|
||||
readerDone: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
goAway: make(chan struct{}),
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
creds: opts.PerRPCCredentials,
|
||||
statsHandler: opts.StatsHandler,
|
||||
initialWindowSize: initialWindowSize,
|
||||
nextID: 1,
|
||||
maxConcurrentStreams: defaultMaxStreamsClient,
|
||||
streamQuota: defaultMaxStreamsClient,
|
||||
streamsQuotaAvailable: make(chan struct{}, 1),
|
||||
}
|
||||
if opts.StatsHandler != nil {
|
||||
t.ctx = opts.StatsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
})
|
||||
connBegin := &stats.ConnBegin{
|
||||
Client: true,
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
defer onSuccess()
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
|
||||
ctx = peer.NewContext(ctx, t.getPeer())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
s := &Stream{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
headerChan: make(chan struct{}),
|
||||
respChan: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
endpoint := callHdr.Method
|
||||
if callHdr.Host != "" {
|
||||
endpoint = callHdr.Host + "/" + endpoint
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(
|
||||
"POST",
|
||||
endpoint,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.req = req.WithContext(s.ctx)
|
||||
|
||||
headerFields, err := t.createHeaderFields(s.ctx, callHdr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, hf := range headerFields {
|
||||
// Filter out HTTP2 metadata headers
|
||||
if strings.HasPrefix(hf.Name, ":") {
|
||||
continue
|
||||
}
|
||||
// Don't add the user-agent header
|
||||
if strings.ToLower(hf.Name) == "user-agent" {
|
||||
continue
|
||||
}
|
||||
|
||||
req.Header.Add(hf.Name, hf.Value)
|
||||
}
|
||||
|
||||
// Set gRPC-Web specific headers
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2
|
||||
req.Header.Add("X-Grpc-Web", "1")
|
||||
req.Header.Add("X-User-Agent", "grpc-web-javascript/0.1")
|
||||
|
||||
if t.statsHandler != nil {
|
||||
outHeader := &stats.OutHeader{
|
||||
Client: true,
|
||||
FullMethod: callHdr.Method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: callHdr.SendCompress,
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, outHeader)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||
if atomic.SwapUint32(&s.requestSent, 1) == 1 {
|
||||
return status.Error(codes.Unimplemented, "multiple Write's are not supported.")
|
||||
}
|
||||
|
||||
s.req.Body = ioutil.NopCloser(bytes.NewBuffer(append(hdr, data...)))
|
||||
|
||||
resp, err := http.DefaultClient.Do(s.req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.operateHeaders(s, resp.Header, &s.req.URL.Path, &resp.StatusCode)
|
||||
if err != nil {
|
||||
resp.Body.Close()
|
||||
s.writeErr = err
|
||||
close(s.respChan)
|
||||
return err
|
||||
}
|
||||
|
||||
s.body = resp.Body
|
||||
close(s.respChan)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *http2Client) Close() error {
|
||||
if t.statsHandler != nil {
|
||||
connEnd := &stats.ConnEnd{
|
||||
Client: true,
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connEnd)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (t *http2Client) GracefulClose() error {
|
||||
return nil
|
||||
}
|
||||
func (t *http2Client) GetGoAwayReason() GoAwayReason {
|
||||
return 0
|
||||
}
|
||||
func (t *http2Client) CloseStream(stream *Stream, err error) {
|
||||
if stream.cancel != nil {
|
||||
stream.cancel()
|
||||
}
|
||||
select {
|
||||
case <-stream.respChan:
|
||||
if stream.body != nil {
|
||||
stream.body.Close()
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
func (t *http2Client) IncrMsgSent() {}
|
||||
func (t *http2Client) IncrMsgRecv() {}
|
||||
|
||||
func (d *decodeState) decodeResponseHeader(headers http.Header, path *string, status *int) error {
|
||||
for key, values := range headers {
|
||||
for _, value := range values {
|
||||
if err := d.processHeaderField(hpack.HeaderField{Name: strings.ToLower(key), Value: value}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if path != nil {
|
||||
if err := d.processHeaderField(hpack.HeaderField{Name: ":path", Value: *path}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if status != nil {
|
||||
if err := d.processHeaderField(hpack.HeaderField{Name: ":status", Value: strconv.Itoa(*status)}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return d.validate()
|
||||
}
|
||||
|
||||
// operateHeaders takes action on the decoded headers.
|
||||
func (t *http2Client) operateHeaders(s *Stream, headers http.Header, path *string, status *int) error {
|
||||
atomic.StoreUint32(&s.bytesReceived, 1)
|
||||
var state decodeState
|
||||
if err := state.decodeResponseHeader(headers, path, status); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if t != nil && t.statsHandler != nil {
|
||||
inHeader := &stats.InHeader{
|
||||
Client: true,
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, inHeader)
|
||||
}
|
||||
}()
|
||||
// If headers haven't been received yet.
|
||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
||||
// These values can be set without any synchronization because
|
||||
// stream goroutine will read it only after seeing a closed
|
||||
// headerChan which we'll close after setting this.
|
||||
s.recvCompress = state.encoding
|
||||
if len(state.mdata) > 0 {
|
||||
s.header = state.mdata
|
||||
}
|
||||
close(s.headerChan)
|
||||
}
|
||||
if state.rawStatusCode == nil || state.rawStatusMsg == "" || state.statusGen == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.swapState(streamDone) == streamDone {
|
||||
// If it was already done, return.
|
||||
return nil
|
||||
}
|
||||
s.status = state.status()
|
||||
if len(state.mdata) > 0 {
|
||||
s.trailer = state.mdata
|
||||
}
|
||||
close(s.done)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetTrailers sets the trailers on the stream. This is unique to the WASM client
|
||||
// as trailers are not read by the stream but by the user of the stream.
|
||||
func (s *Stream) SetTrailers(trailers http.Header) error {
|
||||
return (*http2Client)(nil).operateHeaders(s, trailers, nil, nil)
|
||||
}
|
||||
64
vendor/google.golang.org/grpc/transport/content_type.go
generated
vendored
Normal file
64
vendor/google.golang.org/grpc/transport/content_type.go
generated
vendored
Normal file
@@ -0,0 +1,64 @@
|
||||
// +build !js !wasm
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import "strings"
|
||||
|
||||
const (
|
||||
// baseContentType is the base content-type for gRPC. This is a valid
|
||||
// content-type on it's own, but can also include a content-subtype such as
|
||||
// "proto" as a suffix after "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
|
||||
// for more details.
|
||||
baseContentType = "application/grpc"
|
||||
)
|
||||
|
||||
// contentSubtype returns the content-subtype for the given content-type. The
|
||||
// given content-type must be a valid content-type that starts with
|
||||
// "application/grpc". A content-subtype will follow "application/grpc" after a
|
||||
// "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
||||
// more details.
|
||||
//
|
||||
// If contentType is not a valid content-type for gRPC, the boolean
|
||||
// will be false, otherwise true. If content-type == "application/grpc",
|
||||
// "application/grpc+", or "application/grpc;", the boolean will be true,
|
||||
// but no content-subtype will be returned.
|
||||
//
|
||||
// contentType is assumed to be lowercase already.
|
||||
func contentSubtype(contentType string) (string, bool) {
|
||||
if contentType == baseContentType {
|
||||
return "", true
|
||||
}
|
||||
if !strings.HasPrefix(contentType, baseContentType) {
|
||||
return "", false
|
||||
}
|
||||
// guaranteed since != baseContentType and has baseContentType prefix
|
||||
switch contentType[len(baseContentType)] {
|
||||
case '+', ';':
|
||||
// this will return true for "application/grpc+" or "application/grpc;"
|
||||
// which the previous validContentType function tested to be valid, so we
|
||||
// just say that no content-subtype is specified in this case
|
||||
return contentType[len(baseContentType)+1:], true
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
68
vendor/google.golang.org/grpc/transport/content_type_js.go
generated
vendored
Normal file
68
vendor/google.golang.org/grpc/transport/content_type_js.go
generated
vendored
Normal file
@@ -0,0 +1,68 @@
|
||||
// +build js,wasm
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import "strings"
|
||||
|
||||
const (
|
||||
// baseContentType is the base content-type for gRPC-web. This is a valid
|
||||
// content-type on it's own, but can also include a content-subtype such as
|
||||
// "proto" as a suffix after "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2
|
||||
// for more details.
|
||||
baseContentType = "application/grpc-web"
|
||||
)
|
||||
|
||||
// contentSubtype returns the content-subtype for the given content-type. The
|
||||
// given content-type must be a valid content-type that starts with
|
||||
// "application/grpc". A content-subtype will follow "application/grpc" after a
|
||||
// "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2
|
||||
// for more details.
|
||||
//
|
||||
// If contentType is not a valid content-type for gRPC, the boolean
|
||||
// will be false, otherwise true. If content-type == "application/grpc",
|
||||
// "application/grpc+", or "application/grpc;", the boolean will be true,
|
||||
// but no content-subtype will be returned.
|
||||
//
|
||||
// contentType is assumed to be lowercase already.
|
||||
func contentSubtype(contentType string) (string, bool) {
|
||||
switch contentType {
|
||||
// We allow application/octet-stream because this
|
||||
// is the content type returned by the browser
|
||||
// when reading a streaming response from a Fetch API request.
|
||||
case baseContentType, "application/octet-stream":
|
||||
return "", true
|
||||
}
|
||||
if !strings.HasPrefix(contentType, baseContentType) {
|
||||
return "", false
|
||||
}
|
||||
// guaranteed since != baseContentType and has baseContentType prefix
|
||||
switch contentType[len(baseContentType)] {
|
||||
case '+', ';':
|
||||
// this will return true for "application/grpc+" or "application/grpc;"
|
||||
// which the previous validContentType function tested to be valid, so we
|
||||
// just say that no content-subtype is specified in this case
|
||||
return contentType[len(baseContentType)+1:], true
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
101
vendor/google.golang.org/grpc/transport/controlbuf.go
generated
vendored
101
vendor/google.golang.org/grpc/transport/controlbuf.go
generated
vendored
@@ -28,6 +28,10 @@ import (
|
||||
"golang.org/x/net/http2/hpack"
|
||||
)
|
||||
|
||||
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
|
||||
e.SetMaxDynamicTableSizeLimit(v)
|
||||
}
|
||||
|
||||
type itemNode struct {
|
||||
it interface{}
|
||||
next *itemNode
|
||||
@@ -80,6 +84,13 @@ func (il *itemList) isEmpty() bool {
|
||||
// the control buffer of transport. They represent different aspects of
|
||||
// control tasks, e.g., flow control, settings, streaming resetting, etc.
|
||||
|
||||
// registerStream is used to register an incoming stream with loopy writer.
|
||||
type registerStream struct {
|
||||
streamID uint32
|
||||
wq *writeQuota
|
||||
}
|
||||
|
||||
// headerFrame is also used to register stream on the client-side.
|
||||
type headerFrame struct {
|
||||
streamID uint32
|
||||
hf []hpack.HeaderField
|
||||
@@ -361,44 +372,47 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
|
||||
const minBatchSize = 1000
|
||||
|
||||
// run should be run in a separate goroutine.
|
||||
func (l *loopyWriter) run() {
|
||||
var (
|
||||
it interface{}
|
||||
err error
|
||||
isEmpty bool
|
||||
)
|
||||
func (l *loopyWriter) run() (err error) {
|
||||
defer func() {
|
||||
errorf("transport: loopyWriter.run returning. Err: %v", err)
|
||||
if err == ErrConnClosing {
|
||||
// Don't log ErrConnClosing as error since it happens
|
||||
// 1. When the connection is closed by some other known issue.
|
||||
// 2. User closed the connection.
|
||||
// 3. A graceful close of connection.
|
||||
infof("transport: loopyWriter.run returning. %v", err)
|
||||
err = nil
|
||||
}
|
||||
}()
|
||||
for {
|
||||
it, err = l.cbuf.get(true)
|
||||
it, err := l.cbuf.get(true)
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if err = l.handle(it); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if _, err = l.processData(); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
gosched := true
|
||||
hasdata:
|
||||
for {
|
||||
it, err = l.cbuf.get(false)
|
||||
it, err := l.cbuf.get(false)
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if it != nil {
|
||||
if err = l.handle(it); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if _, err = l.processData(); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
continue hasdata
|
||||
}
|
||||
if isEmpty, err = l.processData(); err != nil {
|
||||
return
|
||||
isEmpty, err := l.processData()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !isEmpty {
|
||||
continue hasdata
|
||||
@@ -450,30 +464,39 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
|
||||
return l.framer.fr.WriteSettingsAck()
|
||||
}
|
||||
|
||||
func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
|
||||
str := &outStream{
|
||||
id: h.streamID,
|
||||
state: empty,
|
||||
itl: &itemList{},
|
||||
wq: h.wq,
|
||||
}
|
||||
l.estdStreams[h.streamID] = str
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
||||
if l.side == serverSide {
|
||||
if h.endStream { // Case 1.A: Server wants to close stream.
|
||||
// Make sure it's not a trailers only response.
|
||||
if str, ok := l.estdStreams[h.streamID]; ok {
|
||||
if str.state != empty { // either active or waiting on stream quota.
|
||||
// add it str's list of items.
|
||||
str.itl.enqueue(h)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.cleanupStreamHandler(h.cleanup)
|
||||
str, ok := l.estdStreams[h.streamID]
|
||||
if !ok {
|
||||
warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
|
||||
return nil
|
||||
}
|
||||
// Case 1.B: Server is responding back with headers.
|
||||
str := &outStream{
|
||||
state: empty,
|
||||
itl: &itemList{},
|
||||
wq: h.wq,
|
||||
// Case 1.A: Server is responding back with headers.
|
||||
if !h.endStream {
|
||||
return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
|
||||
}
|
||||
l.estdStreams[h.streamID] = str
|
||||
return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
|
||||
// else: Case 1.B: Server wants to close stream.
|
||||
|
||||
if str.state != empty { // either active or waiting on stream quota.
|
||||
// add it str's list of items.
|
||||
str.itl.enqueue(h)
|
||||
return nil
|
||||
}
|
||||
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.cleanupStreamHandler(h.cleanup)
|
||||
}
|
||||
// Case 2: Client wants to originate stream.
|
||||
str := &outStream{
|
||||
@@ -632,6 +655,8 @@ func (l *loopyWriter) handle(i interface{}) error {
|
||||
return l.outgoingSettingsHandler(i)
|
||||
case *headerFrame:
|
||||
return l.headerHandler(i)
|
||||
case *registerStream:
|
||||
return l.registerStreamHandler(i)
|
||||
case *cleanupStream:
|
||||
return l.cleanupStreamHandler(i)
|
||||
case *incomingGoAway:
|
||||
@@ -664,6 +689,8 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
case http2.SettingHeaderTableSize:
|
||||
updateHeaderTblSize(l.hEnc, s.Val)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
10
vendor/google.golang.org/grpc/transport/flowcontrol.go
generated
vendored
10
vendor/google.golang.org/grpc/transport/flowcontrol.go
generated
vendored
@@ -58,14 +58,20 @@ type writeQuota struct {
|
||||
ch chan struct{}
|
||||
// done is triggered in error case.
|
||||
done <-chan struct{}
|
||||
// replenish is called by loopyWriter to give quota back to.
|
||||
// It is implemented as a field so that it can be updated
|
||||
// by tests.
|
||||
replenish func(n int)
|
||||
}
|
||||
|
||||
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
|
||||
return &writeQuota{
|
||||
w := &writeQuota{
|
||||
quota: sz,
|
||||
ch: make(chan struct{}, 1),
|
||||
done: done,
|
||||
}
|
||||
w.replenish = w.realReplenish
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *writeQuota) get(sz int32) error {
|
||||
@@ -83,7 +89,7 @@ func (w *writeQuota) get(sz int32) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writeQuota) replenish(n int) {
|
||||
func (w *writeQuota) realReplenish(n int) {
|
||||
sz := int32(n)
|
||||
a := atomic.AddInt32(&w.quota, sz)
|
||||
b := a - sz
|
||||
|
||||
989
vendor/google.golang.org/grpc/transport/http2_client.go
generated
vendored
989
vendor/google.golang.org/grpc/transport/http2_client.go
generated
vendored
File diff suppressed because it is too large
Load Diff
118
vendor/google.golang.org/grpc/transport/http2_server.go
generated
vendored
118
vendor/google.golang.org/grpc/transport/http2_server.go
generated
vendored
@@ -24,7 +24,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -36,9 +35,11 @@ import (
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
|
||||
"google.golang.org/grpc/channelz"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
@@ -273,7 +274,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
|
||||
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
|
||||
t.loopy.run()
|
||||
if err := t.loopy.run(); err != nil {
|
||||
errorf("transport: loopyWriter.run returning. Err: %v", err)
|
||||
}
|
||||
t.conn.Close()
|
||||
close(t.writerDone)
|
||||
}()
|
||||
@@ -413,6 +416,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
t.updateWindow(s, uint32(n))
|
||||
},
|
||||
}
|
||||
// Register the stream with loopy.
|
||||
t.controlBuf.put(®isterStream{
|
||||
streamID: s.id,
|
||||
wq: s.wq,
|
||||
})
|
||||
handle(s)
|
||||
return
|
||||
}
|
||||
@@ -682,28 +690,7 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
||||
})
|
||||
}
|
||||
|
||||
// WriteHeader sends the header metedata md back to the client.
|
||||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
if s.headerOk || s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.headerOk = true
|
||||
if md.Len() > 0 {
|
||||
if s.header.Len() > 0 {
|
||||
s.header = metadata.Join(s.header, md)
|
||||
} else {
|
||||
s.header = md
|
||||
}
|
||||
}
|
||||
md = s.header
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
|
||||
if s.sendCompress != "" {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
|
||||
}
|
||||
func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
|
||||
for k, vv := range md {
|
||||
if isReservedHeader(k) {
|
||||
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
|
||||
@@ -713,6 +700,37 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
||||
}
|
||||
}
|
||||
return headerFields
|
||||
}
|
||||
|
||||
// WriteHeader sends the header metedata md back to the client.
|
||||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
if s.updateHeaderSent() || s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
if md.Len() > 0 {
|
||||
if s.header.Len() > 0 {
|
||||
s.header = metadata.Join(s.header, md)
|
||||
} else {
|
||||
s.header = md
|
||||
}
|
||||
}
|
||||
t.writeHeaderLocked(s)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *http2Server) writeHeaderLocked(s *Stream) {
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
|
||||
if s.sendCompress != "" {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
|
||||
}
|
||||
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
|
||||
t.controlBuf.put(&headerFrame{
|
||||
streamID: s.id,
|
||||
hf: headerFields,
|
||||
@@ -720,7 +738,6 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
onWrite: func() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
},
|
||||
wq: s.wq,
|
||||
})
|
||||
if t.stats != nil {
|
||||
// Note: WireLength is not set in outHeader.
|
||||
@@ -728,7 +745,6 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
outHeader := &stats.OutHeader{}
|
||||
t.stats.HandleRPC(s.Context(), outHeader)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteStatus sends stream status to the client and terminates the stream.
|
||||
@@ -736,21 +752,20 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
||||
// OK is adopted.
|
||||
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
if !s.headerOk && s.header.Len() > 0 {
|
||||
if err := t.WriteHeader(s, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if s.getState() == streamDone {
|
||||
return nil
|
||||
}
|
||||
if s.getState() == streamDone {
|
||||
return nil
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
|
||||
if !s.headerOk {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
|
||||
if !s.updateHeaderSent() { // No headers have been sent.
|
||||
if len(s.header) > 0 { // Send a separate header frame.
|
||||
t.writeHeaderLocked(s)
|
||||
} else { // Send a trailer only response.
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
|
||||
}
|
||||
}
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
|
||||
@@ -759,23 +774,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
stBytes, err := proto.Marshal(p)
|
||||
if err != nil {
|
||||
// TODO: return error instead, when callers are able to handle it.
|
||||
panic(err)
|
||||
grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
|
||||
} else {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
|
||||
}
|
||||
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
|
||||
}
|
||||
|
||||
// Attach the trailer metadata.
|
||||
for k, vv := range s.trailer {
|
||||
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
|
||||
if isReservedHeader(k) {
|
||||
continue
|
||||
}
|
||||
for _, v := range vv {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
||||
}
|
||||
}
|
||||
trailer := &headerFrame{
|
||||
headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
|
||||
trailingHeader := &headerFrame{
|
||||
streamID: s.id,
|
||||
hf: headerFields,
|
||||
endStream: true,
|
||||
@@ -783,7 +790,8 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
},
|
||||
}
|
||||
t.closeStream(s, false, 0, trailer, true)
|
||||
s.hdrMu.Unlock()
|
||||
t.closeStream(s, false, 0, trailingHeader, true)
|
||||
if t.stats != nil {
|
||||
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
|
||||
}
|
||||
@@ -793,7 +801,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
|
||||
// is returns if it fails (e.g., framing error, transport error).
|
||||
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||
if !s.headerOk { // Headers haven't been written yet.
|
||||
if !s.isHeaderSent() { // Headers haven't been written yet.
|
||||
if err := t.WriteHeader(s, nil); err != nil {
|
||||
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
|
||||
return streamErrorf(codes.Internal, "transport: %v", err)
|
||||
@@ -1123,14 +1131,12 @@ func (t *http2Server) getOutFlowWindow() int64 {
|
||||
}
|
||||
}
|
||||
|
||||
var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func getJitter(v time.Duration) time.Duration {
|
||||
if v == infinity {
|
||||
return 0
|
||||
}
|
||||
// Generate a jitter between +/- 10% of the value.
|
||||
r := int64(v / 10)
|
||||
j := rgen.Int63n(2*r) - r
|
||||
j := grpcrand.Int63n(2*r) - r
|
||||
return time.Duration(j)
|
||||
}
|
||||
|
||||
100
vendor/google.golang.org/grpc/transport/http_util.go
generated
vendored
100
vendor/google.golang.org/grpc/transport/http_util.go
generated
vendored
@@ -28,6 +28,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/net/http2"
|
||||
@@ -45,12 +46,6 @@ const (
|
||||
// http2IOBufSize specifies the buffer size for sending frames.
|
||||
defaultWriteBufSize = 32 * 1024
|
||||
defaultReadBufSize = 32 * 1024
|
||||
// baseContentType is the base content-type for gRPC. This is a valid
|
||||
// content-type on it's own, but can also include a content-subtype such as
|
||||
// "proto" as a suffix after "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
|
||||
// for more details.
|
||||
baseContentType = "application/grpc"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -156,38 +151,6 @@ func isWhitelistedHeader(hdr string) bool {
|
||||
}
|
||||
}
|
||||
|
||||
// contentSubtype returns the content-subtype for the given content-type. The
|
||||
// given content-type must be a valid content-type that starts with
|
||||
// "application/grpc". A content-subtype will follow "application/grpc" after a
|
||||
// "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
||||
// more details.
|
||||
//
|
||||
// If contentType is not a valid content-type for gRPC, the boolean
|
||||
// will be false, otherwise true. If content-type == "application/grpc",
|
||||
// "application/grpc+", or "application/grpc;", the boolean will be true,
|
||||
// but no content-subtype will be returned.
|
||||
//
|
||||
// contentType is assumed to be lowercase already.
|
||||
func contentSubtype(contentType string) (string, bool) {
|
||||
if contentType == baseContentType {
|
||||
return "", true
|
||||
}
|
||||
if !strings.HasPrefix(contentType, baseContentType) {
|
||||
return "", false
|
||||
}
|
||||
// guaranteed since != baseContentType and has baseContentType prefix
|
||||
switch contentType[len(baseContentType)] {
|
||||
case '+', ';':
|
||||
// this will return true for "application/grpc+" or "application/grpc;"
|
||||
// which the previous validContentType function tested to be valid, so we
|
||||
// just say that no content-subtype is specified in this case
|
||||
return contentType[len(baseContentType)+1:], true
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
|
||||
// contentSubtype is assumed to be lowercase
|
||||
func contentType(contentSubtype string) string {
|
||||
if contentSubtype == "" {
|
||||
@@ -233,13 +196,7 @@ func decodeMetadataHeader(k, v string) (string, error) {
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error {
|
||||
for _, hf := range frame.Fields {
|
||||
if err := d.processHeaderField(hf); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decodeState) validate() error {
|
||||
// If grpc status exists, no need to check further.
|
||||
if d.rawStatusCode != nil || d.statusGen != nil {
|
||||
return nil
|
||||
@@ -437,16 +394,17 @@ func decodeTimeout(s string) (time.Duration, error) {
|
||||
|
||||
const (
|
||||
spaceByte = ' '
|
||||
tildaByte = '~'
|
||||
tildeByte = '~'
|
||||
percentByte = '%'
|
||||
)
|
||||
|
||||
// encodeGrpcMessage is used to encode status code in header field
|
||||
// "grpc-message".
|
||||
// It checks to see if each individual byte in msg is an
|
||||
// allowable byte, and then either percent encoding or passing it through.
|
||||
// When percent encoding, the byte is converted into hexadecimal notation
|
||||
// with a '%' prepended.
|
||||
// "grpc-message". It does percent encoding and also replaces invalid utf-8
|
||||
// characters with Unicode replacement character.
|
||||
//
|
||||
// It checks to see if each individual byte in msg is an allowable byte, and
|
||||
// then either percent encoding or passing it through. When percent encoding,
|
||||
// the byte is converted into hexadecimal notation with a '%' prepended.
|
||||
func encodeGrpcMessage(msg string) string {
|
||||
if msg == "" {
|
||||
return ""
|
||||
@@ -454,7 +412,7 @@ func encodeGrpcMessage(msg string) string {
|
||||
lenMsg := len(msg)
|
||||
for i := 0; i < lenMsg; i++ {
|
||||
c := msg[i]
|
||||
if !(c >= spaceByte && c < tildaByte && c != percentByte) {
|
||||
if !(c >= spaceByte && c <= tildeByte && c != percentByte) {
|
||||
return encodeGrpcMessageUnchecked(msg)
|
||||
}
|
||||
}
|
||||
@@ -463,14 +421,26 @@ func encodeGrpcMessage(msg string) string {
|
||||
|
||||
func encodeGrpcMessageUnchecked(msg string) string {
|
||||
var buf bytes.Buffer
|
||||
lenMsg := len(msg)
|
||||
for i := 0; i < lenMsg; i++ {
|
||||
c := msg[i]
|
||||
if c >= spaceByte && c < tildaByte && c != percentByte {
|
||||
buf.WriteByte(c)
|
||||
} else {
|
||||
buf.WriteString(fmt.Sprintf("%%%02X", c))
|
||||
for len(msg) > 0 {
|
||||
r, size := utf8.DecodeRuneInString(msg)
|
||||
for _, b := range []byte(string(r)) {
|
||||
if size > 1 {
|
||||
// If size > 1, r is not ascii. Always do percent encoding.
|
||||
buf.WriteString(fmt.Sprintf("%%%02X", b))
|
||||
continue
|
||||
}
|
||||
|
||||
// The for loop is necessary even if size == 1. r could be
|
||||
// utf8.RuneError.
|
||||
//
|
||||
// fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
|
||||
if b >= spaceByte && b <= tildeByte && b != percentByte {
|
||||
buf.WriteByte(b)
|
||||
} else {
|
||||
buf.WriteString(fmt.Sprintf("%%%02X", b))
|
||||
}
|
||||
}
|
||||
msg = msg[size:]
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
@@ -531,10 +501,14 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
|
||||
if w.err != nil {
|
||||
return 0, w.err
|
||||
}
|
||||
n = copy(w.buf[w.offset:], b)
|
||||
w.offset += n
|
||||
if w.offset >= w.batchSize {
|
||||
err = w.Flush()
|
||||
for len(b) > 0 {
|
||||
nn := copy(w.buf[w.offset:], b)
|
||||
b = b[nn:]
|
||||
w.offset += nn
|
||||
n += nn
|
||||
if w.offset >= w.batchSize {
|
||||
err = w.Flush()
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
35
vendor/google.golang.org/grpc/transport/stream.go
generated
vendored
Normal file
35
vendor/google.golang.org/grpc/transport/stream.go
generated
vendored
Normal file
@@ -0,0 +1,35 @@
|
||||
// +build !js !wasm
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// Read reads all p bytes from the wire for this stream.
|
||||
func (s *Stream) Read(p []byte) (n int, err error) {
|
||||
// Don't request a read if there was an error earlier
|
||||
if er := s.trReader.(*transportReader).er; er != nil {
|
||||
return 0, er
|
||||
}
|
||||
s.requestRead(len(p))
|
||||
return io.ReadFull(s.trReader, p)
|
||||
}
|
||||
40
vendor/google.golang.org/grpc/transport/stream_js.go
generated
vendored
Normal file
40
vendor/google.golang.org/grpc/transport/stream_js.go
generated
vendored
Normal file
@@ -0,0 +1,40 @@
|
||||
// +build js,wasm
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// Read reads all p bytes from the wire for this stream.
|
||||
func (s *Stream) Read(p []byte) (n int, err error) {
|
||||
// block until body has been set
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return 0, s.ctx.Err()
|
||||
case <-s.respChan:
|
||||
if s.writeErr != nil {
|
||||
return 0, s.writeErr
|
||||
}
|
||||
n, err := io.ReadFull(s.body, p)
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
55
vendor/google.golang.org/grpc/transport/transport.go
generated
vendored
55
vendor/google.golang.org/grpc/transport/transport.go
generated
vendored
@@ -26,6 +26,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
@@ -185,13 +186,20 @@ type Stream struct {
|
||||
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
header metadata.MD // the received header metadata.
|
||||
trailer metadata.MD // the key-value map of trailer metadata.
|
||||
|
||||
headerOk bool // becomes true from the first header is about to send
|
||||
state streamState
|
||||
// hdrMu protects header and trailer metadata on the server-side.
|
||||
hdrMu sync.Mutex
|
||||
header metadata.MD // the received header metadata.
|
||||
trailer metadata.MD // the key-value map of trailer metadata.
|
||||
|
||||
status *status.Status // the status error received from the server
|
||||
// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
|
||||
headerSent uint32
|
||||
|
||||
state streamState
|
||||
|
||||
// On client-side it is the status error received from the server.
|
||||
// On server-side it is unused.
|
||||
status *status.Status
|
||||
|
||||
bytesReceived uint32 // indicates whether any bytes have been received on this stream
|
||||
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
|
||||
@@ -199,6 +207,24 @@ type Stream struct {
|
||||
// contentSubtype is the content-subtype for requests.
|
||||
// this must be lowercase or the behavior is undefined.
|
||||
contentSubtype string
|
||||
|
||||
// only used by WASM clients
|
||||
req *http.Request
|
||||
requestSent uint32 // indicates that the request has been sent
|
||||
body io.ReadCloser
|
||||
writeErr error
|
||||
respChan chan struct{} // closed to indicate the body or writeErr has been set.
|
||||
}
|
||||
|
||||
// isHeaderSent is only valid on the server-side.
|
||||
func (s *Stream) isHeaderSent() bool {
|
||||
return atomic.LoadUint32(&s.headerSent) == 1
|
||||
}
|
||||
|
||||
// updateHeaderSent updates headerSent and returns true
|
||||
// if it was alreay set. It is valid only on server-side.
|
||||
func (s *Stream) updateHeaderSent() bool {
|
||||
return atomic.SwapUint32(&s.headerSent, 1) == 1
|
||||
}
|
||||
|
||||
func (s *Stream) swapState(st streamState) streamState {
|
||||
@@ -313,10 +339,12 @@ func (s *Stream) SetHeader(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) {
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.header = metadata.Join(s.header, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -335,7 +363,12 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.trailer = metadata.Join(s.trailer, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -343,16 +376,6 @@ func (s *Stream) write(m recvMsg) {
|
||||
s.buf.put(m)
|
||||
}
|
||||
|
||||
// Read reads all p bytes from the wire for this stream.
|
||||
func (s *Stream) Read(p []byte) (n int, err error) {
|
||||
// Don't request a read if there was an error earlier
|
||||
if er := s.trReader.(*transportReader).er; er != nil {
|
||||
return 0, er
|
||||
}
|
||||
s.requestRead(len(p))
|
||||
return io.ReadFull(s.trReader, p)
|
||||
}
|
||||
|
||||
// tranportReader reads all the data available for this Stream from the transport and
|
||||
// passes them into the decoder, which converts them into a gRPC message stream.
|
||||
// The error is io.EOF when the stream is done or another non-nil error if
|
||||
|
||||
Reference in New Issue
Block a user