模型上线不用愁,批量推理来加油.

作为一个算法工程师,在日常工作中难免会碰到模型上线的问题。对于一些要求不高的场合,简单找一个web框架实现一下接口就能搞定:对于每个用户请求,调用模型得到结果再返回。但这种朴素的实现往往无法最大化利用GPU,对于性能要求比较高的场景应付起来就略显吃力。 优化的方法有很多,一个增益很大的措施就是把一个请求推理一次改成多个请求一起推理。去年大概也是这个时候我写了一个小工具来实现这个功能,还取了个蛮霸气的名字InferLight,但当时写得并不太好;最近参考香侬科技的Service-Streamer又重构了一版。这个功能看似简单,但是在实现的过程中可以了解很多Python异步编程的知识,感觉收获颇丰,于是写篇短文总结一下。 首先,要提高模型的线上推理吞吐量,应该把推理服务做成异步的。对于web服务来说,异步的意思是当模型在计算的时候它可以处理别的请求。对于Python来说,异步服务可以通过很多优秀的基于Asyncio的框架来实现,例如我常用的Sanic。而推理是计算密集的,也没有什么同步异步的说法,我们的目标就是能够汇聚多个推理请求,高效利用GPU的并行计算能力,并且能将批量推理的结果正确地返回给对应的请求者。 要实现上面的目标,需要以下几个模块 前端服务:用于接收请求、返回结果。可以是Http、PRC等各种协议。是一个独立进程。 推理Worker:负责模型的初始化、批量推理数据构建、推理计算。是一个独立进程。 任务队列:前端服务收到请求之后把计算任务送入任务队列;推理Worker监听该队列,每次取出一个小批量由模型推理 结果队列:推理服务推理完成后将结果送入结果队列;前端服务监听该队列,获得推理结果 结果分发:在将任务送入任务队列前需要生成任务的唯一标识,从结果队列取回结果后根据标识获取到任务对应的结果 其中两个任务队列的实现方式很多,可以通过一些成熟的中间件例如Kafka、Redis等,但为了避免外部依赖,这次我选择使用Python原生的多进程队列。结果队列监听和分发通过前端服务进程的一个子线程来完成。 实现细节 推理服务相对简单,由于各种模型的加载、数据处理步骤千奇百怪,所以我将推理Worker设计成了一个基类,使用时继承它并实现特定方法。 import logging import multiprocessing as mp import time from queue import Empty class BaseInferLightWorker: def __init__(self, data_queue:mp.Queue, result_queue:mp.Queue, model_args:dict, batch_size=16, max_delay=0.1, ready_event=None) -> None: self.data_queue = data_queue self.result_queue = result_queue self.batch_size = batch_size self.max_delay = max_delay self.logger = logging.getLogger('InferLight-Worker') self.logger.setLevel(logging.DEBUG) self.load_model(model_args) # 由于模型载入时间较长 # 加载完成后使用一个event来通知主进程 if ready_event: ready_event.set() def run(self): self.logger.info('Worker started!') while True: data, task_ids = [], [] since = time....

June 20, 2021 · 4 min · Yuanhao