Stream mode when http2

This commit is contained in:
SALLEYRON Julien 2017-10-24 14:38:02 +02:00 committed by Traefiker
parent 35ca40c3de
commit 7400c39511
6 changed files with 227 additions and 34 deletions

6
glide.lock generated
View file

@ -1,5 +1,5 @@
hash: 45cf1c60c4c2c584ee9514e24dee16debb8e88e59517a4b82ec91600b8904dfe hash: de7e6a0069090a5811c003db434da19fe31efcf0c9429d3ccb676295708f0d2b
updated: 2017-10-23T15:19:16.848940186+02:00 updated: 2017-10-24T14:08:11.364720581+02:00
imports: imports:
- name: cloud.google.com/go - name: cloud.google.com/go
version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c
@ -481,7 +481,7 @@ imports:
- name: github.com/urfave/negroni - name: github.com/urfave/negroni
version: 490e6a555d47ca891a89a150d0c1ef3922dfffe9 version: 490e6a555d47ca891a89a150d0c1ef3922dfffe9
- name: github.com/vulcand/oxy - name: github.com/vulcand/oxy
version: c66eb2065193ca9264781f951e92c245b2ec81c2 version: 7e9763c4dc71b9758379da3581e6495c145caaab
repo: https://github.com/containous/oxy.git repo: https://github.com/containous/oxy.git
vcs: git vcs: git
subpackages: subpackages:

View file

@ -12,7 +12,7 @@ import:
- package: github.com/cenk/backoff - package: github.com/cenk/backoff
- package: github.com/containous/flaeg - package: github.com/containous/flaeg
- package: github.com/vulcand/oxy - package: github.com/vulcand/oxy
version: c66eb2065193ca9264781f951e92c245b2ec81c2 version: 7e9763c4dc71b9758379da3581e6495c145caaab
repo: https://github.com/containous/oxy.git repo: https://github.com/containous/oxy.git
vcs: git vcs: git
subpackages: subpackages:

View file

@ -1,8 +1,10 @@
package integration package integration
import ( import (
"crypto/rand"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"errors"
"io/ioutil" "io/ioutil"
"net" "net"
"os" "os"
@ -22,7 +24,9 @@ var LocalhostKey []byte
// GRPCSuite // GRPCSuite
type GRPCSuite struct{ BaseSuite } type GRPCSuite struct{ BaseSuite }
type myserver struct{} type myserver struct {
stopStreamExample chan bool
}
func (s *GRPCSuite) SetUpSuite(c *check.C) { func (s *GRPCSuite) SetUpSuite(c *check.C) {
var err error var err error
@ -36,7 +40,15 @@ func (s *myserver) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*
return &helloworld.HelloReply{Message: "Hello " + in.Name}, nil return &helloworld.HelloReply{Message: "Hello " + in.Name}, nil
} }
func startGRPCServer(lis net.Listener) error { func (s *myserver) StreamExample(in *helloworld.StreamExampleRequest, server helloworld.Greeter_StreamExampleServer) error {
data := make([]byte, 512)
rand.Read(data)
server.Send(&helloworld.StreamExampleReply{Data: string(data)})
<-s.stopStreamExample
return nil
}
func startGRPCServer(lis net.Listener, server *myserver) error {
cert, err := tls.X509KeyPair(LocalhostCert, LocalhostKey) cert, err := tls.X509KeyPair(LocalhostCert, LocalhostKey)
if err != nil { if err != nil {
return err return err
@ -45,26 +57,30 @@ func startGRPCServer(lis net.Listener) error {
creds := credentials.NewServerTLSFromCert(&cert) creds := credentials.NewServerTLSFromCert(&cert)
serverOption := grpc.Creds(creds) serverOption := grpc.Creds(creds)
var s *grpc.Server = grpc.NewServer(serverOption) s := grpc.NewServer(serverOption)
defer s.Stop() defer s.Stop()
helloworld.RegisterGreeterServer(s, &myserver{}) helloworld.RegisterGreeterServer(s, server)
return s.Serve(lis) return s.Serve(lis)
} }
func getHelloClientGRPC() (helloworld.GreeterClient, func() error, error) {
func callHelloClientGRPC() (string, error) {
roots := x509.NewCertPool() roots := x509.NewCertPool()
roots.AppendCertsFromPEM(LocalhostCert) roots.AppendCertsFromPEM(LocalhostCert)
credsClient := credentials.NewClientTLSFromCert(roots, "") credsClient := credentials.NewClientTLSFromCert(roots, "")
conn, err := grpc.Dial("127.0.0.1:4443", grpc.WithTransportCredentials(credsClient)) conn, err := grpc.Dial("127.0.0.1:4443", grpc.WithTransportCredentials(credsClient))
if err != nil { if err != nil {
return "", err return nil, func() error { return nil }, err
}
return helloworld.NewGreeterClient(conn), conn.Close, nil
} }
defer conn.Close() func callHelloClientGRPC(name string) (string, error) {
client := helloworld.NewGreeterClient(conn) client, closer, err := getHelloClientGRPC()
defer closer()
name := "World" if err != nil {
return "", err
}
r, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: name}) r, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: name})
if err != nil { if err != nil {
return "", err return "", err
@ -72,13 +88,26 @@ func callHelloClientGRPC() (string, error) {
return r.Message, nil return r.Message, nil
} }
func callStreamExampleClientGRPC() (helloworld.Greeter_StreamExampleClient, func() error, error) {
client, closer, err := getHelloClientGRPC()
if err != nil {
return nil, closer, err
}
t, err := client.StreamExample(context.Background(), &helloworld.StreamExampleRequest{})
if err != nil {
return nil, closer, err
}
return t, closer, nil
}
func (s *GRPCSuite) TestGRPC(c *check.C) { func (s *GRPCSuite) TestGRPC(c *check.C) {
lis, err := net.Listen("tcp", ":0") lis, err := net.Listen("tcp", ":0")
_, port, err := net.SplitHostPort(lis.Addr().String()) _, port, err := net.SplitHostPort(lis.Addr().String())
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
go func() { go func() {
err := startGRPCServer(lis) err := startGRPCServer(lis, &myserver{})
c.Log(err) c.Log(err)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
}() }()
@ -106,7 +135,7 @@ func (s *GRPCSuite) TestGRPC(c *check.C) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var response string var response string
err = try.Do(1*time.Second, func() error { err = try.Do(1*time.Second, func() error {
response, err = callHelloClientGRPC() response, err = callHelloClientGRPC("World")
return err return err
}) })
@ -120,7 +149,7 @@ func (s *GRPCSuite) TestGRPCInsecure(c *check.C) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
go func() { go func() {
err := startGRPCServer(lis) err := startGRPCServer(lis, &myserver{})
c.Log(err) c.Log(err)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
}() }()
@ -148,10 +177,68 @@ func (s *GRPCSuite) TestGRPCInsecure(c *check.C) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var response string var response string
err = try.Do(1*time.Second, func() error { err = try.Do(1*time.Second, func() error {
response, err = callHelloClientGRPC() response, err = callHelloClientGRPC("World")
return err return err
}) })
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(response, check.Equals, "Hello World") c.Assert(response, check.Equals, "Hello World")
} }
func (s *GRPCSuite) TestGRPCBuffer(c *check.C) {
stopStreamExample := make(chan bool)
defer func() { stopStreamExample <- true }()
lis, err := net.Listen("tcp", ":0")
_, port, err := net.SplitHostPort(lis.Addr().String())
c.Assert(err, check.IsNil)
go func() {
err := startGRPCServer(lis, &myserver{
stopStreamExample: stopStreamExample,
})
c.Log(err)
c.Assert(err, check.IsNil)
}()
file := s.adaptFile(c, "fixtures/grpc/config.toml", struct {
CertContent string
KeyContent string
GRPCServerPort string
}{
CertContent: string(LocalhostCert),
KeyContent: string(LocalhostKey),
GRPCServerPort: port,
})
defer os.Remove(file)
cmd, display := s.traefikCmd(withConfigFile(file))
defer display(c)
err = cmd.Start()
c.Assert(err, check.IsNil)
defer cmd.Process.Kill()
// wait for Traefik
err = try.GetRequest("http://127.0.0.1:8080/api/providers", 1*time.Second, try.BodyContains("Host:127.0.0.1"))
c.Assert(err, check.IsNil)
var client helloworld.Greeter_StreamExampleClient
client, closer, err := callStreamExampleClientGRPC()
defer closer()
received := make(chan bool)
go func() {
tr, _ := client.Recv()
c.Assert(len(tr.Data), check.Equals, 512)
received <- true
}()
err = try.Do(time.Second*10, func() error {
select {
case <-received:
return nil
default:
return errors.New("failed to receive stream data")
}
})
c.Assert(err, check.IsNil)
}

View file

@ -10,6 +10,8 @@ It is generated from these files:
It has these top-level messages: It has these top-level messages:
HelloRequest HelloRequest
HelloReply HelloReply
StreamExampleRequest
StreamExampleReply
*/ */
package helloworld package helloworld
@ -67,9 +69,35 @@ func (m *HelloReply) GetMessage() string {
return "" return ""
} }
type StreamExampleRequest struct {
}
func (m *StreamExampleRequest) Reset() { *m = StreamExampleRequest{} }
func (m *StreamExampleRequest) String() string { return proto.CompactTextString(m) }
func (*StreamExampleRequest) ProtoMessage() {}
func (*StreamExampleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type StreamExampleReply struct {
Data string `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
}
func (m *StreamExampleReply) Reset() { *m = StreamExampleReply{} }
func (m *StreamExampleReply) String() string { return proto.CompactTextString(m) }
func (*StreamExampleReply) ProtoMessage() {}
func (*StreamExampleReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *StreamExampleReply) GetData() string {
if m != nil {
return m.Data
}
return ""
}
func init() { func init() {
proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest") proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest")
proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply") proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply")
proto.RegisterType((*StreamExampleRequest)(nil), "helloworld.StreamExampleRequest")
proto.RegisterType((*StreamExampleReply)(nil), "helloworld.StreamExampleReply")
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -85,6 +113,8 @@ const _ = grpc.SupportPackageIsVersion4
type GreeterClient interface { type GreeterClient interface {
// Sends a greeting // Sends a greeting
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
// Tick me
StreamExample(ctx context.Context, in *StreamExampleRequest, opts ...grpc.CallOption) (Greeter_StreamExampleClient, error)
} }
type greeterClient struct { type greeterClient struct {
@ -104,11 +134,45 @@ func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...
return out, nil return out, nil
} }
func (c *greeterClient) StreamExample(ctx context.Context, in *StreamExampleRequest, opts ...grpc.CallOption) (Greeter_StreamExampleClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Greeter_serviceDesc.Streams[0], c.cc, "/helloworld.Greeter/StreamExample", opts...)
if err != nil {
return nil, err
}
x := &greeterStreamExampleClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Greeter_StreamExampleClient interface {
Recv() (*StreamExampleReply, error)
grpc.ClientStream
}
type greeterStreamExampleClient struct {
grpc.ClientStream
}
func (x *greeterStreamExampleClient) Recv() (*StreamExampleReply, error) {
m := new(StreamExampleReply)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for Greeter service // Server API for Greeter service
type GreeterServer interface { type GreeterServer interface {
// Sends a greeting // Sends a greeting
SayHello(context.Context, *HelloRequest) (*HelloReply, error) SayHello(context.Context, *HelloRequest) (*HelloReply, error)
// Tick me
StreamExample(*StreamExampleRequest, Greeter_StreamExampleServer) error
} }
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
@ -133,6 +197,27 @@ func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(in
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Greeter_StreamExample_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamExampleRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(GreeterServer).StreamExample(m, &greeterStreamExampleServer{stream})
}
type Greeter_StreamExampleServer interface {
Send(*StreamExampleReply) error
grpc.ServerStream
}
type greeterStreamExampleServer struct {
grpc.ServerStream
}
func (x *greeterStreamExampleServer) Send(m *StreamExampleReply) error {
return x.ServerStream.SendMsg(m)
}
var _Greeter_serviceDesc = grpc.ServiceDesc{ var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter", ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil), HandlerType: (*GreeterServer)(nil),
@ -142,23 +227,33 @@ var _Greeter_serviceDesc = grpc.ServiceDesc{
Handler: _Greeter_SayHello_Handler, Handler: _Greeter_SayHello_Handler,
}, },
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{
{
StreamName: "StreamExample",
Handler: _Greeter_StreamExample_Handler,
ServerStreams: true,
},
},
Metadata: "helloworld.proto", Metadata: "helloworld.proto",
} }
func init() { proto.RegisterFile("helloworld.proto", fileDescriptor0) } func init() { proto.RegisterFile("helloworld.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 175 bytes of a gzipped FileDescriptorProto // 231 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x50, 0xc1, 0x4a, 0xc4, 0x30,
0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88, 0x10, 0x35, 0xb0, 0xb8, 0x3a, 0x28, 0xca, 0x20, 0x4b, 0x59, 0x41, 0x96, 0x1c, 0x64, 0x4f, 0xa1,
0x28, 0x29, 0x71, 0xf1, 0x78, 0x80, 0x78, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42, 0xe8, 0xdd, 0x43, 0x41, 0xf4, 0x58, 0x5a, 0xc4, 0x73, 0xb4, 0x43, 0x15, 0x12, 0x13, 0x93, 0x88,
0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92, 0xf6, 0x6f, 0xfc, 0x54, 0x49, 0x6c, 0x31, 0x4a, 0xf1, 0xf6, 0x66, 0xe6, 0xe5, 0xbd, 0x97, 0x07,
0x1a, 0x17, 0x17, 0x54, 0x4d, 0x41, 0x4e, 0xa5, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x71, 0xc7, 0x4f, 0xa4, 0x94, 0x79, 0x37, 0x4e, 0x75, 0xc2, 0x3a, 0x13, 0x0c, 0xc2, 0xcf, 0x86, 0x73,
0x62, 0x3a, 0x4c, 0x11, 0x8c, 0x6b, 0xe4, 0xc9, 0xc5, 0xee, 0x5e, 0x94, 0x9a, 0x5a, 0x92, 0x5a, 0x38, 0xb8, 0x8d, 0x53, 0x43, 0xaf, 0x6f, 0xe4, 0x03, 0x22, 0x2c, 0x5e, 0xa4, 0xa6, 0x82, 0x6d,
0x24, 0x64, 0xc7, 0xc5, 0x11, 0x9c, 0x58, 0x09, 0xd6, 0x25, 0x24, 0xa1, 0x87, 0xe4, 0x02, 0x64, 0xd8, 0x76, 0xbf, 0x49, 0x98, 0x9f, 0x03, 0x8c, 0x1c, 0xab, 0x06, 0x2c, 0x60, 0xa9, 0xc9, 0x7b,
0xcb, 0xa4, 0xc4, 0xb0, 0xc8, 0x14, 0xe4, 0x54, 0x2a, 0x31, 0x38, 0x19, 0x70, 0x49, 0x67, 0xe6, 0xd9, 0x4f, 0xa4, 0x69, 0xe4, 0x2b, 0x38, 0x69, 0x83, 0x23, 0xa9, 0xaf, 0x3f, 0xa4, 0xb6, 0x8a,
0xeb, 0xa5, 0x17, 0x15, 0x24, 0xeb, 0xa5, 0x56, 0x24, 0xe6, 0x16, 0xe4, 0xa4, 0x16, 0x23, 0xa9, 0x46, 0x4d, 0xbe, 0x05, 0xfc, 0xb3, 0x8f, 0x3a, 0x08, 0x8b, 0x4e, 0x06, 0x39, 0x39, 0x45, 0x7c,
0x75, 0xe2, 0x07, 0x2b, 0x0e, 0x07, 0xb1, 0x03, 0x40, 0x5e, 0x0a, 0x60, 0x4c, 0x62, 0x03, 0xfb, 0xf1, 0xc9, 0x60, 0x79, 0xe3, 0x88, 0x02, 0x39, 0xbc, 0x82, 0xbd, 0x56, 0x0e, 0xc9, 0x18, 0x0b,
0xcd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0f, 0xb7, 0xcd, 0xf2, 0xef, 0x00, 0x00, 0x00, 0x91, 0x7d, 0x22, 0xcf, 0xbb, 0x5e, 0xcd, 0x5c, 0xac, 0x1a, 0xf8, 0x0e, 0xde, 0xc1, 0xe1, 0x2f,
0x57, 0xdc, 0xe4, 0xd4, 0xb9, 0xa0, 0xeb, 0xb3, 0x7f, 0x18, 0x49, 0xb4, 0x64, 0x55, 0x09, 0xa7,
0xcf, 0x46, 0xf4, 0xce, 0x3e, 0x0a, 0xfa, 0xbe, 0xf9, 0xec, 0x55, 0x75, 0x94, 0x32, 0xdc, 0x47,
0x5c, 0xc7, 0xb2, 0x6b, 0xf6, 0xb0, 0x9b, 0x5a, 0xbf, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x6f,
0x5f, 0xae, 0xb8, 0x89, 0x01, 0x00, 0x00,
} }

View file

@ -9,7 +9,10 @@ package helloworld;
// The greeting service definition. // The greeting service definition.
service Greeter { service Greeter {
// Sends a greeting // Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {} rpc SayHello (HelloRequest) returns (HelloReply) {};
rpc StreamExample (StreamExampleRequest) returns (stream StreamExampleReply) {};
} }
// The request message containing the user's name. // The request message containing the user's name.
@ -21,3 +24,9 @@ message HelloRequest {
message HelloReply { message HelloReply {
string message = 1; string message = 1;
} }
message StreamExampleRequest {}
message StreamExampleReply {
string data = 1;
}

View file

@ -190,7 +190,9 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, req *http.Request, ctx
stream = contentType == "text/event-stream" stream = contentType == "text/event-stream"
} }
} }
written, err := io.Copy(newResponseFlusher(w, stream), response.Body)
flush := stream || req.ProtoMajor == 2
written, err := io.Copy(newResponseFlusher(w, flush), response.Body)
if err != nil { if err != nil {
ctx.log.Errorf("Error copying upstream response body: %v", err) ctx.log.Errorf("Error copying upstream response body: %v", err)
ctx.errHandler.ServeHTTP(w, req, err) ctx.errHandler.ServeHTTP(w, req, err)