gRPC基本用法:以人脸识别为例,搭建一个简单的gRPC服务

2023-12-14 14:53:11

0. gRPC简介

相关网站:

中文文档:gRPC 官方文档中文版_V1.0

官网:gRPC

介绍(以下引自官方文档中文版中的介绍):

gRPC是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持.

gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

在我们需要对外提供某些服务时,通常有几个选择:Django、Flask、gRPC,他们都可以将你的算法或者其他服务封装起来,以接口的形式对外提供服务。

本文以人脸识别服务为例,介绍一下gRPC这个框架。

1. 协议的定义与生成

1.1 编写proto文档,定义相关字段

syntax = "proto3";

package api.product.pft.v1.face;


// 搜索人脸请求
message SearchFaceReq {
    repeated float feats = 1;
}

// 搜索人脸响应
message SearchFaceReply {
    string student_id = 2;
}


service API {
  rpc SearchFace (SearchFaceReq) returns (SearchFaceReply) {}
}

1.2 生成序列化文档

# xx为第1步编写的协议文件名称,将path/to/xx.proto改为你的协议文件地址
python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. path/to/xx.proto

2. 编写服务程序和测试程序

通常server和client是分开的,比如在服务器只需要server相关代码,启动之后就一直在监听有无请求,而client一般在用户端定义并使用,通过连接到服务器,调用服务接口来完成人脸识别功能。

server.py:

import time
import grpc
from concurrent import futures
import logging
import cv2
import glob
import os
import sys
import numpy as np

sys.path.insert(0, os.getcwd())
sys.path.insert(0, 'ai_bridge')

from ..proto import face_pb2, face_pb2_grpc
from ..utils.tools import img_binary_to_cv, img_cv_to_binary

# from py_imo_core import api as pic
# from configs import auth

logging.basicConfig(
    format='%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s',
    level=logging.DEBUG
)


class FaceServicer(face_pb2_grpc.APIServicer):
    def __init__(self, features_dir="data/faces/*.npy", thres=0.8):
        feature_paths = sorted(glob.glob(features_dir))
        features, face_ids = [], []
        for feature_path in feature_paths:
            face_id = int(os.path.split(feature_path)[-1].split('.')[0])
            face_ids.append(face_id)
            feature = np.load(feature_path)
            features.append(feature / np.linalg.norm(feature))

        self.features = np.asarray(features)
        self.face_ids = face_ids

        self.thres = thres

    def SearchFace(self, request, context):
        logging.info("Got request!".format())
        feature = np.asarray(request.feats)
        feature = feature / np.linalg.norm(feature)
        print(len(feature))

        # 人脸比对,得到face_id
        scores = np.dot(self.features, feature.T)
        face_id = self.face_ids[np.argmax(scores)] if np.max(scores) >= self.thres else ''
        print("face rec result: id:{}, score:{}".format(face_id, np.max(scores)))

        reply = self._pase_reply(face_id)
        return reply

    def _pase_reply(self, face_id):
        reply = face_pb2.SearchFaceReply(
            student_id=str(face_id)
        )
        return reply


class Server(object):
    def __init__(self, algo_kwargs, ip_port='0.0.0.0:50051', max_workers=10, options=None):
        self.ip_port = ip_port
        self.max_workers = max_workers
        self.options = options
        self.face_server = FaceServicer(**algo_kwargs)

    def run(self):
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers), options=self.options)
        face_pb2_grpc.add_APIServicer_to_server(self.face_server, server)
        server.add_insecure_port(self.ip_port)
        server.start()
        logging.info('listening on %s...' % self.ip_port)
        server.wait_for_termination()

client.py:

import grpc

from ..proto import face_pb2, face_pb2_grpc


class Client(object):
    def __init__(self, ip_port='0.0.0.0:50051', options=None):
        self.ip_port = ip_port
        self.options = options
        self.channel = grpc.insecure_channel(ip_port, options=options)
        self.stub = face_pb2_grpc.APIStub(self.channel)

    def Exec(self, req):
        reply = self.stub.SearchFace(req)
        return reply

3. 编写服务启动脚本、客户端启动脚本

与server和client一一对应,且同样是在不同的端进行使用。

run_server.py:

import os
import sys;

sys.path.insert(0, os.getcwd())

from api.core.server import Server


def main():
    MAX_MESSAGE_LENGTH = 1024 * 1024 * 1024

    algo_kwargs = dict(
        features_dir="data/faces/*.npy"
    )
    server = Server(
        algo_kwargs,
        ip_port='0.0.0.0:50053',
        options=[
            ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
            ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH),
        ]
    )
    server.run()


if __name__ == '__main__':
    main()

run_client.py:

import logging
import time
import cv2
import numpy as np
import glob
import os
import sys

sys.path.insert(0, os.getcwd())
logging.basicConfig(
    format='%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s',
    level=logging.DEBUG
)

from api.core.client import Client
from api.proto import face_pb2


def main():
    MAX_MESSAGE_LENGTH = 1024 * 1024 * 1024
    client = Client(
        # ip_port='www.aimall-cd.cn:8036',
        ip_port='0.0.0.0:50050',
        options=[
            ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
            ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH),
        ]
    )

    feature = np.load("data/faces/20001.npy")
    req = face_pb2.ExecReq(
        feature=[f for f in feature],
    )
    reply = client.Exec(req)

    # parse gRPC result
    face_id = reply.face_id

    print(face_id)


if __name__ == '__main__':
    main()

4. 后台启动server服务,运行利用client进行测试

先启动server:

# 启动上面编写的run_server脚本
python /path/to/run_server.py

? ? ? ? 然后,启动client,即可得到测试结果:

# 启动上面编写的run_client脚本
python /path/to/run_client.py

5. 本文所有代码已打包放在github

欢迎大家使用,并给个star:

GitHub - aiLiwensong/grpc_face_recognition: gRPC Demo: Taking facial recognition as an example

文章来源:https://blog.csdn.net/oYeZhou/article/details/128964983
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。