陈思煜

V1

2022/11/24阅读:9主题:默认主题

Python-gRPC实践(4)--简述gRPC拦截器

前言

世界上没有百分之百不会挂的服务,只能人为的去增加服务的可用性,为了能让服务的可用性增加,需要为服务添加服务治理的功能,而在gRPC中,可以通过拦截器实现一些服务治理的功能。

1.什么是拦截器

Python中,很少有框架把自己的功能称为拦截器,反而都称为中间件或者是钩子,gRPC的拦截器功能与PythonWeb框架常用的中间件基本一样,它主要的功能是可以在调用函数之前,之后已经发生异常时能够捕获到对应的数据,比如如下代码:

def demo_rpc_method():
    # run before code
    try:
        pass  # rpc logic
    except Exception:
        pass  # exc code
    finally:
        # run after code

这段代码提现了拦截器的主要特点:

  • 1.裹住了一个RPC调用逻辑
  • 2.能够在裹住的逻辑调用之前,之后以及调用异常的时候自定义数据

对于Python开发人员来说,看到这思路后脑子里的第一个想法就是可以通过Python的装饰器轻松的实现gRPC拦截器的逻辑。

所以在官方尚未敲定拦截器实现的时候,大家也都是使用装饰器来实现拦截器的,比如下面这段代码(源码见grpc_example_common/helper/grpc_wrapper.py):

def grpc_client_func_wrapper(*args: Any, **kwargs: Any) -> Callable:
    def wrapper(func: Callable) -> Callable:
        # func是装饰器装饰的一个方法
        @wraps(func)
        def _wrapper(*_args: Any, **_kwargs: Any) -> Any:
            # 查看调用段方法是否有指定metadata参数,如果没有就定义一个
            if "metadata" not in _kwargs:
                _kwargs["metadata"] = []
            return func(*_args, **_kwargs)

        return _wrapper

    return wrapper

这是我目前还在用的一个通过装饰器实现的拦截器,因为目前gRPC的客户端在调用方法没定义metadata参数时,gRPC客户端拦截器是没办法去更改metadata参数的,强行修改还会报错,而meatdata参数类似于HTTP的Header参数,很多通过拦截器实现的功能如链路追踪,身份校验等功能的实现都需要通过拦截器更改或写入metadata参数来实现,所以这个装饰器一直保留着。

既然有了装饰器,那为什么gRPC最终还是选择拦截器这个方案呢,原因是装饰器虽然很灵活,但是本身有很多缺点:

  • 0.装饰器只能用于方法上,如果遇到一个无法调用到指定的方法的请求,那么这个请求就无法没监控到。
  • 1.需要显式的为每个方法添加装饰器。
  • 2.过多的装饰器会变得非常混乱。
  • 3.上篇文章提到的由于装饰器导致需要延后声明才能使方法mock生效。

Note: 可以通过grpc_example_common/helper/grpc_wrapper.py函数自动应用装饰器,使用方法见grpc_service/book_service.py

2.如何使用拦截器

由于装饰器自己带有一些缺点,所以gRPC最后自己定义了一个拦截器方案,不过Python实现gRPC拦截器API非常复杂,按照官方文档的指引,我们需要为每种请求类型定义一个拦截器,但是他们的代码却是相同的,这对开发人员来说非常的不友好,不过好在有一个grpc-interceptor包进行了统一,使用起来非常方便,只用继承他提供的类,再覆盖intercept方法就可以同时为一对一,多对一,一对多的请求套上拦截器,比如我定义了一个可以把错误类型在不同基于Python实现的gRPC服务传播的拦截器:

##############
# 服务端拦截器 #
##############
class ServerCustomerTopInterceptor(BaseInterceptor):
    def intercept(
        self,
        next_handler_method: Callable,
        request_proto_message: Any,
        context: grpc.ServicerContext,
    )
 -> Any:

        # 在这个区域实现未调用方法的代码
        start_time: float = time.time()
        return_initial_metadata: List[Tuple] = [("customer-user-agent""Python3")]
        try:
            # 调用下一个拦截器或者是方法 
            return next_handler_method(request_proto_message, context)
        except Exception as e:
            # 发生错误的处理,解析错误 
            if self.metadata_dict.get("customer-user-agent""") == "Python3":
                return_initial_metadata.append(("exc_name", e.__class__.__name__))
                return_initial_metadata.append(("exc_info", str(e)))
            logging.exception(f"{context_proxy.method} request exc:{e.__class__.__name__} error:{e}")
            # 记住一定要把异常抛出来,这样才能走grpc的异常逻辑,否则客户端那边会解析到错误的body
            raise e
        finally:
            # 调用方法后的逻辑,这里把metadata插入到context中
            context.send_initial_metadata(return_initial_metadata)
            # 打印access log
            logging.info(
                f"Got Request. method:{self.method}, code:{context.code()}, detail:{context.details()}, duration:{time.time() - start_time}"
            )
##############
# 客户端拦截器 #
##############
class ClientCustomerTopInterceptor(BaseInterceptor):
    def __init__(self, exc_list: Optional[List[Type[Exception]]] = None):
        # 注入可用的异常
        self.exc_dict: Dict[str, Type[Exception]] = {}
        for key, exc in globals()["__builtins__"].items():
            if inspect.isclass(exc) and issubclass(exc, Exception):
                self.exc_dict[key] = exc

        if exc_list:
            for exc in exc_list:
                if issubclass(exc, Exception):
                    self.exc_dict[exc.__name__] = exc

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: ClientCallDetailsType,
    )
 -> GRPC_RESPONSE:

        # 调用方法之前的操作,注意一定要判断metadata是否为空,如果是的话,metadata是不可写的,强行写入会报错
        if call_details.metadata is not None:
            call_details.metadata.append(("customer-user-agent""Python3"))  # type: ignore
            call_details.metadata.append(("request_id", context_proxy.request_id))
        # 调用方法
        response: GRPC_RESPONSE = method(call_details, request_or_iterator)
        # 调用完方法的对应操作
        metadata_dict: dict = {item.key: item.value for item in response.initial_metadata()}
        if metadata_dict.get("customer-user-agent") == "Python3":
            exc_name: str = metadata_dict.get("exc_name""")
            exc_info: str = metadata_dict.get("exc_info""")
            exc: Optional[Type[Exception]] = self.exc_dict.get(exc_name)
            if exc:
                raise exc(exc_info)
        return response

可以看到源码中的拦截器都有调用之前,调用中,调用后三大逻辑,与我们前面说的一样,给予开发者极强的自定义能力,这样一来就可以通过拦截器来实现一些服务治理的功能,对于如何实现可见RPC框架编写实践系列

实现完拦截器后就需要应用到gRPC服务中,由于grpc-interceptor是对官方拦截器的简单安装,所以可以像官方拦截器一样应用到服务中,服务端应用拦截器可参考grpc-example-book-grpc-service项目grpc.Server初始化时通过interceptors参数把拦截器列表传进去,简要代码如下:

def main(
    host: str = "0.0.0.0", port: str = "9000", ssl_port: Optional[str] = None
)
 -> None:

    interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()]
    server: grpc.server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=interceptor_list,
    )
    manager_service.add_BookManagerServicer_to_server(ManagerService(), server)
    social_service.add_BookSocialServicer_to_server(SocialService(), server)

而对于客户端可以参考grpc-example-api-backend-service项目在通过生成channel时,把拦截器通过参数传进去

class BookGrpcService(BookSocialGrpcServiceMixin, BookManagerGrpcServiceMixin):
    def __init__(self, host: str, port: int) -> None:
        self.channel: grpc.Channel = grpc.intercept_channel(
            grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor()
        )
        BookSocialGrpcServiceMixin.__init__(self, self.channel)
        BookManagerGrpcServiceMixin.__init__(self, self.channel)

拦截器编写完成后应该去测试它,但是大多数都拦截器都依赖于一些特定的服务,如Prometheus监控依赖Prometheus,链路追踪可能依赖到的jaeger等,所以很多时候都会在测试环境或者预发布环境进行测试,但是对于其它无服务依赖的拦截器则可以使用grpc_interceptor包实现的测试模块来进行测试,如官方的例子:

from grpc_interceptor import ExceptionToStatusInterceptor
from grpc_interceptor.exceptions import NotFound
from grpc_interceptor.testing import dummy_client, DummyRequest, raises

def test_exception():
    special_cases = {"error": raises(NotFound())}
    # 指定的拦截器
    interceptors = [ExceptionToStatusInterceptor()]
    # 模拟一个客户端发起请求,该客户端响应的数据等于input时指定的数据
    # 不过也可以通过special_cases来指定input数据对应的异常响应
    with dummy_client(special_cases=special_cases, interceptors=interceptors) as client:
        # 输入input时必定会响应一个output
        assert client.Execute(DummyRequest(input="foo")).output == "foo"
        # 输入error时,会抛出一个对应的响应
        with pytest.raises(grpc.RpcError) as e:
            client.Execute(DummyRequest(input="error"))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND

通过该例子可以方便的实现一个正常请求和错误请求来测试拦截器的返回,但是对于一些内在逻辑最好通过pytest-mock或者是官方的unitest-mock来判断是否有调用到,调用几次,调用时的内容是否符合标准等等。

分类:

后端

标签:

Python

作者介绍

陈思煜
V1