【大模型】triton inference server
前言:Triton Inference Server 常用于大模型部署,可以通过 HTTP 或 gRPC 调用,支持大部分 backend,单 GPU、多 GPU 以及 CPU 环境均可运行。本文通过一个简单示例,介绍使用 Triton Inference Server 部署大模型的基本流程。
目录
- 1. 整体流程
- 2. 搭建本地仓库
- 3. 服务端代码
- 4. 启动服务
- 5. 客户端调用
1. 整体流程
- 搭建模型仓库
- 模型配置
- 服务端调用代码
- docker启动服务
- 客户端调用
2. 搭建本地仓库
在本地创建一个模型仓库文件夹,仓库中每个子文件夹对应一个模型(文件夹名即模型名)。模型文件夹内包含模型配置文件config.pbtxt,以及代表不同版本的子文件夹(必须以数字命名,且不能以0开头);版本文件夹中存放模型文件,使用Python backend时,服务端代码model.py也放在版本文件夹中。文件树如下:
模型名
|— — 版本号
|— — — —模型文件(.bin/.pth…)
|— — — —model.py(Python backend 的服务端代码)
|— — config.pbtxt
- config.pbtxt
# `name` MUST be identical to the name of the enclosing model folder.
name: "ibuddha_chitchat"
# PyTorch (LibTorch) backend.
platform: "pytorch_libtorch"
# Limit the batch size, otherwise the GPU can be overwhelmed.
max_batch_size: 64
input [
  {
    # The PyTorch backend uses the positional names INPUT__0, INPUT__1, ... by default.
    name: "INPUT__0"
    # TYPE_INT64 or TYPE_INT32; must match the type the model was defined with.
    data_type: TYPE_INT64
    # -1 means a dynamic sequence length (typically 1 to 510 tokens);
    # otherwise put a fixed value here.
    dims: [-1]
  },
  {
    name: "INPUT__1"
    data_type: TYPE_INT64
    dims: [-1]
  },
  {
    name: "INPUT__2"
    data_type: TYPE_INT64
    dims: [-1]
  }
]
output [
  {
    # Default positional output name used by the PyTorch backend.
    name: "OUTPUT__0"
    data_type: TYPE_FP32
    dims: [13088]
  }
]
# Serve only one version: the one with the highest version number.
version_policy: { latest { num_versions: 1 } }
#version_policy: { all {}}
# Enabling dynamic batching will improve performance greatly.
dynamic_batching {
}
# Enabling this makes inference faster.
parameters: {
  key: "INFERENCE_MODE"
  value: { string_value: "true" }
}
# Left disabled: it was slower than the default in the author's test.
#parameters: {
#  key: "ENABLE_NVFUSER"
#  value: {
#    string_value: "true"
#  }
#}
# Run a single instance of the PyTorch model on GPU 0.
instance_group [
  {
    count: 1
    kind: KIND_GPU
    gpus: [ 0 ]
  }
]
注意: name必须和外层模型文件夹的名字一致
3. 服务端代码
- model.py
import triton_python_backend_utils as pb_utils
from torch.utils.dlpack import from_dlpack,to_dlpack
import torch.nn.functional as F
import torch
import json
import numpy as np


class TritonPythonModel:
    """Python-backend model that generates token ids by repeatedly calling another model.

    Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.

    For every request, `execute` calls the model named by `_TARGET_MODEL_NAME`
    once per generated token, samples the next token from the returned
    logits (temperature scaling + top-p filtering) and appends it to the
    inputs, until a special token is sampled or `output_max_length` tokens
    have been produced.
    """

    # Name of the model that is called for every decoding step.
    _TARGET_MODEL_NAME = "ibuddha_chitchat"

    def initialize(self, args):
        """Parse the model configuration and set the generation parameters.

        `initialize` is called only once when the model is being loaded.
        Implementing `initialize` is optional. This function allows the
        model to initialize any state associated with this model.

        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        # You must parse model_config. The JSON string is not parsed here.
        self.model_config = json.loads(args['model_config'])

        input0_config = pb_utils.get_input_config_by_name(self.model_config, "INPUT__0")
        input1_config = pb_utils.get_input_config_by_name(self.model_config, "INPUT__1")
        input2_config = pb_utils.get_input_config_by_name(self.model_config, "INPUT__2")
        output0_config = pb_utils.get_output_config_by_name(self.model_config, "OUTPUT__0")
        output1_config = pb_utils.get_output_config_by_name(self.model_config, "OUTPUT__1")

        # Convert Triton types to numpy types.
        self.input0_dtype = pb_utils.triton_string_to_numpy(input0_config['data_type'])
        self.input1_dtype = pb_utils.triton_string_to_numpy(input1_config['data_type'])
        self.input2_dtype = pb_utils.triton_string_to_numpy(input2_config['data_type'])
        self.output0_dtype = pb_utils.triton_string_to_numpy(output0_config['data_type'])
        self.output1_dtype = pb_utils.triton_string_to_numpy(output1_config['data_type'])

        # Hard-coded special token ids. Presumably the tokenizer ids of
        # "[CLS]", "[SEP]", "[PAD]", "[speaker1]", "[speaker2]" (in that
        # order), which an earlier version looked up through a tokenizer
        # -- TODO confirm against the tokenizer vocabulary.
        self.special_tokens_ids = [0, 2, 1, 13086, 13087]
        self.output_min_length = 1
        self.output_max_length = 64  # TODO: change
        self.temperature = 0.7
        self.top_p = 0.7
        self.round = 1

    def execute(self, requests):
        """Generate token ids and their probabilities for every request.

        `execute` must be implemented in every Python model. `execute`
        receives a list of pb_utils.InferenceRequest as the only argument.
        This function is called when an inference request is made for this
        model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`.

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`. Each response carries "OUTPUT__0" (the
          generated token ids) and "OUTPUT__1" (the probability each token
          was sampled with).

        Raises
        ------
        pb_utils.TritonModelException
          If a call to the target model returns an error.
        """
        responses = []

        # Every Python backend must iterate over every one of the requests
        # and create a pb_utils.InferenceResponse for each of them.
        for request in requests:
            in_0 = pb_utils.get_input_tensor_by_name(request, "INPUT__0")
            in_1 = pb_utils.get_input_tensor_by_name(request, "INPUT__1")
            in_2 = pb_utils.get_input_tensor_by_name(request, "INPUT__2")

            output_ids = []
            output_confidences = []

            for i in range(self.output_max_length):
                logits = self._infer_logits(in_0, in_1, in_2)
                output_id, confidence = self._sample_next_token(logits, i)

                # A special token ends the generation and is not emitted.
                if output_id in self.special_tokens_ids:
                    break

                output_ids.append(output_id)
                output_confidences.append(confidence)
                in_0, in_1, in_2 = self._extend_inputs(in_0, in_1, in_2, output_id)

            output_ids = torch.tensor(output_ids)
            output_confidences = torch.tensor(output_confidences)
            output_0 = pb_utils.Tensor(
                "OUTPUT__0", output_ids.numpy().astype(self.output0_dtype))
            output_1 = pb_utils.Tensor(
                "OUTPUT__1", output_confidences.numpy().astype(self.output1_dtype))

            inference_response = pb_utils.InferenceResponse(
                output_tensors=[output_0, output_1])
            responses.append(inference_response)

        # You should return a list of pb_utils.InferenceResponse. Length
        # of this list must match the length of `requests` list.
        return responses

    def _infer_logits(self, in_0, in_1, in_2):
        """Run one synchronous, blocking call to the target model and return its "OUTPUT__0" as a torch tensor."""
        infer_request = pb_utils.InferenceRequest(
            model_name=self._TARGET_MODEL_NAME,
            requested_output_names=["OUTPUT__0"],
            inputs=[in_0, in_1, in_2])
        infer_response = infer_request.exec()

        if infer_response.has_error():
            raise pb_utils.TritonModelException(infer_response.error().message())

        output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUTPUT__0')
        return from_dlpack(output0.to_dlpack())

    def _sample_next_token(self, logits, step):
        """Sample the next token id from `logits` and return `(token_id, probability)`.

        Only row 0 of `logits` is used. While `step` is below
        `output_min_length`, a sampled special token is replaced by a
        non-special one whenever any non-special token has non-zero
        probability.
        """
        scaled_logits = logits[0, :] / self.temperature
        top_logits = self.top_filtering(scaled_logits, self.top_p)
        probs = F.softmax(top_logits, dim=-1)

        prev = torch.multinomial(probs, num_samples=1)

        if step < self.output_min_length and prev.item() in self.special_tokens_ids:
            # Sample from the distribution with the special tokens masked
            # out. This replaces "re-sample until the token is not special",
            # which never terminates when all remaining probability mass
            # sits on special tokens.
            allowed = probs.clone()
            special = [t for t in self.special_tokens_ids if t < allowed.numel()]
            allowed[special] = 0.0
            if allowed.sum().item() > 0:
                prev = torch.multinomial(allowed, num_samples=1)

        token_id = prev.item()
        return token_id, probs[token_id].item()

    def _extend_inputs(self, in_0, in_1, in_2, token_id):
        """Append one generated token to the three inputs and return them as new pb_utils.Tensor objects."""
        input_ids = torch.from_numpy(in_0.as_numpy())
        attention_mask = torch.from_numpy(in_1.as_numpy())
        token_type_ids = torch.from_numpy(in_2.as_numpy())

        input_ids = torch.cat((input_ids, torch.LongTensor([[token_id]])), 1)
        attention_mask = torch.cat((attention_mask, torch.LongTensor([[1]])), 1)
        # NOTE(review): the generated token id itself is appended to
        # INPUT__2 (token_type_ids). That looks suspicious if INPUT__2 is
        # meant to hold segment/speaker ids -- confirm against the model.
        token_type_ids = torch.cat((token_type_ids, torch.LongTensor([[token_id]])), 1)

        return (
            pb_utils.Tensor("INPUT__0", input_ids.numpy().astype(self.input0_dtype)),
            pb_utils.Tensor("INPUT__1", attention_mask.numpy().astype(self.input1_dtype)),
            pb_utils.Tensor("INPUT__2", token_type_ids.numpy().astype(self.input2_dtype)),
        )

    def top_filtering(self, logits, top_p=0.0, threshold=-float('Inf'), filter_value=-float('Inf')):
        """Filter a 1-D logits tensor with top-p (nucleus) and threshold filtering.

        Parameters
        ----------
        logits : torch.Tensor
          1-D tensor of logits. It is modified in place.
        top_p : float
          If > 0.0, tokens are sorted by descending logit and kept up to and
          including the first one whose cumulative probability exceeds
          `top_p`; all remaining tokens are set to `filter_value`.
        threshold : float
          Logits strictly below this value are set to `filter_value`.
        filter_value : float
          Value written to the filtered positions.

        Returns
        -------
        torch.Tensor
          The same tensor object as `logits`, after filtering.
        """
        # The index assignments below only work for a single 1-D
        # distribution (batch size 1).
        if top_p > 0.0:
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            cumulative_probabilities = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

            sorted_indices_to_remove = cumulative_probabilities > top_p
            # Shift the mask right by one so that the first token crossing
            # the top_p boundary is kept as well; the best token is always kept.
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = False

            indices_to_remove = sorted_indices[sorted_indices_to_remove]
            logits[indices_to_remove] = filter_value

        indices_to_remove = logits < threshold
        logits[indices_to_remove] = filter_value
        return logits

    def finalize(self):
        """Print a clean-up message.

        `finalize` is called only once when the model is being unloaded.
        Implementing `finalize` is OPTIONAL. This function allows the model
        to perform any necessary clean-ups before exit.
        """
        print('Cleaning up...')
4. 启动服务
$ docker run --gpus=1 --rm \
    -p 8000:8000 -p 8001:8001 -p 8002:8002 \
    -v /path/to/model/repository:/models \
    <tritonserver image name> \
    tritonserver --model-repository=/models
5. 客户端调用
import json

# NOTE(review): `data` (the raw payload bytes) and `post` (presumably
# `requests.post`) are not defined in this snippet; they must be provided
# by the surrounding code.

# Build the Triton request body: a JSON inference header followed by the
# raw binary payload.
infer_header = {
    "inputs": [
        {
            "name": "INPUT",
            "datatype": "BYTES",
            "shape": [1],
            "parameters": {"binary_data_size": len(data)},
        }
    ],
    "outputs": [
        {"name": "RESULT", "parameters": {"binary_data": True}}
    ],
}
# Compact separators give the same bytes as the hand-written JSON header.
json_buf = json.dumps(infer_header, separators=(",", ":")).encode("utf8")
push_data = json_buf + data
print("Inference-Header-Content-Length ", str(len(json_buf)), " Content-Length ", str(len(data) + len(json_buf)))

# Build the HTTP headers; Inference-Header-Content-Length carries the length
# of the JSON part so it can be separated from the binary data after it.
header = {
    "Content-Type": "application/octet-stream",
    "Accept": "*/*",
    "Inference-Header-Content-Length": str(len(json_buf)),
    "Content-Length": str(len(data) + len(json_buf)),
}
server_url = "127.0.0.1:8000"
model_name = "hrnet_pose_estimate"

# Send the inference request.
response = post('http://' + server_url + '/v2/models/' + model_name + '/infer', data=push_data, headers=header)