Advanced gRPC

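Beyond its four request types, gRPC supports a number of advanced features: keepalive, request retry, load balancing, authentication, and more. This post walks through them one by one.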

Advanced gRPC features

Every gRPC request runs on its own HTTP/2 stream.

Keepalive

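Keepalive keeps the connection behind gRPC streams alive over long periods, which suits long-running requests. It can be configured on both the server and the client, but the client settings only work as intended when the server's enforcement policy permits them; otherwise the server closes the connection because the client pings too often. The example code comes first and the explanation follows. Server implementation: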

// ...
var kaep = keepalive.EnforcementPolicy{
	MinTime:             5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
	PermitWithoutStream: true,            // Allow pings even when there are no active streams
}

var kasp = keepalive.ServerParameters{
	MaxConnectionIdle:     15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
	MaxConnectionAge:      30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
	MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
	Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
	Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
}

// server implements EchoServer.
type server struct {
	pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	return &pb.EchoResponse{Message: req.Message}, nil
}

func main() {
	address := ":50001" // net.Listen needs a host:port address, not a bare port number
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Configure server-side keepalive when creating the gRPC server.
	s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
	pb.RegisterEchoServer(s, &server{})

	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

Client implementation:

// ...
var kacp = keepalive.ClientParameters{
	Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
	Timeout:             time.Second,      // wait 1 second for ping ack before considering the connection dead
	PermitWithoutStream: true,             // send pings even without active streams
}

func main() {
	conn, err := grpc.Dial("localhost:50001", grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewEchoClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	fmt.Println("Performing unary request")
	res, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "keepalive demo"})
	if err != nil {
		log.Fatalf("unexpected error from UnaryEcho: %v", err)
	}
	fmt.Println("RPC response:", res)
}

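The server side of keepalive is configured through keepalive.EnforcementPolicy and keepalive.ServerParameters. keepalive.ServerParameters has the following fields:

  • MaxConnectionIdle: the maximum time a connection may stay idle; the default is unlimited. Idle time is counted from the moment the number of outstanding RPCs drops to zero, or from connection establishment. Once it is exceeded, the server sends a GOAWAY and the connection is closed.
  • MaxConnectionAge: the maximum lifetime of a connection; the default is unlimited. When a connection exceeds this age, the server sends a GOAWAY.
  • MaxConnectionAgeGrace: an additional grace period after MaxConnectionAge, after which the connection is forcibly closed; the default is unlimited.
  • Time: if the server sees no activity on a connection for this long, it pings the client to check that the connection is still alive. The default is 2 hours, and values below 1s are raised to 1s.
  • Timeout: how long the server waits for the ping acknowledgement before closing the connection; the default is 20s.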

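keepalive.EnforcementPolicy is enforced on the server: a client that violates the policy is disconnected. It has two fields:

  • MinTime: the minimum interval a client must leave between keepalive pings. A client that pings more often is disconnected. The default is 5 minutes.
  • PermitWithoutStream: whether pings are allowed when there are no active streams. The default is false.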

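keepalive.ClientParameters is the keepalive configuration used on the client side:

  • Time: the period of inactivity after which the client sends a ping. The default is unlimited (no pings), and values below 10s are raised to 10s.
  • Timeout: how long the client waits for the ping acknowledgement; the default is 20s.
  • PermitWithoutStream: whether to send pings when there are no active streams. The default is false.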
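To see why the client and server settings have to agree, the check the server applies to each incoming ping can be modelled roughly as below. This is a simplified sketch using only the standard library: enforcementPolicy and pingAllowed are hypothetical names for illustration, not part of grpc-go, and the real server logic is more involved than a single yes/no decision.

```go
package main

import (
	"fmt"
	"time"
)

// enforcementPolicy mirrors the two fields of keepalive.EnforcementPolicy.
// It is a stand-in type for this sketch, not the grpc-go type.
type enforcementPolicy struct {
	MinTime             time.Duration
	PermitWithoutStream bool
}

// pingAllowed reports whether a client ping respects the policy, given the
// time since the previous ping and the number of active streams.
func pingAllowed(p enforcementPolicy, sinceLastPing time.Duration, activeStreams int) bool {
	if activeStreams == 0 && !p.PermitWithoutStream {
		return false
	}
	return sinceLastPing >= p.MinTime
}

func main() {
	clientInterval := 10 * time.Second // ClientParameters.Time from the example

	// Policy from the server example: pings every 10s are fine.
	configured := enforcementPolicy{MinTime: 5 * time.Second, PermitWithoutStream: true}
	fmt.Println("configured server:", pingAllowed(configured, clientInterval, 0))

	// Default policy (MinTime 5 minutes, no pings without streams):
	// the same client would be treated as misbehaving.
	defaults := enforcementPolicy{MinTime: 5 * time.Minute}
	fmt.Println("default server:", pingAllowed(defaults, clientInterval, 1))
}
```

With the policy from the server example the 10-second client interval passes; against a server left at its defaults the same client would violate MinTime, which is the situation the opening paragraph of this section warns about.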

Request retry

gRPC supports request retry: once a retry policy is configured on the client, the client re-sends a request after it fails.

var (
	retryPolicy = `{
		"methodConfig": [{
		  "name": [{"service": "mysite.pb.Echo"}],
		  "waitForReady": true,
		  "retryPolicy": {
			  "MaxAttempts": 3,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": "1s",
			  "BackoffMultiplier": 2.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
		  }
		}]}`
)

// use grpc.WithDefaultServiceConfig() to set service config
func retryDial() (*grpc.ClientConn, error) {
	return grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
}
// ...

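Retry only needs to be configured on the client to take effect. The configuration lives in the service config (ServiceConfig), passed as JSON in the format defined by the gRPC service config documentation. The fields of retryPolicy are:

  • MaxAttempts: the maximum number of attempts, including the original request. Values above 5 are treated as 5.
  • InitialBackoff: the initial retry interval. The delay before the first retry is random(0, initialBackoff).
  • MaxBackoff: the maximum retry interval. The delay before the n-th retry is random(0, min(initialBackoff * backoffMultiplier^(n-1), maxBackoff)), where backoffMultiplier is the BackoffMultiplier setting.
  • RetryableStatusCodes: the status codes that trigger a retry.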
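The backoff formula in the list above can be made concrete with a few lines of Go. This is a standalone sketch of the arithmetic only, not grpc-go's implementation; retryPolicy, backoffCeiling and backoff are names made up for the example.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// retryPolicy holds the backoff-related fields of the retry policy.
// It is an illustrative type, not the grpc-go one.
type retryPolicy struct {
	InitialBackoff    time.Duration
	MaxBackoff        time.Duration
	BackoffMultiplier float64
}

// backoffCeiling returns the upper bound of the delay before the n-th retry
// (n starts at 1): min(initialBackoff * multiplier^(n-1), maxBackoff).
func backoffCeiling(p retryPolicy, n int) time.Duration {
	d := float64(p.InitialBackoff) * math.Pow(p.BackoffMultiplier, float64(n-1))
	if d > float64(p.MaxBackoff) {
		return p.MaxBackoff
	}
	return time.Duration(d)
}

// backoff picks the actual delay uniformly at random from [0, ceiling).
func backoff(p retryPolicy, n int) time.Duration {
	ceiling := backoffCeiling(p, n)
	if ceiling <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(ceiling)))
}

func main() {
	// Same values as the JSON policy above: .01s, 1s, 2.0.
	p := retryPolicy{
		InitialBackoff:    10 * time.Millisecond,
		MaxBackoff:        time.Second,
		BackoffMultiplier: 2.0,
	}
	for n := 1; n <= 8; n++ {
		fmt.Printf("retry %d: ceiling %v, delay %v\n", n, backoffCeiling(p, n), backoff(p, n))
	}
}
```

With these values the ceiling doubles from 10ms on each retry and is capped at 1s from the eighth retry onward; the randomization spreads out clients that fail at the same moment.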

Load balancing

gRPC supports client-side load balancing policies. Load balancing builds on gRPC's name resolver:

const (
	exampleScheme      = "example"
	exampleServiceName = "lb.example.grpc.io"
)
// ...
func main() {
  // ...
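	// Select the round_robin load balancing policy through the service config
	// (grpc.WithBalancerName is deprecated).
	roundrobinConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy.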
		grpc.WithInsecure(),
		grpc.WithBlock(),
	)
  // ...
}

// Set up the name resolver.

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &exampleResolver{
		target: target,
		cc:     cc,
		addrsStore: map[string][]string{
			exampleServiceName: addrs,
		},
	}
	r.start()
	return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

func (r *exampleResolver) start() {
	addrStrs := r.addrsStore[r.target.Endpoint]
	addrs := make([]resolver.Address, len(addrStrs))
	for i, s := range addrStrs {
		addrs[i] = resolver.Address{Addr: s}
	}
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                  {}

func init() {
	resolver.Register(&exampleResolverBuilder{})
}

The key is to implement the resolver.Builder interface:

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at <https://github.com/grpc/grpc/blob/master/doc/naming.md>.
	Scheme() string
}
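The resolver only supplies the address list; the round_robin policy then decides which backend each request goes to. Conceptually the rotation works like the sketch below. It is a toy model over plain strings with made-up names (roundRobin, pick); the real balancer operates on connections and takes their readiness into account.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin cycles through a fixed list of backend addresses.
type roundRobin struct {
	addrs []string
	next  uint64
}

// pick returns the next address in rotation, or "" if there are none.
// The atomic counter makes it safe to call from multiple goroutines.
func (r *roundRobin) pick() string {
	if len(r.addrs) == 0 {
		return ""
	}
	n := atomic.AddUint64(&r.next, 1)
	return r.addrs[(n-1)%uint64(len(r.addrs))]
}

func main() {
	// Hypothetical backend addresses, as a resolver might report them.
	rr := &roundRobin{addrs: []string{"localhost:50051", "localhost:50052", "localhost:50053"}}
	for i := 0; i < 5; i++ {
		fmt.Println(rr.pick())
	}
}
```

Each call advances the counter by one, so requests are spread evenly across the resolved addresses and wrap around after the last one.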

The exampleResolver shown earlier cannot add or remove server addresses dynamically. A resolver backed by etcd can:

type etcdBuilder struct {
	prefix    string
	endpoints []string
}

func ETCDBuilder(prefix string, endpoints []string) resolver.Builder {
	return &etcdBuilder{prefix, endpoints}
}

func (b *etcdBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   b.endpoints,
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("connect to etcd endpoints: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	rlv := &etcdResolver{
		cc:             cc,
		cli:            cli,
		ctx:            ctx,
		cancel:         cancel,
		watchKeyPrefix: b.prefix,
		freq:           5 * time.Second,
		t:              time.NewTimer(0),
		rn:             make(chan struct{}, 1),
		im:             make(chan []resolver.Address),
		wg:             sync.WaitGroup{},
	}

	rlv.wg.Add(2)
	go rlv.watcher()
	go rlv.FetchBackendsWithWatch()

	return rlv, nil
}

func (b *etcdBuilder) Scheme() string {
	return "etcd"
}

type etcdResolver struct {
	retry  int
	freq   time.Duration
	ctx    context.Context
	cancel context.CancelFunc
	cc     resolver.ClientConn
	cli    *clientv3.Client
	t      *time.Timer

	watchKeyPrefix string

	rn chan struct{}
	im chan []resolver.Address

	wg sync.WaitGroup
}

func (r *etcdResolver) ResolveNow(opt resolver.ResolveNowOptions) {
	select {
	case r.rn <- struct{}{}:
	default:
	}
}

func (r *etcdResolver) Close() {
	r.cancel()
	r.wg.Wait()
	r.t.Stop()
}

func (r *etcdResolver) watcher() {
	defer r.wg.Done()

	for {
		select {
		case <-r.ctx.Done():
			return
		case addrs := <-r.im:
			if len(addrs) > 0 {
				r.retry = 0
				r.t.Reset(r.freq)
				r.cc.UpdateState(resolver.State{Addresses: addrs})
				continue
			}
		case <-r.t.C:
		case <-r.rn:
		}

		result := r.FetchBackends()

		if len(result) == 0 {
			r.retry++
			r.t.Reset(r.freq)
		} else {
			r.retry = 0
			r.t.Reset(r.freq)
		}

		r.cc.UpdateState(resolver.State{Addresses: result})
	}
}

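func (r *etcdResolver) FetchBackendsWithWatch() {
	defer r.wg.Done()

	// Create the watch channel once; calling Watch inside the loop would
	// open a new watcher on every iteration.
	watchCh := r.cli.Watch(r.ctx, r.watchKeyPrefix, clientv3.WithPrefix())
	for {
		select {
		case <-r.ctx.Done():
			return
		case _, ok := <-watchCh:
			if !ok {
				return
			}
			// Do not block forever if the resolver is closed while sending.
			select {
			case r.im <- r.FetchBackends():
			case <-r.ctx.Done():
				return
			}
		}
	}
}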

func (r *etcdResolver) FetchBackends() []resolver.Address {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	result := make([]resolver.Address, 0)

	resp, err := r.cli.Get(ctx, r.watchKeyPrefix, clientv3.WithPrefix())
	if err != nil {
		return result
	}

	for _, kv := range resp.Kvs {
		if strings.TrimSpace(string(kv.Value)) == "" {
			continue
		}
		result = append(result, resolver.Address{Addr: string(kv.Value)})
	}

	return result
}

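gRPC encrypted transport

All of the requests above send data in plaintext, which makes it easy to leak their contents. gRPC supports TLS-encrypted communication to keep data secure in transit.

TLS certificate

First, generate a TLS certificate: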

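openssl ecparam -genkey -name secp384r1 -out server.key
openssl req -new -x509 -sha256 -key server.key -out server.pem -days 3650 -addext "subjectAltName = DNS:mysite"

Go 1.15 and later ignore the Common Name when verifying host names, so the -addext option (OpenSSL 1.1.1 or newer) also records mysite as a subjectAltName. openssl then prompts for the following information: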

Country Name (2 letter code) []:
State or Province Name (full name) []:
Locality Name (eg, city) []:
Organization Name (eg, company) []:
Organizational Unit Name (eg, section) []:
Common Name (eg, fully qualified host name) []: mysite
Email Address []:

Once the fields are filled in, the certificate files are generated:

ssl
├── server.key
└── server.pem

Server implementation

// ...
const PORT = "50001"

func main() {
    // Load the server's TLS certificate and key through credentials.
    c, err := credentials.NewServerTLSFromFile("../ssl/server.pem", "../ssl/server.key")
    if err != nil {
        log.Fatalf("credentials.NewServerTLSFromFile err: %v", err)
    }

    // Pass the credentials to the gRPC server.
    server := grpc.NewServer(grpc.Creds(c))
    pb.RegisterSearchServiceServer(server, &SearchService{})

    lis, err := net.Listen("tcp", ":"+PORT)
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }

    if err := server.Serve(lis); err != nil {
        log.Fatalf("server.Serve err: %v", err)
    }
}

Client implementation

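const PORT = "50001" // must match the port the server listens on

func main() {
    // Load the server certificate; "mysite" must match a name in the certificate.
    c, err := credentials.NewClientTLSFromFile("../ssl/server.pem", "mysite")
    if err != nil {
        log.Fatalf("credentials.NewClientTLSFromFile err: %v", err)
    }

    // Dial with transport credentials so the client verifies the server certificate.
    conn, err := grpc.Dial(":"+PORT, grpc.WithTransportCredentials(c))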
    if err != nil {
        log.Fatalf("grpc.Dial err: %v", err)
    }
    defer conn.Close()

    client := pb.NewSearchServiceClient(conn)
    resp, err := client.Search(context.Background(), &pb.SearchRequest{
        Request: "gRPC",
    })
    if err != nil {
        log.Fatalf("client.Search err: %v", err)
    }

    log.Printf("resp: %s", resp.GetResponse())
}
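The second argument to credentials.NewClientTLSFromFile is the server name the client expects to find in the certificate. The standard-library sketch below shows that check in isolation: it creates a throwaway self-signed certificate in memory and verifies it against two names. newSelfSignedCert and verifyHost are helper names invented for this example, and the certificate parameters are assumptions chosen only to make the demo run.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// newSelfSignedCert creates an in-memory self-signed certificate whose
// subjectAltName contains dnsName.
func newSelfSignedCert(dnsName string) (*x509.Certificate, error) {
	key, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: dnsName},
		DNSNames:              []string{dnsName}, // the SAN entry Go actually checks
		NotBefore:             time.Now().Add(-time.Hour),
		NotAfter:              time.Now().Add(24 * time.Hour),
		KeyUsage:              x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		BasicConstraintsValid: true,
		IsCA:                  true,
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}
	return x509.ParseCertificate(der)
}

// verifyHost verifies cert for serverName, trusting only cert itself.
func verifyHost(cert *x509.Certificate, serverName string) error {
	roots := x509.NewCertPool()
	roots.AddCert(cert)
	_, err := cert.Verify(x509.VerifyOptions{DNSName: serverName, Roots: roots})
	return err
}

func main() {
	cert, err := newSelfSignedCert("mysite")
	if err != nil {
		panic(err)
	}
	fmt.Println("mysite:", verifyHost(cert, "mysite"))
	fmt.Println("other: ", verifyHost(cert, "other"))
}
```

Verification succeeds for mysite and fails for any other name, which is the error a gRPC client reports when the name it passes does not match the certificate.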

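CA-signed TLS certificate

A single self-signed certificate is still not very secure, especially because files such as server.key have to be handed around once the certificate is generated. Having a CA issue the TLS certificates addresses this. The open-source tool cfssl can generate them:

1. CA configuration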

cat << EOF | tee ca-config.json
{
  "signing": {
    "default": {
      "expiry": "87600h"
    },
    "profiles": {
      "mysite": {
         "expiry": "87600h",
         "usages": [
            "signing",
            "key encipherment",
            "server auth",
            "client auth"
        ]
      }
    }
  }}
EOF

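The mysite profile allows its certificates to be used for both server and client authentication, that is, mutual verification.

2. CA certificate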

cat << EOF | tee ca-csr.json
{
    "CN": "mysite CA",
    "key": {
        "algo": "rsa",
        "size": 2048
    },
    "names": [
        {
            "C": "CN",
            "L": "Beijing",
            "ST": "Beijing"
        }
    ]}
EOF

3. Server certificate

cat << EOF | tee server-csr.json
{
    "CN": "mysite",
    "hosts": [
        "127.0.0.1"
    ],
    "key": {
        "algo": "rsa",
        "size": 2048
    },
    "names": [
        {
            "C": "CN",
            "L": "Beijing",
            "ST": "Beijing"
        }
    ]}
EOF

Generate the mysite CA certificate and private key to initialize the CA:

cfssl gencert -initca ca-csr.json | cfssljson -bare ca

Generate the server certificate:

cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=mysite -hostname=mysite server-csr.json | cfssljson -bare server

The resulting files are:

../ssl
├── ca-config.json
├── ca-csr.json
├── ca-key.pem
├── ca.csr
├── ca.pem
├── server-csr.json
├── server-key.pem
├── server.csr
└── server.pem

Next comes the code, starting with the server:

// ...
type ecServer struct {
	pb.UnimplementedEchoServer
}

func (s *ecServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	return &pb.EchoResponse{Message: req.Message}, nil
}

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:50001")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Create tls based credential.
	cert, err := tls.LoadX509KeyPair("ssl/server.pem", "ssl/server-key.pem")
	if err != nil {
		log.Fatalf("tls.LoadX509KeyPair err: %v", err)
	}

	certPool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ssl/ca.pem")
	if err != nil {
		log.Fatalf("ioutil.ReadFile err: %v", err)
	}

	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatalf("certPool.AppendCertsFromPEM err")
	}

	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    certPool,
	})

	s := grpc.NewServer(grpc.Creds(creds))

	// Register EchoServer on the server.
	pb.RegisterEchoServer(s, &ecServer{})

	log.Println("server start")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

Then the client:

// ...
func callUnaryEcho(client pb.EchoClient, message string) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	resp, err := client.UnaryEcho(ctx, &pb.EchoRequest{Message: message})
	if err != nil {
		log.Fatalf("client.UnaryEcho(_) = _, %v: ", err)
	}
	fmt.Println("UnaryEcho: ", resp.Message)
}

func main() {
	// Create tls based credential.
	cert, err := tls.LoadX509KeyPair("ssl/server.pem", "ssl/server-key.pem")
	if err != nil {
		log.Fatalf("tls.LoadX509KeyPair err: %v", err)
	}

	certPool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ssl/ca.pem")
	if err != nil {
		log.Fatalf("ioutil.ReadFile err: %v", err)
	}

	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatalf("certPool.AppendCertsFromPEM err")
	}

	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ServerName:   "mysite",
		RootCAs:      certPool,
	})

	// Set up a connection to the server.
	conn, err := grpc.Dial("127.0.0.1:50001", grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	// Make an echo client and send an RPC.
	rgc := pb.NewEchoClient(conn)
	callUnaryEcho(rgc, "hello world")
}

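Interceptors

gRPC supports interceptors on both the server and the client. They can process a request before it is sent or before the response is returned, without changing the existing handler code. Here is how each side uses them, starting with the server: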

// Interceptor for unary requests.
func UnaryInterceptor(ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (resp interface{}, err error) {
	var ip string
	p, ok := peer.FromContext(ctx)
	if ok {
		ip = p.Addr.String()
	}
	md, _ := metadata.FromIncomingContext(ctx)
	start := time.Now()
	resp, err = handler(ctx, req)
	end := time.Now()
	log.Printf("%10s | %14s | %10v | md=%v | reply = %v", ip, info.FullMethod, end.Sub(start), md, resp)
	return
}

// Interceptor for streaming requests.
func StreamInterceptor(srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) (err error) {
	var ip string
	p, ok := peer.FromContext(ss.Context())
	if ok {
		ip = p.Addr.String()
	}
	err = handler(srv, ss)
	log.Printf("stream %v | %v | %s", srv, ip, info.FullMethod)
	return
}

type server struct {
	pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, request *pb.EchoRequest) (*pb.EchoResponse, error) {
	return &pb.EchoResponse{Message: request.Message}, nil
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
	for {
		// Recv returns an error once the stream's context is cancelled,
		// so no separate select on ctx.Done() is needed.
		msg, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			// Return instead of continuing: msg is nil when Recv fails.
			log.Printf("recv failed: %v", err)
			return err
		}

		if err := stream.Send(&pb.EchoResponse{Message: "reply: " + msg.Message}); err != nil {
			log.Printf("send to client: %v", err)
			return err
		}
	}
}

func main() {
	addr := "127.0.0.1:50001"

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("network at %v: %v", addr, err)
	}

	s := grpc.NewServer(grpc.ChainUnaryInterceptor(UnaryInterceptor), grpc.ChainStreamInterceptor(StreamInterceptor))
	pb.RegisterEchoServer(s, &server{})

	if err := s.Serve(lis); err != nil {
		log.Fatalf("start server at %v: %v", addr, err)
	}
}

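gRPC has two kinds of interceptors: one for unary requests and one for streaming requests. The stream interceptor applies to all three streaming modes: server streaming, client streaming, and bidirectional streaming. Next, the client: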

func clientUnaryInterceptor(
	ctx context.Context,
	method string,
	req, reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) (err error) {

	ctx = metadata.AppendToOutgoingContext(ctx, "username", "OOB")
	err = invoker(ctx, method, req, reply, cc, opts...)
	return
}

func clientStreamInterceptor(ctx context.Context,
	desc *grpc.StreamDesc,
	cc *grpc.ClientConn,
	method string,
	streamer grpc.Streamer,
	opts ...grpc.CallOption,
) (stream grpc.ClientStream, err error) {

	// before stream
	stream, err = streamer(ctx, desc, cc, method, opts...)
	// after stream
	return
}

func callUnaryEcho(cc pb.EchoClient, msg string) {
	reply, err := cc.UnaryEcho(context.Background(), &pb.EchoRequest{Message: msg})
	if err != nil {
		log.Printf("unary echo: %v", err)
		return
	}
	log.Printf("reply => %v", reply)
}

func callBidirectionalEcho(cc pb.EchoClient, msg string) {
	stream, err := cc.BidirectionalStreamingEcho(context.TODO())
	if err != nil {
		log.Fatalf("call BidirectionalEcho: %v", err)
	}

	if err := stream.Send(&pb.EchoRequest{Message: msg}); err != nil {
		log.Fatalf("stream send: %v", err)
	}
	_ = stream.CloseSend()
	for {
		// Recv returns io.EOF when the server finishes the stream and a
		// status error if the stream's context is cancelled.
		reply, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return
		}
		if err != nil {
			log.Fatalf("stream recv: %v", err)
		}
		log.Printf("stream reply => %v", reply.Message)
	}
}

func main() {
	addr := "127.0.0.1:50001"

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	conn, err := grpc.DialContext(
		ctx,
		addr,
		grpc.WithInsecure(),
		grpc.WithChainUnaryInterceptor(clientUnaryInterceptor),
		grpc.WithChainStreamInterceptor(clientStreamInterceptor))
	if err != nil {
		log.Fatalf("connect %v: %v", addr, err)
	}
	defer conn.Close()

	cc := pb.NewEchoClient(conn)

	callUnaryEcho(cc, "unary")

	callBidirectionalEcho(cc, "start")
}

gRPC interceptors can be registered individually or as a chain.
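Chaining means that each interceptor wraps the next one, with the real handler at the centre. The sketch below shows that wrapping with plain function types; handler, interceptor, chain and tracing are simplified stand-ins invented for the example, not the grpc-go types.

```go
package main

import (
	"context"
	"fmt"
)

// handler and interceptor are simplified stand-ins for grpc.UnaryHandler
// and grpc.UnaryServerInterceptor.
type handler func(ctx context.Context, req string) (string, error)
type interceptor func(ctx context.Context, req string, next handler) (string, error)

// chain wraps final so that the first interceptor runs outermost.
func chain(final handler, interceptors ...interceptor) handler {
	h := final
	for i := len(interceptors) - 1; i >= 0; i-- {
		ic, next := interceptors[i], h
		h = func(ctx context.Context, req string) (string, error) {
			return ic(ctx, req, next)
		}
	}
	return h
}

// tracing returns an interceptor that records when it runs.
func tracing(name string, trace *[]string) interceptor {
	return func(ctx context.Context, req string, next handler) (string, error) {
		*trace = append(*trace, name+" before")
		resp, err := next(ctx, req)
		*trace = append(*trace, name+" after")
		return resp, err
	}
}

// runDemo chains two tracing interceptors around an echo handler and
// returns the response together with the recorded call order.
func runDemo() (string, []string) {
	var trace []string
	echo := func(ctx context.Context, req string) (string, error) {
		trace = append(trace, "handler")
		return "echo: " + req, nil
	}
	h := chain(echo, tracing("first", &trace), tracing("second", &trace))
	resp, _ := h(context.Background(), "hi")
	return resp, trace
}

func main() {
	resp, trace := runDemo()
	fmt.Println(resp)
	for _, step := range trace {
		fmt.Println(step)
	}
}
```

The first interceptor in the list sees the request first and the response last, which matters for concerns such as logging, timing, or authentication that should wrap everything else.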

Adding pprof endpoints to gRPC

gRPC uses HTTP/2 as its underlying protocol, so it can share a port with Go's pprof HTTP handlers. The code:

type server struct {
	pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, request *pb.EchoRequest) (*pb.EchoResponse, error) {
	return &pb.EchoResponse{Message: request.Message}, nil
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
	for {
		// Recv returns an error once the stream's context is cancelled,
		// so no separate select on ctx.Done() is needed.
		msg, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			// Return instead of continuing: msg is nil when Recv fails.
			log.Printf("recv failed: %v", err)
			return err
		}

		if err := stream.Send(&pb.EchoResponse{Message: "reply: " + msg.Message}); err != nil {
			log.Printf("send to client: %v", err)
			return err
		}
	}
}

func main() {
	addr := "127.0.0.1:50001"

	// Server options and interceptors can be added here.
	s := grpc.NewServer() 
	pb.RegisterEchoServer(s, &server{})

	mux := http.NewServeMux()
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	// Start an HTTP/2 server: Go's net/http enables HTTP/2 automatically when serving over TLS.
	// Requests whose Content-Type is application/grpc are handed to the gRPC server.
	err := http.ListenAndServeTLS(
		addr,
		"ssl/server.pem",
		"ssl/server-key.pem",
		http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
		if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
			log.Println("call grpc service")
			s.ServeHTTP(rw, r)
		} else {
			mux.ServeHTTP(rw, r)
		}
	}))
	if err != nil {
		log.Fatalf("start server at %v: %v", addr, err)
	}
}
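The routing decision in the handler above can be pulled out into a small function and exercised on its own. The sketch below is standard library only; isGRPCRequest and newRequest are names introduced for the example, and it uses a prefix match on the Content-Type where the code above uses strings.Contains.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// isGRPCRequest reports whether r looks like a gRPC call: an HTTP/2 request
// whose Content-Type starts with application/grpc.
func isGRPCRequest(r *http.Request) bool {
	return r.ProtoMajor == 2 &&
		strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc")
}

// newRequest builds a bare request for the demo; in a real server the
// request comes from net/http.
func newRequest(protoMajor int, contentType string) *http.Request {
	r := &http.Request{ProtoMajor: protoMajor, Header: http.Header{}}
	r.Header.Set("Content-Type", contentType)
	return r
}

func main() {
	fmt.Println(isGRPCRequest(newRequest(2, "application/grpc+proto"))) // gRPC over HTTP/2
	fmt.Println(isGRPCRequest(newRequest(2, "text/html")))              // browser hitting pprof
	fmt.Println(isGRPCRequest(newRequest(1, "application/grpc")))       // not HTTP/2
}
```

Only the first request would be passed to the gRPC server; the other two fall through to the pprof mux.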

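Handling cancelled gRPC requests

gRPC does not define its own per-request timeout setting; it leaves that to Go's context package. Connection timeouts and request timeouts on the client are both implemented through context. Server code: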

type server struct {
	pb.UnimplementedEchoServer
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
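	// This function spans the entire lifetime of the stream; once it returns, the stream's context ends.
	// Each stream handler invocation is independent of the others.
	// The server cannot cancel a stream by issuing a request, but it can stop the stream by returning from this function early.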
	for {
		in, err := stream.Recv()
		if err != nil {
			fmt.Printf("server: error receiving from stream: %v\n", err)
			if err == io.EOF {
				return nil
			}
			return err
		}
		fmt.Printf("echoing message %q\n", in.Message)
		if err := stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil {
			return err
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:10050")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	fmt.Printf("server listening at port %v\n", lis.Addr())
	s := grpc.NewServer()
	pb.RegisterEchoServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

Client:

func sendMessage(stream pb.Echo_BidirectionalStreamingEchoClient, msg string) error {
	fmt.Printf("sending message %q\n", msg)
	return stream.Send(&pb.EchoRequest{Message: msg})
}

func recvMessage(stream pb.Echo_BidirectionalStreamingEchoClient, wantErrCode codes.Code) {
	res, err := stream.Recv()
	if status.Code(err) != wantErrCode {
		log.Fatalf("stream.Recv() = %v, %v; want _, status.Code(err)=%v", res, err, wantErrCode)
	}
	if err != nil {
		fmt.Printf("stream.Recv() returned expected error %v\n", err)
		return
	}
	fmt.Printf("received message %q\n", res.Message)
}

func main() {
	addr := "127.0.0.1:10050"
	// Establish the connection.
	// The ctx used for dialing is independent of the ctx used for each request.
	conn, err := grpc.DialContext(context.Background(), addr, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewEchoClient(conn)

	// Initiate the stream with a context that supports cancellation.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	stream, err := c.BidirectionalStreamingEcho(ctx)
	if err != nil {
		log.Fatalf("error creating stream: %v", err)
	}

	// Send some test messages.
	if err := sendMessage(stream, "hello"); err != nil {
		log.Fatalf("error sending on stream: %v", err)
	}
	if err := sendMessage(stream, "world"); err != nil {
		log.Fatalf("error sending on stream: %v", err)
	}

	// Ensure the RPC is working.
	recvMessage(stream, codes.OK)
	recvMessage(stream, codes.OK)

	fmt.Println("cancelling context")
	cancel()

	// This Send may or may not return an error, depending on whether the
	// monitored context detects cancellation before the call is made.
	sendMessage(stream, "closed")

	// This Recv should never succeed.
	recvMessage(stream, codes.Canceled)
}
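The same context behaviour can be observed without gRPC at all. In the sketch below slowCall stands in for an RPC; it and the other helpers (callWithTimeout, callCancelled, describe) are hypothetical names for this example. In gRPC the two context errors surface as the status codes DeadlineExceeded and Canceled, the latter being what the client above checks for.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall stands in for an RPC: it finishes after d unless ctx ends first.
func slowCall(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// callWithTimeout runs slowCall under a deadline, like a request timeout.
func callWithTimeout(work, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return slowCall(ctx, work)
}

// callCancelled cancels the context before the call starts.
func callCancelled(work time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return slowCall(ctx, work)
}

// describe turns the outcome into a short label.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, context.DeadlineExceeded):
		return "deadline exceeded"
	case errors.Is(err, context.Canceled):
		return "canceled"
	default:
		return err.Error()
	}
}

func main() {
	fmt.Println(describe(callWithTimeout(10*time.Millisecond, time.Second)))
	fmt.Println(describe(callWithTimeout(time.Second, 10*time.Millisecond)))
	fmt.Println(describe(callCancelled(time.Second)))
}
```

The first call finishes in time, the second runs into its deadline, and the third never starts its work because the context is already cancelled.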

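gRPC performance optimization

Although gRPC is officially billed as a high-performance framework, it relies heavily on reflection internally, so its performance is not as good as it could be and some optimization is worthwhile. The approach is simple and needs no changes to gRPC's source: when generating Go code with protoc, replace golang/protobuf with the third-party gogo/protobuf. gogo is built on the official library and adds many features, including:

  • fast marshalling and unmarshalling
  • more canonical Go data structures
  • goprotobuf compatibility
  • optional generation of helper methods, which reduces the code users have to type
  • optional generation of test and benchmark code
  • other serialization formats

Projects such as etcd, Kubernetes, Dgraph, and Docker SwarmKit use it. Trading off speed against customization, gogo offers three ways to generate code:

  • gofast: speed first; does not support the other gogoprotobuf extensions.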
go get github.com/gogo/protobuf/protoc-gen-gofast
protoc --gofast_out=. myproto.proto
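  • gogofast: like gofast, but imports gogoprotobuf.
  • gogofaster: like gogofast, but does not generate XXX_unrecognized pointer fields, which reduces garbage collection time.
  • gogoslick: like gogofaster, but can add extra methods such as GoString and Equal.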
go get github.com/gogo/protobuf/proto
# {binary} is one of protoc-gen-gogofast, protoc-gen-gogofaster, protoc-gen-gogoslick
go get github.com/gogo/protobuf/{binary}
go get github.com/gogo/protobuf/gogoproto
# {plugin} is the binary name without the protoc-gen- prefix, e.g. --gogofaster_out
protoc -I=. -I=$GOPATH/src -I=$GOPATH/src/github.com/gogo/protobuf/protobuf --{plugin}_out=. myproto.proto
  • protoc-gen-gogo: the fastest option, with the most customization.

You can customize serialization through extensions.

go get github.com/gogo/protobuf/proto
go get github.com/gogo/protobuf/jsonpb
go get github.com/gogo/protobuf/protoc-gen-gogo
go get github.com/gogo/protobuf/gogoproto

gogo also supports gRPC: protoc --gofast_out=plugins=grpc:. my.proto. There is also a corresponding protobuf tutorial.