Eli's Blog

1. RPC

1.1 什么是RPC

RPC: Remote Procedure Call,远程过程调用。调用过程包括传输协议和对象编码(序列化)。

1.2 RPC框架

  • 负载均衡
  • 服务注册和发现
  • 服务治理

1.3 为什么使用RPC

简单、通用、安全、效率

2. Protobuf

Protocol Buffers 是一种与语言、平台无关,可扩展的序列化结构化数据的方法,常用于通信协议、数据存储等。相较于JSON、XML,它更小、更快、更简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Example proto3 schema: one service with a unary RPC plus its request and
// response messages.
syntax = "proto3";

// SearchService declares a single Search RPC that takes a SearchRequest and
// returns a SearchResponse.
service SearchService {
rpc Search (SearchRequest) returns (SearchResponse);
}

// SearchRequest carries a string query and two int32 paging fields.
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
}

// SearchResponse: fields are elided in this example.
message SearchResponse {
...
}

3. gRPC

gRPC 是一个高性能、开源和通用的RPC框架,面向移动和 HTTP/2 设计

特点:

  • HTTP/2
  • Protobuf
  • 客户端、服务端基于同一份IDL
  • 移动网络支持良好
  • 支持多语言

3.1 安装

gRPC:

1
go get -u google.golang.org/grpc

Protocol Buffers v3:

1
2
brew search protobuf
brew install protobuf@3.6

Protoc Plugin:

1
2
3
4
5
# 会自动编译安装protoc-gen-go可执行插件文件
go get -u github.com/golang/protobuf/protoc-gen-go

# 编译安装 (不要做这个操作,应该使用上面一个protoc-gen-go)
#go install google.golang.org/protobuf/cmd/protoc-gen-go

3.2 入门

3.2.1 编写 IDL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// IDL for the introductory example: a unary Search RPC with string payloads.
syntax = "proto3";

option go_package = ".;proto"; // important: sets the Go import path and package name of the generated code

package proto;

// SearchService exposes one unary RPC.
service SearchService {
rpc Search(SearchRequest) returns (SearchResponse) {}
}

// SearchRequest holds the request string sent by the client.
message SearchRequest {
string request = 1;
}

// SearchResponse holds the response string returned by the server.
message SearchResponse {
string response = 1;
}

3.2.2 生成 pb.go文件

1
2
3
4
protoc --go_out=. *.proto

# 比前一个多了注册函数等
protoc --go_out=plugins=grpc:. *.proto

3.2.3 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// SearchService implements the generated SearchService server interface.
type SearchService struct{}

// Search replies with the request string followed by " Server".
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	reply := &pb.SearchResponse{}
	reply.Response = r.GetRequest() + " Server"
	return reply, nil
}

// HOST is the TCP address the example server listens on.
const HOST = ":9001"

// main registers SearchService on a new gRPC server and serves it on HOST.
// Both listen and serve failures terminate the process with a log message.
func main() {
	server := grpc.NewServer()
	pb.RegisterSearchServiceServer(server, &SearchService{})

	ln, err := net.Listen("tcp", HOST)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks until the server stops; surface its error instead of
	// silently dropping it.
	if err := server.Serve(ln); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}

3.2.4 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// main dials HOST with grpc.WithInsecure, issues one unary Search call and
// logs the response string.
func main() {
	conn, err := grpc.Dial(HOST, grpc.WithInsecure())
	if err != nil {
		// The message previously misspelled the function name as "grpc.Dail".
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(context.Background(), &pb.SearchRequest{
		Request: "gRPC",
	})
	if err != nil {
		log.Fatalf("client.Search err: %v", err)
	}

	log.Printf("resp: %s", resp.GetResponse())
}

4. gRPC 流

gRPC 的流式,有三种类型:

  • Server-side Streaming
  • Client-side Streaming
  • Bidirectional Streaming

适合用 Streaming RPC 的场景:

  • 大规模数据包
  • 实时场景

4.1 IDL 和 基础模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// IDL shared by the streaming examples.
syntax = "proto3";

option go_package = ".;proto";

package proto;

// StreamService declares the three streaming RPCs used in this chapter.
service StreamService {
// List: single request, streamed responses (server-side streaming).
rpc List(StreamRequest) returns (stream StreamResponse) {};

// Record: streamed requests and streamed responses.
// NOTE(review): this is declared with a response stream, i.e. the same shape
// as Route; the Record server/client code in this article relies on that by
// calling Send/Recv on the stream.
rpc Record(stream StreamRequest) returns (stream StreamResponse) {};

// Route: streamed requests and streamed responses (bidirectional streaming).
rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
}

// StreamPoint is the payload: a name and an int32 value.
message StreamPoint {
string name = 1;
int32 value = 2;
}

// StreamRequest wraps one StreamPoint.
message StreamRequest {
StreamPoint pt = 1;
}

// StreamResponse wraps one StreamPoint.
message StreamResponse {
StreamPoint pt = 1;
}

服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// main registers StreamService on a new gRPC server and serves it on :9002.
// Both listen and serve failures terminate the process with a log message.
func main() {
	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})

	ln, err := net.Listen("tcp", ":9002")
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks until the server stops; report its error rather than
	// discarding it.
	if err := server.Serve(ln); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
return nil
}

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
return nil
}

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
return nil
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// main connects to :9002 and runs the List, Record and Route demos in order,
// aborting with a log message on the first failure.
func main() {
	conn, dialErr := grpc.Dial(":9002", grpc.WithInsecure())
	if dialErr != nil {
		log.Fatalf("grpc.Dial err: %v", dialErr)
	}
	defer conn.Close()

	client := pb.NewStreamServiceClient(conn)

	// Each demo sends the same point value with its own name.
	demos := []struct {
		label string
		name  string
		run   func(pb.StreamServiceClient, *pb.StreamRequest) error
	}{
		{"printList", "gRPC Stream Client: List", printList},
		{"printRecord", "gRPC Stream Client: Record", printRecord},
		{"printRoute", "gRPC Stream Client: Route", printRoute},
	}

	for _, d := range demos {
		req := &pb.StreamRequest{
			Pt: &pb.StreamPoint{Name: d.name, Value: 2020},
		}
		if err := d.run(client, req); err != nil {
			log.Fatalf("%s.err: %v", d.label, err)
		}
	}
}

// printList is the client side of the List RPC; template stub that does nothing.
func printList(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}

// printRecord is the client side of the Record RPC; template stub that does nothing.
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}

// printRoute is the client side of the Route RPC; template stub that does nothing.
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}

4.2 服务器端流式 RPC

  • 单向流
  • Server 为 Stream,多次向客户端发送数据
  • Client 为普通 RPC 请求

4.2.1 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// List streams seven responses to the client. Each response echoes the
// request point's name and carries its value plus the index 0..6.
// It returns the first Send error, or nil once all messages are sent.
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
	// The generated getters are nil-safe, so a request without Pt does not panic.
	pt := r.GetPt()

	for n := 0; n <= 6; n++ {
		err := stream.Send(&pb.StreamResponse{
			Pt: &pb.StreamPoint{
				Name:  pt.GetName(),
				Value: pt.GetValue() + int32(n),
			},
		})
		if err != nil {
			// Propagate the failure; returning nil here would report the
			// RPC as successful even though the stream broke.
			return err
		}
	}

	return nil
}

stream.Send() 方法:

1
2
3
4
5
6
7
8
// StreamService_ListServer is the server-side stream handle for List: a typed
// Send on top of the embedded grpc.ServerStream.
type StreamService_ListServer interface {
	Send(*StreamResponse) error
	grpc.ServerStream
}

// Send forwards the message to the underlying ServerStream's SendMsg.
func (x *streamServiceListServer) Send(m *StreamResponse) error {
	err := x.ServerStream.SendMsg(m)
	return err
}

SendMsg() 方法:

  • 消息体(对象)序列化
  • 压缩序列化后的消息体
  • 对正在传输的消息体增加5个字节的header
  • 判断消息体总长度是否大于预设的maxSendMessageSize (默认math.MaxInt32),超过则报错
  • 写入给流的数据集

4.2.2 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// printList calls List and logs every streamed response until the stream
// ends with io.EOF. Any other receive error is returned to the caller.
func printList(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.List(context.Background(), r)
	if err != nil {
		return err
	}

	for {
		resp, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// Normal end of stream.
			return nil
		case recvErr != nil:
			return recvErr
		}

		log.Printf("resp: pt.name: %s, pt.value: %d\n",
			resp.Pt.Name, resp.Pt.Value)
	}
}

stream.Recv()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
// StreamService_ListClient is the client-side stream handle for List: a typed
// Recv on top of the embedded grpc.ClientStream.
type StreamService_ListClient interface {
	Recv() (*StreamResponse, error)
	grpc.ClientStream
}

// Recv reads the next message from the underlying ClientStream into a fresh
// StreamResponse; on error it returns nil and that error.
func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
	msg := &StreamResponse{}
	err := x.ClientStream.RecvMsg(msg)
	if err != nil {
		return nil, err
	}

	return msg, nil
}

RecvMsg()方法:

  • 阻塞等待
  • 流结束 (Close)时,返回 io.EOF
  • 可能的错误
    • io.EOF
    • io.ErrUnexpectedEOF
    • transport.ConnectionError
    • google.golang.org/grpc/codes

4.3 客户端流式RPC

  • 单向流
  • 客户端多次RPC请求服务端
  • 服务端发起一次响应给客户端

4.3.1 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Record logs every point received from the client. When the client closes
// its send side (Recv returns io.EOF) it sends one summary response and
// returns the result of that Send. Any other receive error is returned.
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			return stream.Send(&pb.StreamResponse{
				Pt: &pb.StreamPoint{
					Name:  "gRPC Stream Server: Record",
					Value: 1,
				},
			})
		}
		if err != nil {
			return err
		}

		// Nil-safe getters: a message without Pt must not panic the handler.
		log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.GetPt().GetName(), r.GetPt().GetValue())
	}
	// No trailing return: the loop above only exits via return.
}

4.3.2 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// printRecord sends the request six times on the Record stream, closes the
// send side, then reads and logs the server's single response.
// Every failure (open, send, close, receive) is returned to the caller.
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.Record(context.Background())
	if err != nil {
		return err
	}

	for n := 0; n < 6; n++ {
		if err := stream.Send(r); err != nil {
			return err
		}
	}

	// Signal the server that no more requests will follow.
	if err := stream.CloseSend(); err != nil {
		return err
	}

	resp, err := stream.Recv()
	if err != nil {
		// Previously this returned nil, hiding receive failures.
		return err
	}

	log.Printf("resp: pt.name: %s, pt.value: %d", resp.GetPt().GetName(), resp.GetPt().GetValue())

	return nil
}

4.4 双向流RPC

4.4.1 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Route alternates between sending a response (carrying a counter) and
// receiving a request, logging each request. It returns nil when the client
// closes its send side (io.EOF) and returns any other send/receive error.
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
	n := 0
	for {
		err := stream.Send(&pb.StreamResponse{
			Pt: &pb.StreamPoint{
				// Fixed label: was misspelled "gPRC" and claimed to be the client.
				Name:  "gRPC Stream Server: Route",
				Value: int32(n),
			},
		})
		if err != nil {
			return err
		}

		r, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		n++

		// Nil-safe getters: a message without Pt must not panic the handler.
		log.Printf("stream.Recv pt.name: %s, pt.value: %d",
			r.GetPt().GetName(), r.GetPt().GetValue())
	}
	// No trailing return: the loop above only exits via return.
}

4.4.2 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// printRoute performs up to six send/receive round trips on the Route stream,
// logging each response, then closes the send side. It stops early if the
// server ends the stream (io.EOF) and returns any send, receive or close error.
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.Route(context.Background())
	if err != nil {
		return err
	}

	for n := 0; n < 6; n++ {
		if err := stream.Send(r); err != nil {
			return err
		}

		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		log.Printf("resp: pt.name: %s, pt.value %d",
			resp.GetPt().GetName(), resp.GetPt().GetValue())
	}

	// Report a failed close instead of discarding it.
	return stream.CloseSend()
}

5. TLS 证书认证

5.1 生成证书

5.1.1 私钥

1
openssl ecparam -genkey -name secp384r1 -out server.key

5.1.2 自签名证书(含公钥)

1
openssl req -new -x509 -sha256 -key server.key -out server.pem -days 3650

5.2 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// SearchService implements the generated SearchService server interface.
type SearchService struct{}

// Search replies with the request string followed by " Server".
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	text := r.GetRequest() + " Server"
	return &pb.SearchResponse{Response: text}, nil
}

// HOST is the TCP address the TLS example server listens on.
const HOST = ":9001"

// main loads the server certificate and key, creates a gRPC server that uses
// them as transport credentials, and serves SearchService on HOST.
func main() {
	// 1. Load the TLS certificate/key pair.
	creds, err := credentials.NewServerTLSFromFile("../certs/server.pem", "../certs/server.key")
	if err != nil {
		log.Fatalf("credentials.NewServerTLSFromFile err: %v", err)
	}

	// 2. Attach the credentials to the server.
	server := grpc.NewServer(grpc.Creds(creds))

	pb.RegisterSearchServiceServer(server, &SearchService{})

	ln, err := net.Listen("tcp", HOST)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks until the server stops; report its error rather than
	// discarding it.
	if err := server.Serve(ln); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}

5.3 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// HOST is the address of the TLS example server.
const HOST = ":9001"

// main builds client TLS credentials from the server certificate, dials HOST
// with them, issues one Search call and logs the response string.
func main() {
	// 1. Load the certificate and set the expected server name.
	creds, err := credentials.NewClientTLSFromFile("../certs/server.pem", "go-grpc-example")
	if err != nil {
		log.Fatalf("credentials.NewClientTLSFromFile err: %v", err)
	}

	// 2. Dial with transport credentials.
	conn, err := grpc.Dial(HOST, grpc.WithTransportCredentials(creds))
	if err != nil {
		// The message previously misspelled the function name as "grpc.Dail".
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(context.Background(), &pb.SearchRequest{
		Request: "gRPC",
	})
	if err != nil {
		log.Fatalf("client.Search err: %v", err)
	}

	log.Printf("resp: %s", resp.GetResponse())
}

6. 基于 CA 的 TLS 证书认证

6.1 CA

6.1.1 生成CA证书

根证书(root certificate)是属于根证书颁发机构(CA)的公钥证书。可以通过验证CA的签名从而信任CA,任何人都可以得到CA的证书(含公钥),用以验证它所签发的证书。

1
2
3
4
5
# 生成Key
openssl genrsa -out ca.key 2048

# 生成自签名 CA 根证书
openssl req -new -x509 -days 7200 -key ca.key -out ca.pem

6.1.2 服务端证书

CSR: Certificate Signing Request,证书请求文件。主要作用是 CA 会利用 CSR 文件进行签名,使得攻击者无法伪装或篡改原有证书。

1
2
3
4
5
# 生成CSR
openssl req -new -key server.key -out server.csr

# 基于CA签发
openssl x509 -req -sha256 -CA ca.pem -CAkey ca.key -CAcreateserial -days 3650 -in server.csr -out server.pem

6.1.3 客户端证书

1
2
3
4
5
6
7
8
# 生成Key
openssl ecparam -genkey -name secp384r1 -out client.key

# 生成CSR
openssl req -new -key client.key -out client.csr

# 基于CA签发
openssl x509 -req -sha256 -CA ca.pem -CAkey ca.key -CAcreateserial -days 3650 -in client.csr -out client.pem

6.2 TLS认证代码

6.2.1 服务端认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Server holds the file paths used to build server-side TLS credentials.
type Server struct {
	CaFile   string
	CertFile string
	KeyFile  string
}

// GetCredentialsByCA builds transport credentials from the server key pair
// (CertFile/KeyFile) and uses the certificates in CaFile as the pool for
// verifying client certificates, which are required
// (tls.RequireAndVerifyClientCert).
func (t *Server) GetCredentialsByCA() (credentials.TransportCredentials, error) {
	keyPair, err := tls.LoadX509KeyPair(t.CertFile, t.KeyFile)
	if err != nil {
		return nil, err
	}

	caPEM, err := ioutil.ReadFile(t.CaFile)
	if err != nil {
		return nil, err
	}

	clientCAs := x509.NewCertPool()
	if !clientCAs.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("certPool.AppendCertsFromPEM err")
	}

	cfg := &tls.Config{
		Certificates: []tls.Certificate{keyPair},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    clientCAs,
	}

	return credentials.NewTLS(cfg), nil
}

// GetTLSCredentials builds server transport credentials from CertFile and
// KeyFile only; no CA pool is involved.
func (t *Server) GetTLSCredentials() (credentials.TransportCredentials, error) {
	creds, err := credentials.NewServerTLSFromFile(t.CertFile, t.KeyFile)
	if err == nil {
		return creds, nil
	}

	return nil, err
}

6.2.2 客户端认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Client holds the expected server name and the file paths used to build
// client-side TLS credentials.
type Client struct {
	ServerName string
	CaFile     string
	CertFile   string
	KeyFile    string
}

// GetCredentialsByCA builds transport credentials that present the client
// key pair (CertFile/KeyFile), use the certificates in CaFile as root CAs,
// and set ServerName in the TLS config.
func (t *Client) GetCredentialsByCA() (credentials.TransportCredentials, error) {
	keyPair, err := tls.LoadX509KeyPair(t.CertFile, t.KeyFile)
	if err != nil {
		return nil, err
	}

	caPEM, err := ioutil.ReadFile(t.CaFile)
	if err != nil {
		return nil, err
	}

	roots := x509.NewCertPool()
	if !roots.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("certPool.AppendCertsFromPEM err")
	}

	cfg := &tls.Config{
		Certificates: []tls.Certificate{keyPair},
		ServerName:   t.ServerName,
		RootCAs:      roots,
	}

	return credentials.NewTLS(cfg), nil
}

// GetTLSCredentials builds client transport credentials from CertFile and
// ServerName only; no client key pair is presented.
func (t *Client) GetTLSCredentials() (credentials.TransportCredentials, error) {
	creds, err := credentials.NewClientTLSFromFile(t.CertFile, t.ServerName)
	if err == nil {
		return creds, nil
	}

	return nil, err
}

6.3 实现代码

6.3.1 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// SearchService implements the generated SearchService server interface.
type SearchService struct{}

// Search replies with the request string followed by " Server".
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	reply := new(pb.SearchResponse)
	reply.Response = r.GetRequest() + " Server"
	return reply, nil
}

// main builds CA-based server credentials via gtls.Server, then serves
// SearchService on :9001. Credential, listen and serve failures terminate
// the process with a log message.
func main() {
	tlsServer := gtls.Server{
		CaFile:   "../../certs/ca.pem",
		CertFile: "../../certs/server.pem",
		KeyFile:  "../../certs/server.key",
	}

	c, err := tlsServer.GetCredentialsByCA()
	if err != nil {
		log.Fatalf("tlsServer.GetCredentialsByCA err: %v", err)
	}

	server := grpc.NewServer(grpc.Creds(c))

	pb.RegisterSearchServiceServer(server, &SearchService{})

	ln, err := net.Listen("tcp", ":9001")
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks until the server stops; report its error rather than
	// discarding it.
	if err := server.Serve(ln); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}

6.3.2 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// main builds CA-based client credentials via gtls.Client, dials :9001 with
// them, issues one Search call and logs the response string.
func main() {
	tlsClient := gtls.Client{
		ServerName: "go-grpc-example",
		CaFile:     "../../certs/ca.pem",
		CertFile:   "../../certs/client.pem",
		KeyFile:    "../../certs/client.key",
	}

	c, err := tlsClient.GetCredentialsByCA()
	if err != nil {
		log.Fatalf("tlsClient.GetCredentialsByCA err: %v", err)
	}

	conn, err := grpc.Dial(":9001", grpc.WithTransportCredentials(c))
	if err != nil {
		// The message previously misspelled the function name as "grpc.Dail".
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(context.Background(), &pb.SearchRequest{
		Request: "gRPC",
	})
	if err != nil {
		log.Fatalf("client.Search err: %v", err)
	}

	log.Printf("resp: %s", resp.GetResponse())
}

大致流程:

  • Client 通过请求得到 Server 端的证书
  • 使用 CA 认证的根证书对 Server 端证书进行可靠性、有效性等校验
  • 校验 ServerName 是否有效
  • 同样,在设置了 tls.RequireAndVerifyClientCert 模式下,Server 也会使用 CA 认证的根证书对Client的证书进行可靠性、有效性校验。

6.4 补充知识点:ssl/tls 单向认证双向认证

  • 单向认证:只有一个对象校验对端的证书合法性。通常client来校验服务器的合法性。那么client需要一个ca.crt,服务器需要server.crt,server.key。
  • 双向认证:相互校验,服务器需要校验每个client,client也需要校验服务器。server 需要 server.key、server.crt、ca.crt,client 需要 client.key、client.crt、ca.crt。

7. 拦截器

7.1 Unary and Stream interceptor

  • 普通方法:一元拦截器 grpc.UnaryInterceptor
  • 流方法:流拦截器 grpc.StreamInterceptor

7.1.1 grpc.UnaryInterceptor

1
2
3
4
5
6
7
8
9
10
// UnaryInterceptor returns a ServerOption that stores i as the server's unary
// interceptor. Applying the option when one is already set panics.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return func(o *options) {
		if o.unaryInt == nil {
			o.unaryInt = i
			return
		}
		panic("The unary server interceptor was already set and may not be reset.")
	}
}

// UnaryServerInterceptor is the function signature of a unary interceptor:
// it receives the request, call info and the next handler, and returns the
// response and error.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

7.1.2 grpc.StreamInterceptor

1
2
3
// StreamInterceptor takes a stream interceptor and returns a ServerOption
// (signature only; the return type was misspelled "ServerOptions").
func StreamInterceptor(i StreamServerInterceptor) ServerOption

// StreamServerInterceptor is the function signature of a stream interceptor:
// it receives the service value, the server stream, call info and the next
// handler, and returns an error.
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

7.2 实现多个拦截器

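gRPC 的 grpc.UnaryInterceptor / grpc.StreamInterceptor 各自只能设置一个拦截器(重复设置会 panic),可以采用 go-grpc-middleware 项目把多个拦截器串联成一个来解决问题(较新版本的 grpc-go 也内置了 grpc.ChainUnaryInterceptor / grpc.ChainStreamInterceptor)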

1
2
3
4
5
6
7
8
9
10
// Fragment: chain several interceptors into the single stream/unary
// interceptor slots using go-grpc-middleware. The "..." placeholders stand
// for the interceptors to chain.
import "github.com/grpc-ecosystem/go-grpc-middleware"

myServer := grpc.NewServer(
// Stream interceptors, combined by ChainStreamServer.
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
...
)),
// Unary interceptors, combined by ChainUnaryServer.
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
...
)),
)

7.3 实现 logging 和 recover 拦截器

7.3.1 logging

1
2
3
4
5
6
// LoggingInterceptor logs the full method name with the request before the
// handler runs and with the response afterwards, then returns the handler's
// result unchanged.
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	const format = "gRPC method: %s, %v"

	log.Printf(format, info.FullMethod, req)
	out, handlerErr := handler(ctx, req)
	log.Printf(format, info.FullMethod, out)

	return out, handlerErr
}

7.3.2 recover

1
2
3
4
5
6
7
8
9
10
// RecoveryInterceptor runs the handler and converts a panic into a
// codes.Internal status error after printing the stack trace.
// The results are named so the deferred function can overwrite err; with
// unnamed results the assignment to err does not compile.
func RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		if e := recover(); e != nil {
			debug.PrintStack()
			err = status.Errorf(codes.Internal, "Panic err: %v", e)
		}
	}()

	return handler(ctx, req)
}

7.3.3 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// SearchService implements the generated SearchService server interface.
type SearchService struct{}

// Search replies with the request string followed by " Server".
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	text := r.GetRequest() + " Server"
	return &pb.SearchResponse{Response: text}, nil
}

// main builds CA-based server credentials, installs the recovery and logging
// interceptors as a unary chain, and serves SearchService on :9001.
// Credential, listen and serve failures terminate the process.
func main() {
	tlsServer := gtls.Server{
		CaFile:   "../../certs/ca.pem",
		CertFile: "../../certs/server.pem",
		KeyFile:  "../../certs/server.key",
	}

	c, err := tlsServer.GetCredentialsByCA()
	if err != nil {
		log.Fatalf("tlsServer.GetCredentialsByCA err: %v", err)
	}

	// Server options: credentials plus the interceptor chain.
	opts := []grpc.ServerOption{
		grpc.Creds(c),
		grpc_middleware.WithUnaryServerChain(
			RecoveryInterceptor,
			LoggingInterceptor,
		),
	}

	server := grpc.NewServer(opts...)

	pb.RegisterSearchServiceServer(server, &SearchService{})

	ln, err := net.Listen("tcp", ":9001")
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks until the server stops; report its error rather than
	// discarding it.
	if err := server.Serve(ln); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}

// LoggingInterceptor logs the full method name with the request before the
// handler runs and with the response afterwards, then returns the handler's
// result unchanged.
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	const format = "gRPC method: %s, %v"

	log.Printf(format, info.FullMethod, req)
	out, handlerErr := handler(ctx, req)
	log.Printf(format, info.FullMethod, out)

	return out, handlerErr
}

// RecoveryInterceptor runs the handler; if it panics, the stack is printed
// and err is replaced by a codes.Internal status error describing the panic.
func RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		debug.PrintStack()
		err = status.Errorf(codes.Internal, "Panic err: %v", recovered)
	}()

	resp, err = handler(ctx, req)
	return resp, err
}

8. 同时提供 HTTP 服务

8.1 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// SearchService implements the generated SearchService server interface.
type SearchService struct{}

// Search replies with the request string followed by " Server".
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	reply := new(pb.SearchResponse)
	reply.Response = r.GetRequest() + " Server"
	return reply, nil
}

// main serves gRPC and plain HTTP on the same TLS port (:9003). Requests
// that use HTTP/2 with an "application/grpc" content type go to the gRPC
// server; everything else goes to the HTTP mux.
func main() {
	// Build both handlers once. Constructing them inside the request handler
	// would create a new gRPC server (and reload the certificates) on every
	// single request.
	grpcServer := GetHTTPServeGrpc()
	mux := GetHTTPServeMux()

	err := http.ListenAndServeTLS(
		":9003",
		"../../certs/server.pem",
		"../../certs/server.key",
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
				grpcServer.ServeHTTP(w, r)
				return
			}
			mux.ServeHTTP(w, r)
		}),
	)
	// ListenAndServeTLS only returns on failure; do not drop the reason.
	if err != nil {
		log.Fatalf("http.ListenAndServeTLS err: %v", err)
	}
}

// GetHTTPServeGrpc returns a gRPC server with SearchService registered,
// configured with TLS credentials loaded from the server certificate and key.
// It terminates the process if the credentials cannot be loaded.
func GetHTTPServeGrpc() *grpc.Server {
	tlsCfg := &gtls.Server{
		CertFile: "../../certs/server.pem",
		KeyFile:  "../../certs/server.key",
	}

	creds, err := tlsCfg.GetTLSCredentials()
	if err != nil {
		log.Fatalf("tlsServer.GetTLSCredentials err: %v", err)
	}

	grpcServer := grpc.NewServer(grpc.Creds(creds))
	pb.RegisterSearchServiceServer(grpcServer, &SearchService{})

	return grpcServer
}

// GetHTTPServeMux returns a mux whose "/" handler writes the fixed body
// "result: go-grpc-example".
func GetHTTPServeMux() *http.ServeMux {
	root := func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("result: go-grpc-example"))
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/", root)

	return mux
}

8.2 gRPC 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// main builds TLS client credentials from the server certificate, dials the
// combined HTTP/gRPC endpoint on :9003, issues one Search call and logs the
// response string.
func main() {
	tlsClient := gtls.Client{
		ServerName: "go-grpc-example",
		CertFile:   "../../certs/server.pem",
	}

	c, err := tlsClient.GetTLSCredentials()
	if err != nil {
		log.Fatalf("tlsClient.GetTLSCredentials err: %v", err)
	}

	conn, err := grpc.Dial(":9003", grpc.WithTransportCredentials(c))
	if err != nil {
		// The message previously misspelled the function name as "grpc.Dail".
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(context.Background(), &pb.SearchRequest{
		Request: "gRPC",
	})
	if err != nil {
		log.Fatalf("client.Search err: %v", err)
	}

	log.Printf("resp: %s", resp.GetResponse())
}

8.3 http/1.1 直接访问

1
2
3
curl -k --cert client.pem --key client.key https://127.0.0.1:9003

curl -k --cacert ca.pem https://127.0.0.1:9003

9. 自定义认证

9.1 自定义认证接口

1
2
3
4
5
6
7
// PerRPCCredentials is the interface a custom per-request credential must
// implement.
type PerRPCCredentials interface {
// GetRequestMetadata returns the metadata required to authenticate the
// current request.
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)

// RequireTransportSecurity reports whether the credentials require
// TLS-secured transport.
RequireTransportSecurity() bool
}

9.2 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// SearchService implements the SearchService server and authenticates every
// call through its auth field.
type SearchService struct {
	auth *Auth
}

// Search rejects the call with the error from auth.Check, otherwise replies
// with the request string followed by " Server".
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	err := s.auth.Check(ctx)
	if err != nil {
		return nil, err
	}

	reply := &pb.SearchResponse{Response: r.GetRequest() + " Server"}
	return reply, nil
}

// main loads TLS credentials and serves the authenticated SearchService on
// :9004. Credential, listen and serve failures terminate the process.
func main() {
	tlsServer := gtls.Server{
		CertFile: "../../certs/server.pem",
		KeyFile:  "../../certs/server.key",
	}

	c, err := tlsServer.GetTLSCredentials()
	if err != nil {
		log.Fatalf("tlsServer.GetTLSCredentials err: %v", err)
	}

	server := grpc.NewServer(grpc.Creds(c))

	// Give the service a real Auth value instead of relying on method calls
	// through a nil *Auth.
	pb.RegisterSearchServiceServer(server, &SearchService{auth: &Auth{}})

	ln, err := net.Listen("tcp", ":9004")
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks until the server stops; report its error rather than
	// discarding it.
	if err := server.Serve(ln); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}

// Auth validates the app key/secret pair carried in incoming request metadata.
type Auth struct {
	appKey    string
	appSecret string
}

// Check reads "app_key" and "app_secret" from the incoming metadata and
// compares them with GetAppKey/GetAppSecret. It returns a
// codes.Unauthenticated status error when the metadata is missing or the
// values do not match, and nil otherwise.
func (a *Auth) Check(ctx context.Context) error {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return status.Errorf(codes.Unauthenticated, "metadata.FromIncomingContext err")
	}

	var (
		appKey    string
		appSecret string
	)

	// Guard on length rather than key presence: indexing [0] on a present
	// but empty value slice would panic.
	if value := md["app_key"]; len(value) > 0 {
		appKey = value[0]
	}

	if value := md["app_secret"]; len(value) > 0 {
		appSecret = value[0]
	}

	if appKey != a.GetAppKey() || appSecret != a.GetAppSecret() {
		return status.Errorf(codes.Unauthenticated, "invalid token")
	}

	return nil
}

// GetAppKey returns the expected app key (hard-coded for this example).
func (a *Auth) GetAppKey() string {
	return "wx20200719163021"
}

// GetAppSecret returns the expected app secret (hard-coded for this example).
func (a *Auth) GetAppSecret() string {
	return "7d13b90ae8e40c0160209c4a985b3bdf01321b15"
}

9.3 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Auth carries the app key/secret pair the client attaches to every RPC.
type Auth struct {
	AppKey    string
	AppSecret string
}

// GetRequestMetadata returns the key and secret under the metadata keys
// "app_key" and "app_secret". It never fails.
func (a *Auth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 2)
	md["app_key"] = a.AppKey
	md["app_secret"] = a.AppSecret

	return md, nil
}

// RequireTransportSecurity always reports true.
func (a *Auth) RequireTransportSecurity() bool { return true }

// main dials :9004 with TLS transport credentials plus per-RPC Auth
// credentials, issues one Search call and logs the response string.
func main() {
	tlsClient := gtls.Client{
		ServerName: "go-grpc-example",
		CertFile:   "../../certs/server.pem",
	}

	c, err := tlsClient.GetTLSCredentials()
	if err != nil {
		log.Fatalf("tlsClient.GetTLSCredentials err: %v", err)
	}

	auth := Auth{
		AppKey:    "wx20200719163021",
		AppSecret: "7d13b90ae8e40c0160209c4a985b3bdf01321b15",
	}

	conn, err := grpc.Dial(":9004", grpc.WithTransportCredentials(c),
		grpc.WithPerRPCCredentials(&auth))
	if err != nil {
		// The message previously misspelled the function name as "grpc.Dail".
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(context.Background(), &pb.SearchRequest{
		Request: "gRPC",
	})
	if err != nil {
		log.Fatalf("client.Search err: %v", err)
	}

	log.Printf("resp: %s", resp.GetResponse())
}

10. gRPC Deadline

10.1 为什么要设置Deadline?

  • 未设置 Deadline 时,请求默认的截止时间非常大,几乎不会因超时而返回 DEADLINE_EXCEEDED
  • 产生阻塞等待时,会造成大量正在进行的请求被保留,直到这些请求都达到最大超时
  • 会导致资源耗尽的风险,也会增加服务的延迟,最坏时可能导致整个进程崩溃

10.2 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// SearchService implements the generated SearchService server interface.
type SearchService struct{}

// Search returns early with a status error when the request context is
// already finished: codes.Canceled for a canceled context and
// codes.DeadlineExceeded for an expired deadline. Otherwise it replies with
// the request string followed by " Server".
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	// The original only checked context.Canceled, so an expired deadline
	// (the subject of this example) was not detected.
	switch ctx.Err() {
	case context.Canceled:
		return nil, status.Errorf(codes.Canceled, "SearchService.Search canceled")
	case context.DeadlineExceeded:
		return nil, status.Errorf(codes.DeadlineExceeded, "SearchService.Search deadline exceeded")
	}

	return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil
}

// main builds CA-based server credentials, installs the recovery and logging
// interceptors as a unary chain, and serves SearchService on :9001.
// Credential, listen and serve failures terminate the process.
func main() {
	tlsServer := gtls.Server{
		CaFile:   "../../certs/ca.pem",
		CertFile: "../../certs/server.pem",
		KeyFile:  "../../certs/server.key",
	}

	c, err := tlsServer.GetCredentialsByCA()
	if err != nil {
		log.Fatalf("tlsServer.GetCredentialsByCA err: %v", err)
	}

	// Server options: credentials plus the interceptor chain.
	opts := []grpc.ServerOption{
		grpc.Creds(c),
		grpc_middleware.WithUnaryServerChain(
			RecoveryInterceptor,
			LoggingInterceptor,
		),
	}

	server := grpc.NewServer(opts...)

	pb.RegisterSearchServiceServer(server, &SearchService{})

	ln, err := net.Listen("tcp", ":9001")
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	// Serve blocks until the server stops; report its error rather than
	// discarding it.
	if err := server.Serve(ln); err != nil {
		log.Fatalf("server.Serve err: %v", err)
	}
}

// LoggingInterceptor logs the full method name with the request before the
// handler runs and with the response afterwards, then returns the handler's
// result unchanged.
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	const format = "gRPC method: %s, %v"

	log.Printf(format, info.FullMethod, req)
	out, handlerErr := handler(ctx, req)
	log.Printf(format, info.FullMethod, out)

	return out, handlerErr
}

// RecoveryInterceptor runs the handler; if it panics, the stack is printed
// and err is replaced by a codes.Internal status error describing the panic.
func RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		debug.PrintStack()
		err = status.Errorf(codes.Internal, "Panic err: %v", recovered)
	}()

	resp, err = handler(ctx, req)
	return resp, err
}

10.3 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// main dials :9001 with CA-based client credentials and issues one Search
// call bounded by a 5-second deadline. A DeadlineExceeded status is reported
// with a dedicated message; any other error is logged as-is.
func main() {
	tlsClient := gtls.Client{
		ServerName: "go-grpc-example",
		CaFile:     "../../certs/ca.pem",
		CertFile:   "../../certs/client.pem",
		KeyFile:    "../../certs/client.key",
	}

	c, err := tlsClient.GetCredentialsByCA()
	if err != nil {
		log.Fatalf("tlsClient.GetCredentialsByCA err: %v", err)
	}

	conn, err := grpc.Dial(":9001", grpc.WithTransportCredentials(c))
	if err != nil {
		// The message previously misspelled the function name as "grpc.Dail".
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	// Deadline: WithTimeout is the direct form of WithDeadline(now + 5s).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(ctx, &pb.SearchRequest{
		Request: "gRPC",
	})
	if err != nil {
		if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.DeadlineExceeded {
			log.Fatalf("client.Search err: deadline")
		}

		log.Fatalf("client.Search err: %v", err)
	}

	log.Printf("resp: %s", resp.GetResponse())
}