grpc流的使用方法和注意事项

我们将分别实现客户端流式、服务端流式和双向流式模式。

步骤 1: 创建 Protobuf 文件

首先确保你已经有了如下的 Protobuf 文件 hello_stream.proto

syntax = "proto3";

package hello_stream;

option go_package = "./services/hello_stream";

service HelloStreamService {
  // 客户端流式 RPC
  rpc HelloClientStream(stream HelloRequest) returns (HelloResponse);
  
  // 服务端流式 RPC
  rpc HelloServerStream(HelloRequest) returns (stream HelloResponse);
  
  // 双向流式 RPC
  rpc HelloStream(stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
  string message = 1;
}

message HelloResponse {
  string message = 1;
}

步骤 2: 安装 gRPC 和 Protobuf 相关工具

你需要安装以下工具来生成 gRPC 的 Go 代码:

  1. 安装 protoc 编译器:你可以从 Protocol Buffers 官方 GitHub 下载合适的版本。
  2. 安装 gRPC 插件:bash复制编辑go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
  3. 配置环境变量:bash复制编辑export PATH="$PATH:$(go env GOPATH)/bin"

步骤 3: 生成 Go 代码

运行以下命令来生成 Go 文件:

protoc --go_out=. --go-grpc_out=. hello_stream.proto

生成的文件会在当前目录下,通常会包含:

  • hello_stream.pb.go:包含消息类型的定义。
  • hello_stream_grpc.pb.go:包含服务接口的定义。

步骤 4: 实现服务端代码

创建一个新的 Go 文件,server.go,并实现服务器逻辑:

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"services/hello_stream" // 修改为你的 Go 包路径

	"google.golang.org/grpc"
)

type server struct {
	hello_stream.UnimplementedHelloStreamServiceServer
}

// HelloClientStream 实现客户端流式 RPC
func (s *server) HelloClientStream(stream hello_stream.HelloStreamService_HelloClientStreamServer) error {
	var messages []string
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// 客户端流结束
			break
		}
		if err != nil {
			return err
		}
		messages = append(messages, req.GetMessage())
	}

	// 处理完客户端消息后返回响应
	return stream.SendAndClose(&hello_stream.HelloResponse{
		Message: fmt.Sprintf("Received: %v", messages),
	})
}

// HelloServerStream 实现服务端流式 RPC
func (s *server) HelloServerStream(req *hello_stream.HelloRequest, stream hello_stream.HelloStreamService_HelloServerStreamServer) error {
	for i := 0; i < 5; i++ {
		if err := stream.Send(&hello_stream.HelloResponse{
			Message: fmt.Sprintf("Hello %s, message %d", req.GetMessage(), i),
		}); err != nil {
			return err
		}
	}
	return nil
}

// HelloStream 实现双向流式 RPC
func (s *server) HelloStream(stream hello_stream.HelloStreamService_HelloStreamServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// 客户端结束流
			return nil
		}
		if err != nil {
			return err
		}

		// 返回响应给客户端
		if err := stream.Send(&hello_stream.HelloResponse{
			Message: "Received: " + req.GetMessage(),
		}); err != nil {
			return err
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	hello_stream.RegisterHelloStreamServiceServer(s, &server{})

	log.Println("Server listening on port :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

步骤 5: 实现客户端代码

创建另一个 Go 文件 client.go,实现客户端逻辑:

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"services/hello_stream" // 修改为你的 Go 包路径
	"google.golang.org/grpc"
)

const (
	serverAddr = "localhost:50051"
)

func main() {
	conn, err := grpc.Dial(serverAddr, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := hello_stream.NewHelloStreamServiceClient(conn)

	// 客户端流式 RPC
	clientStream, err := client.HelloClientStream(context.Background())
	if err != nil {
		log.Fatalf("could not send stream: %v", err)
	}
	for i := 0; i < 5; i++ {
		if err := clientStream.Send(&hello_stream.HelloRequest{Message: fmt.Sprintf("Message %d", i)}); err != nil {
			log.Fatalf("could not send message: %v", err)
		}
	}
	resp, err := clientStream.CloseAndRecv()
	if err != nil {
		log.Fatalf("could not receive response: %v", err)
	}
	fmt.Println("Client Stream Response:", resp.GetMessage())

	// 服务端流式 RPC
	serverStream, err := client.HelloServerStream(context.Background(), &hello_stream.HelloRequest{Message: "Hello from client"})
	if err != nil {
		log.Fatalf("could not receive stream: %v", err)
	}
	for {
		resp, err := serverStream.Recv()
		if err == io.EOF {
			// 流结束
			break
		}
		if err != nil {
			log.Fatalf("error receiving stream: %v", err)
		}
		fmt.Println("Server Stream Response:", resp.GetMessage())
	}

	// 双向流式 RPC
	stream, err := client.HelloStream(context.Background())
	if err != nil {
		log.Fatalf("could not establish stream: %v", err)
	}
	go func() {
		for i := 0; i < 5; i++ {
			if err := stream.Send(&hello_stream.HelloRequest{Message: fmt.Sprintf("Client Message %d", i)}); err != nil {
				log.Fatalf("error sending message: %v", err)
			}
		}
	}()
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			// 流结束
			break
		}
		if err != nil {
			log.Fatalf("error receiving message: %v", err)
		}
		fmt.Println("Bidirectional Stream Response:", resp.GetMessage())
	}
}

步骤 6: 运行服务和客户端

  1. 启动服务端:bash复制编辑go run server.go
  2. 启动客户端:bash复制编辑go run client.go

你已经成功实现了 gRPC 流模式的 Go 代码,包括:

  • 客户端流式 RPC(HelloClientStream)。
  • 服务端流式 RPC(HelloServerStream)。
  • 双向流式 RPC(HelloStream)。

这样就可以进行高效的流数据传输,适用于需要大量数据交换或者实时交互的应用场景。

细节说明:不同类型的 gRPC 流模式

在 gRPC 中,我们有三种流模式:客户端流式、服务端流式和双向流式。每种模式的使用场景和实现方式略有不同。下面我将详细解释每种模式的实现和适用场景。

1. 客户端流式 RPC (Client-Stream RPC)

在客户端流式 RPC 中,客户端会发送一系列请求消息,然后服务端在客户端结束流之后,发送一个响应。客户端和服务端之间的交互是单向的,客户端负责流的发送,服务端只在客户端发送完所有消息后返回响应。

适用场景

  • 客户端发送大量数据到服务端,而服务端根据这些数据进行处理后返回单个结果。
  • 比如上传文件,客户端逐块发送数据,最后服务端返回上传成功的信息。

实现细节

  • 客户端可以多次调用 Send 方法发送请求消息,直到所有消息都发送完毕。
  • 客户端通过 CloseAndRecv 或者 SendAndClose 来关闭流并接收最终的响应。

示例代码

// 客户端流式 RPC
func (s *server) HelloClientStream(stream hello_stream.HelloStreamService_HelloClientStreamServer) error {
	var messages []string
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// 客户端流结束
			break
		}
		if err != nil {
			return err
		}
		messages = append(messages, req.GetMessage())
	}

	// 处理完客户端消息后返回响应
	return stream.SendAndClose(&hello_stream.HelloResponse{
		Message: fmt.Sprintf("Received: %v", messages),
	})
}

2. 服务端流式 RPC (Server-Stream RPC)

在服务端流式 RPC 中,客户端发送一个请求后,服务端会返回多个响应消息。这意味着客户端只发送一次请求,而服务端可能会多次返回响应数据,直到服务端的数据流结束。

适用场景

  • 客户端请求一个操作,服务端持续提供结果,直到任务完成。
  • 比如实时获取服务器的监控数据,或者从服务端获取分页的数据。

实现细节

  • 服务端可以多次调用 Send 方法返回消息,直到数据流结束。
  • 客户端只需要一次调用 Recv 来接收每一个响应。

示例代码

// 服务端流式 RPC
func (s *server) HelloServerStream(req *hello_stream.HelloRequest, stream hello_stream.HelloStreamService_HelloServerStreamServer) error {
	for i := 0; i < 5; i++ {
		if err := stream.Send(&hello_stream.HelloResponse{
			Message: fmt.Sprintf("Hello %s, message %d", req.GetMessage(), i),
		}); err != nil {
			return err
		}
	}
	return nil
}

3. 双向流式 RPC (Bidirectional Stream RPC)

双向流式 RPC 允许客户端和服务端同时进行流式传输。客户端和服务端都可以在接收到对方的消息后发送自己的消息。这是最灵活的一种流模式。

适用场景

  • 双向通信的场景,如实时聊天、在线游戏中的消息传输、视频流传输等。
  • 适合要求实时双向通信的应用。

实现细节

  • 客户端和服务端都可以通过 Send 和 Recv 方法独立地进行发送和接收操作。
  • 双向流可以是同时进行的,双方可以互相交替发送和接收消息。

示例代码

// 双向流式 RPC
func (s *server) HelloStream(stream hello_stream.HelloStreamService_HelloStreamServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// 客户端结束流
			return nil
		}
		if err != nil {
			return err
		}

		// 返回响应给客户端
		if err := stream.Send(&hello_stream.HelloResponse{
			Message: "Received: " + req.GetMessage(),
		}); err != nil {
			return err
		}
	}
}

总结

  1. 客户端流式 RPC:客户端发送多个请求,服务端在收到所有消息后返回单个响应。适用于客户端需要发送多个数据块,服务端返回处理结果的场景。
  2. 服务端流式 RPC:客户端发送一个请求,服务端返回多个响应。适用于从服务端获取一系列数据或事件流的场景。
  3. 双向流式 RPC:客户端和服务端都可以同时发送和接收消息。适用于实时交互和持续的双向数据传输。

每种流模式在实际应用中都有其独特的优势,选择哪种模式取决于你的具体需求。通过 gRPC,你可以轻松实现高效的流式数据传输,满足各种实时通信需求。


文章标签:

评论(0)