1
0
mirror of https://github.com/taigrr/wasm-experiments synced 2025-01-18 04:03:21 -08:00

Add prototype Invoke method and regenerate files

This commit is contained in:
Johan Brandhorst
2018-06-01 23:18:57 +01:00
parent 5922c39730
commit e48ef08afd
7 changed files with 238 additions and 70 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1,13 +1,25 @@
package main
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
_ "google.golang.org/genproto/googleapis/rpc/errdetails"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/johanbrandhorst/fetch"
"github.com/johanbrandhorst/wasm-experiments/grpc/proto/server"
@@ -21,84 +33,220 @@ import (
//go:generate bash -c "go run assets_generate.go"
func main() {
c := http.Client{
Transport: &fetch.Transport{},
}
b, err := proto.Marshal(&server.GetUserRequest{
s := newClientConn("", "web.Backend")
req := &server.GetUserRequest{
UserId: "1234",
})
}
resp := new(server.User)
err := s.Invoke(context.Background(), "GetUser", req, resp)
if err != nil {
fmt.Println(err)
st := status.Convert(err)
fmt.Println(st.Code(), st.Message(), st.Details())
return
}
fmt.Println(resp.GetId())
req.UserId = "123"
err = s.Invoke(context.Background(), "GetUser", req, resp)
if err != nil {
st := status.Convert(err)
fmt.Println(st.Code(), st.Message(), st.Details())
return
}
fmt.Println(resp.GetId())
}
type ClientConn struct {
client *http.Client
service string
host string
}
func newClientConn(host, service string) *ClientConn {
return &ClientConn{
client: &http.Client{
Transport: &fetch.Transport{},
},
service: service,
host: host,
}
}
func (cc *ClientConn) Invoke(ctx context.Context, method string, in, out proto.Message) error {
b, err := proto.Marshal(in)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
bufHeader := make([]byte, 5)
// Write length of b into buf
binary.BigEndian.PutUint32(bufHeader[1:], uint32(len(b)))
req, err := http.NewRequest("POST", "/web.Backend/GetUser", bytes.NewBuffer(append(bufHeader, b...)))
req, err := http.NewRequest(
"POST",
strings.Join([]string{cc.host, cc.service, method}, "/"),
bytes.NewBuffer(append(bufHeader, b...)),
)
if err != nil {
fmt.Println(err)
return
return status.Error(codes.Internal, err.Error())
}
req.Header.Add("content-type", "application/grpc-web+proto")
//ctx, _ := context.WithTimeout(context.Background(), time.Second)
//req = req.WithContext(ctx)
req = req.WithContext(ctx)
addHeaders(req)
resp, err := c.Do(req)
resp, err := cc.client.Do(req)
if err != nil {
fmt.Println(err)
return
return status.Error(codes.Internal, err.Error())
}
defer resp.Body.Close()
st := statusFromHeaders(resp.Header)
if st.Code() != codes.OK {
return st.Err()
}
msgHeader := make([]byte, 5)
for {
header := make([]byte, 5)
_, err := resp.Body.Read(header)
_, err := resp.Body.Read(msgHeader)
if err != nil {
fmt.Println(err)
return
return status.Error(codes.Internal, err.Error())
}
if header[0] == 0x80 {
trailers, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(string(trailers))
return
// 1 in MSB signifies that this is the trailer. Break loop.
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2
if msgHeader[0]>>7 == 1 {
break
}
length := binary.BigEndian.Uint32(header[1:])
msgLen := binary.BigEndian.Uint32(msgHeader[1:])
message := make([]byte, length)
_, err = resp.Body.Read(message)
msg := make([]byte, msgLen)
_, err = resp.Body.Read(msg)
if err != nil {
fmt.Println(err)
return
return status.Error(codes.Internal, err.Error())
}
/*
status := resp.Header.Get("grpc-status")
statusCode, err := strconv.Atoi(status)
if err != nil {
fmt.Println(err)
return
}
code := codes.Code(statusCode)
if code != codes.OK {
msg := resp.Header.Get("grpc-message")
fmt.Println(msg)
return
}
*/
user := new(server.User)
err = proto.Unmarshal(message, user)
err = proto.Unmarshal(msg, out)
if err != nil {
fmt.Println(err)
return
return status.Error(codes.Internal, err.Error())
}
}
fmt.Println(user.Id)
if msgHeader[0]&1 == 0 {
trailers, err := readTrailers(resp.Body)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
st = statusFromHeaders(trailers)
if st.Code() != codes.OK {
return st.Err()
}
} else {
// TODO(johanbrandhorst): Support compressed trailers
}
return nil
}
func addHeaders(req *http.Request) {
// TODO: Add more headers
// https://github.com/grpc/grpc-go/blob/590da37e2dfb4705d8ebd9574ce4cb75295d9674/transport/http2_client.go#L356
req.Header.Add("content-type", "application/grpc-web+proto")
if dl, ok := req.Context().Deadline(); ok {
timeout := dl.Sub(time.Now())
req.Header.Add("grpc-timeout", encodeTimeout(timeout))
}
md, ok := metadata.FromOutgoingContext(req.Context())
if ok {
for h, vs := range md {
for _, v := range vs {
req.Header.Add(h, v)
}
}
}
}
const maxTimeoutValue int64 = 100000000 - 1
// Copied from grpc-go
// https://github.com/grpc/grpc-go/blob/590da37e2dfb4705d8ebd9574ce4cb75295d9674/transport/http_util.go#L388
// div does integer division and round-up the result. Note that this is
// equivalent to (d+r-1)/r but has less chance to overflow.
func div(d, r time.Duration) int64 {
if m := d % r; m > 0 {
return int64(d/r + 1)
}
return int64(d / r)
}
// Copied from grpc-go
// https://github.com/grpc/grpc-go/blob/590da37e2dfb4705d8ebd9574ce4cb75295d9674/transport/http_util.go#L398
func encodeTimeout(t time.Duration) string {
if t <= 0 {
return "0n"
}
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
if d := div(t, time.Microsecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "u"
}
if d := div(t, time.Millisecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "m"
}
if d := div(t, time.Second); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "S"
}
if d := div(t, time.Minute); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "M"
}
// Note that maxTimeoutValue * time.Hour > MaxInt64.
return strconv.FormatInt(div(t, time.Hour), 10) + "H"
}
// Copied from grpc-go
// https://github.com/grpc/grpc-go/blob/b94ea975f3beb73799fac17cc24ee923fcd3cb5c/transport/http_util.go#L213
func decodeBinHeader(v string) ([]byte, error) {
if len(v)%4 == 0 {
// Input was padded, or padding was not necessary.
return base64.StdEncoding.DecodeString(v)
}
return base64.RawStdEncoding.DecodeString(v)
}
func readTrailers(in io.Reader) (http.Header, error) {
s := bufio.NewScanner(in)
trailers := http.Header{}
for s.Scan() {
v := s.Text()
kv := strings.SplitN(v, ": ", 2)
if len(kv) != 2 {
return nil, errors.New("malformed header: " + v)
}
trailers.Add(kv[0], kv[1])
}
return trailers, s.Err()
}
func statusFromHeaders(h http.Header) *status.Status {
details := h.Get("grpc-status-details-bin")
if details != "" {
b, err := decodeBinHeader(details)
if err != nil {
return status.New(codes.Internal, "malformed grps-status-details-bin header: "+err.Error())
}
s := &spb.Status{}
err = proto.Unmarshal(b, s)
if err != nil {
return status.New(codes.Internal, "malformed grps-status-details-bin header: "+err.Error())
}
return status.FromProto(s)
}
sh := h.Get("grpc-status")
if sh != "" {
val, err := strconv.Atoi(sh)
if err != nil {
return status.New(codes.Internal, "malformed grpc-status header: "+err.Error())
}
return status.New(codes.Code(val), h.Get("grpc-message"))
}
return status.New(codes.OK, "")
}