1
0
mirror of https://github.com/taigrr/wasm-experiments synced 2025-01-18 04:03:21 -08:00

Add server side streaming test

This commit is contained in:
Johan Brandhorst 2018-06-06 09:19:48 +01:00
parent 2392bc267d
commit b98654eb67
No known key found for this signature in database
GPG Key ID: 266C7D9B44EAA057
6 changed files with 197 additions and 6 deletions

2
Gopkg.lock generated
View File

@ -41,7 +41,7 @@
branch = "master"
name = "github.com/johanbrandhorst/grpc-wasm"
packages = ["."]
revision = "d4f5643dc71d1a17cebd3df37826b34b9d3905c5"
revision = "2556db600caef5681127fff6b0cb5381aa977a56"
[[projects]]
name = "github.com/lpar/gzipped"

View File

@ -2,6 +2,8 @@ package backend
import (
"context"
"strconv"
"time"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
@ -40,5 +42,15 @@ func (b Backend) GetUser(ctx context.Context, req *server.GetUserRequest) (*serv
}
func (b Backend) GetUsers(req *server.GetUsersRequest, srv server.Backend_GetUsersServer) error {
for index := 0; index < int(req.GetNumUsers()); index++ {
err := srv.Send(&server.User{
Id: strconv.Itoa(index),
})
if err != nil {
return err
}
time.Sleep(time.Second)
}
return nil
}

File diff suppressed because one or more lines are too long

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"io"
_ "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/status"
@ -39,4 +40,27 @@ func main() {
} else {
fmt.Println(resp.GetId())
}
srv, err := client.GetUsers(context.Background(), &server.GetUsersRequest{
NumUsers: 3,
})
if err != nil {
st := status.Convert(err)
fmt.Println(st.Code(), st.Message(), st.Details())
} else {
for {
user, err := srv.Recv()
if err != nil {
if err != io.EOF {
st := status.Convert(err)
fmt.Println(st.Code(), st.Message(), st.Details())
}
break
}
fmt.Println(user.GetId())
}
}
fmt.Println("finished")
}

View File

@ -46,7 +46,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
return nil, errors.New("streaming endpoints are not yet supported")
if desc.ClientStreams {
return nil, status.Error(codes.Unimplemented, "client-side streaming is not supported by grpc-web")
}
endpoint := cc.target + "/" + method
if cc.target == "" {
endpoint = method
}
return newStream(ctx, cc.client, endpoint)
}
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
@ -128,6 +137,7 @@ func addHeaders(req *http.Request) {
// TODO: Add more headers
// https://github.com/grpc/grpc-go/blob/590da37e2dfb4705d8ebd9574ce4cb75295d9674/transport/http2_client.go#L356
req.Header.Add("content-type", "application/grpc-web+proto")
req.Header.Add("x-grpc-web", "1")
if dl, ok := req.Context().Deadline(); ok {
timeout := dl.Sub(time.Now())
req.Header.Add("grpc-timeout", encodeTimeout(timeout))

View File

@ -3,9 +3,18 @@
package grpc
import (
"bytes"
"context"
"encoding/binary"
"io"
"io/ioutil"
"net/http"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// ClientStream defines the interface a client stream has to satisfy.
@ -49,3 +58,139 @@ type ServiceDesc struct {
Streams []StreamDesc
Metadata interface{}
}
type clientStream struct {
ctx context.Context
req *http.Request
client *http.Client
errCh chan error
msgCh chan []byte
}
func newStream(ctx context.Context, client *http.Client, endpoint string) (*clientStream, error) {
cs := &clientStream{
ctx: ctx,
client: client,
}
req, err := http.NewRequest(
"POST",
endpoint,
nil,
)
if err != nil {
return nil, status.New(codes.Unavailable, err.Error()).Err()
}
cs.req = req.WithContext(ctx)
return cs, nil
}
func (c *clientStream) Header() (metadata.MD, error) {
return nil, nil
}
func (c *clientStream) Trailer() metadata.MD {
return nil
}
func (c *clientStream) Context() context.Context {
return c.ctx
}
func (c *clientStream) RecvMsg(reply interface{}) error {
select {
case <-c.ctx.Done():
return c.ctx.Err()
case err := <-c.errCh:
return err
case msg, ok := <-c.msgCh:
if !ok {
return io.EOF
}
err := proto.Unmarshal(msg, reply.(proto.Message))
return err
}
}
func (c *clientStream) SendMsg(req interface{}) error {
msg, err := proto.Marshal(req.(proto.Message))
if err != nil {
return status.Error(codes.Internal, err.Error())
}
bufHeader := make([]byte, 5)
// Write length of b into buf
binary.BigEndian.PutUint32(bufHeader[1:], uint32(len(msg)))
c.req.Body = ioutil.NopCloser(bytes.NewBuffer(append(bufHeader, msg...)))
addHeaders(c.req)
resp, err := c.client.Do(c.req)
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}
st := statusFromHeaders(resp.Header)
if st.Code() != codes.OK {
resp.Body.Close()
return st.Err()
}
c.errCh = make(chan error, 1)
c.msgCh = make(chan []byte, 1)
// Read response asynchronously
go func() {
defer resp.Body.Close()
msgHeader := make([]byte, 5)
for {
_, err := io.ReadFull(resp.Body, msgHeader)
if err != nil {
c.errCh <- status.Error(codes.Internal, err.Error())
return
}
// 1 in MSB signifies that this is the trailer. Break loop.
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2
if msgHeader[0]>>7 == 1 {
break
}
msgLen := binary.BigEndian.Uint32(msgHeader[1:])
msg := make([]byte, msgLen)
_, err = io.ReadFull(resp.Body, msg)
if err != nil {
c.errCh <- status.Error(codes.Internal, err.Error())
return
}
c.msgCh <- msg
}
if msgHeader[0]&1 == 0 {
trailers, err := readTrailers(resp.Body)
if err != nil {
c.errCh <- status.Error(codes.Internal, err.Error())
return
}
st = statusFromHeaders(trailers)
} else {
// TODO(johanbrandhorst): Support compressed trailers
}
if st.Code() != codes.OK {
c.errCh <- st.Err()
return
}
close(c.msgCh)
}()
return nil
}
func (c *clientStream) CloseSend() error {
return nil
}