Tech

Apache Beam으로 머신러닝 데이터 파이프라인 구축하기 3편 - RunInference로 모델 추론하기

Apache Beam Native API, RunInference로 대규모 데이터 모델 추론하기

김성환 | 2022년 11월 28일 | #Machine_Learning #Engineering

Apache Beam으로 머신러닝 데이터 파이프라인 구축하기를 주제로 하는 3번째 글로 다시 찾아뵙게 되었어요. 1편: 도입과 사용에서는 Apache Beam이 무엇이고, 핑퐁팀이 왜 Beam을 사용하게 되었는지 설명해 드렸습니다. 2편: 개발 및 최적화에서는 머신러닝 파이프라인을 효율적으로 개발하고 유지보수하기 위해 어떤 점들을 고려해야 하는지 알아보았습니다. 이번 블로그 3편: RunInference로 모델 추론하기에서는 Dataflow 환경 위에서 대규모 데이터셋의 ML Model Bulk Inference를 어떻게 수행하는지에 초점을 맞추어 보려 해요. 이번 글은 1편과 2편을 먼저 본 다음 이어서 보시는 것을 추천드려요! 👩‍🏫

Why ML Model on Apache Beam?

학습된 머신러닝 모델을 활용하는 방법은 정말 여러 가지가 있어요. 저희 핑퐁팀이 만드는 이루다와 같이 사용자의 요청이 들어올 때마다 머신러닝 모델을 추론하는 Live Inference를 수행하는 경우도 있고, 주기적으로 대량의 데이터셋을 불러와서 머신을 띄우고 모델을 추론하는 Bulk Inference를 수행하는 경우도 있어요. 보통 이러한 작업을 수행하기 전 “어떻게 하면 데이터의 양에 따라 Scalable 하게 리소스를 할당할 수 있을까?”와 같은 고민을 하게 되죠. Live Inference의 경우에는 ML Model을 Inference 하는 서버를 GPU Util 또는 요청량에 따라 자동으로 Scaling시키는 방법을 생각해볼 수 있어요. (Kubernetes 위에서 요청량에 따라서 Scaling 하는 자세한 방법은 저희가 이전에 다른 블로그 글로 소개드린 바 있습니다!)

Bulk Inference를 수행하는 방법

Bulk Inference의 경우에는 Live Inference 상황보다 Latency에 대한 제약이 비교적 적기 때문에, 더욱 다양한 방법을 생각해볼 수 있어요. 만약, Bulk Inference에서 활용할 모델이 Live Inference 때 사용했던 모델과 동일하다면 Live Inference에서 사용했던 인프라를 활용해서 Bulk Inference에 활용하는 방법을 생각할 수 있을 것 같아요. 가령 Live 환경의 모델 인프라를 그대로 복제해서 Airflow와 같은 Tool을 통해 주기적으로 모델 인프라의 서버에 모델 추론 요청을 보내도록 파이프라인을 구축할 수도 있죠. 하지만, 현재 조직 내에서 Live Inference를 수행하고 있지 않거나 Live 환경의 모델 인프라와는 전혀 다른 모델을 통해 추론하고 싶다면, 매번 파이프라인을 작성할 때마다 새로 인프라를 구축하는 일은 매우 번거로울 것이에요.

그러면 어떻게 하면 Bulk Inference 상황에서 데이터의 양에 따라 처리를 위한 머신을 자동으로 Scaling 해주고, 개발자는 Inference를 위한 코드에만 집중하도록 만들 수 있을까요? Apache Beam은 분산 데이터 스토리지와 파이프라인의 로직을 추상화하여 파이프라인을 개발하는 사람은 데이터 처리에 필요한 비즈니스 로직에만 집중하고, 그러면서도 GCP Dataflow 서비스를 활용해서 손쉽게 대규모 데이터에 대한 분산처리를 수행할 수 있었어요. 이 Beam을 ML Model Inference에 활용한다면 방금 던졌던 질문을 효과적으로 해결할 수 있을 것 같지 않나요?

Recap) Beam 위에서 ML Inference 하기

지난 블로그 내용으로 잠시 돌아가볼까요?

저희 블로그 2편머신러닝 모델을 데이터 파이프라인에 활용하기 절 에서 ML Model을 Apache Beam에서 활용하는 방법을 소개해 드린 바 있어요. 혹시라도 아직 이전 편을 안 보신 분들을 위해 간략하게 요약해보자면,

코드를 통해 보는게 더 이해가 빠르겠죠? 위 내용들을 종합해서 간단하게 머신러닝 모델을 추론하는 파이프라인을 아래와 같이 작성할 수 있었어요.

@dataclass
class WeakRefModel:
    model: SomeModel


class MLInferenceShared(DoFn):
    def __init__(self, shared_handle, gpu_model="nvidia-tesla-t4", gpu_count=1):
        self.shared_handle = shared_handle
        self.gpu_model = gpu_model
        self.gpu_count = gpu_count

    def setup(self):
        def initialize_model():
            model = SomeModel()
            return WeakRefModel(model)

        self.model = self.shared_handle.acquire(initialize_model)

    def process(self, element):
        # 1. Batch로 묶인 여러개의 Element들을 하나의 Tensor로 만듭니다.
        batched_tensor = _batched_items_to_tensor(element)
        # 2. Model Inference
        inference_result = self.model(element)
        # 3. Inference 결과 Tensor를 후처리합니다.
        result = _postprocess_tensor(inference_result)
        yield result

...
shared_handle = shared.Shared()
inferenced = (
    raw_data
    | beam.BatchElements(min_batch_size=1024, max_batch_size=4096)
    | beam.ParDo(MLInferenceShared(shared_handle)).with_resource_hints(
        accelerator=f"type:nvidia-tesla-t4;count:1;install-nvidia-driver",
        min_ram="16GB",
    )
)

Beam에서 ML Inference를 하는 Native한 방법: RunInference

Apache Beam 2.40.0 버전부터는 위에서 설명한 내용들을 신경 쓰지 않고 ML Inference를 수행할 수 있는 RunInference PTransform이 구현되었어요. 기존에는 ML Model을 Apache Beam에서 바로 활용하려면 TensorFlow 모델의 경우 tfx-bsl 패키지에 있는 API를 사용하거나, 앞서 소개한 코드처럼 Shared Class와 BatchElements PTransform을 활용하여 개발자가 직접 DoFn을 구현해야 했죠. 하지만, 2.40.0 버전부터는 Beam에서 공식적으로 제공하는 RunInference API를 통해 PyTorch 모델, TensorFlow 모델, 심지어 Scikit-learn 모델들까지 Beam에서 활용할 수 있어요.

# Model Handler는 외부 저장소로부터 Model을 불러오고,
# Input을 Batching해서 Model Inference를 수행하는 로직을 담당.
model_handler = PytorchModelHandlerTensor(
    state_dict_path="path/to/model.pt",
    model_class=SomePyTorchModule,
    model_params={"some": "args"},
    device="GPU",
)

# RunInference는 앞서 구현한 Model Handler를 이용해서 실제로 Inference를 수행.
result = tensors | RunInference(
    model_handler=model_handler,
    inference_args={"some": "args"},
)

RunInference API는 Model을 Load하고 Input을 원하는 형태로 Batching 하는 로직을 담당하는 ModelHandler와 미리 구현된 Model Handler를 이용하여 실제로 Inference를 수행하는 RunInference PTransform으로 나누어져 있어요.

PyTorch 모델을 사용할 때는 Beam에 미리 구현된 PyTorch 전용 Model Handler를 사용할 수 있어요. PyTorch Model Handler는 아래와 같이 두 종류가 있는데,

이외에도 TensorFlow Model을 사용하고 싶으면 tfx_bsl에 구현된 ModelHandler를 사용할 수 있고, Custom하게 구현한 다른 ML Framework을 사용하고 싶으면 ModelHandler를 상속해서 직접 구현할 수도 있어요.

RunInference Transform은 아래와 같이 PredictionResult라는 이름의 NamedTuple을 반환합니다. example에는 Model의 Input으로 사용한 Tensor가 담기고, inference에는 모델 추론 결과가 담겨요.

# Result
>>> result | beam.Map(print)
PredictionResult(
    example=Tensor(<some_input_tensor>),
    inference=Tensor(<inferenced_result>),
)

만약, RunInference의 Input으로 Tensor가 아니라 Tuple[str, Tensor]와 같이 해당 Element의 고유한 Key 값을 유지하고 싶다면 앞서 구현한 Model Handler를 아래와 같이 KeyedModelHandler로 Wrapping해서 사용할 수 있어요.

# Input is like: ("key", torch.Tensor(<some_tensor>))
result = tensors_with_key | RunInference(
    model_handler=KeyedModelHandler(model_handler),  # <--- KeyedModelHandler
    inference_args={"some": "args"},
)

# Result
>>> result | beam.Map(print)
("key", PredictionResult(  # <--- Output with key
    example=Tensor(<some_input_tensor>),
    inference=Tensor(<inferenced_result>),
))

RunInference로 Input Batching하기

RunInference는 기본적으로 Shared Class와 BatchElements PTransform을 통해 Model Inference를 수행하는 패턴을 Wrapping한 PTransform이에요. 그렇기에 기본적으로 PTransform 내에서 Input을 BatchElements PTransform으로 Batching을 하고, ModelHandler에 구현된 run_inference() 함수를 실행해요. BatchElements는 처리 속도에 따라 Batch Size를 자동으로 조절해주며, min_batch_sizemax_batch_size를 통해 최대/최소 크기를 지정해줄 수 있었죠? 공식 가이드에 따르면 ModelHandler에서는 클래스의 batch_elements_kwargs()를 아래와 같이 오버라이딩해서 Batch의 크기를 똑같이 조절할 수 있어요.

class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
    def batch_elements_kwargs(self):
        return {"min_batch_size": 1024, "max_batch_size": 4096}

Apache Beam 2.43.0 기준, 이렇게 Batching된 Element들은 ModelHandler의 run_inference()함수로 들어가게 되고, 구현체를 보면 아래처럼 Tensor의 List들을 torch.stack()을 통해 Batched Tensor로 만든 후, Model의 Input으로 넣어주고 있어요.

def run_inference(
      self,
      batch: Sequence[torch.Tensor],
      model: torch.nn.Module,
      inference_args: Optional[Dict[str, Any]] = None
  ) -> Iterable[PredictionResult]:
    ...
    inference_args = {} if not inference_args else inference_args

    # torch.no_grad() mitigates GPU memory issues
    # https://github.com/apache/beam/issues/22811
    with torch.no_grad():
      batched_tensors = torch.stack(batch)  # <--- Where Batched Tensor is made
      batched_tensors = _convert_to_device(batched_tensors, self._device)
      predictions = model(batched_tensors, **inference_args)
      return _convert_to_result(batch, predictions)

Batch 내 Tensor의 Size가 모두 동일한 경우에는 문제가 없지만, Language Model을 사용할 때의 상황을 생각해보면 Tensor들의 Size가 문장의 길이에 따라 달라지는 경우 torch.stack이 아닌, pad_sequence처럼 Custom한 Padding 로직을 집어넣어야 해요. 공식 가이드에서는 이런 경우 Batch Size를 1로 설정하라고 안내하고 있지만, GPU의 Utilization을 매우 떨어뜨려서 파이프라인의 성능을 저하시키게 되죠! 이를 해결하기 위해 아래와 같이 PyTorch Model Handler를 상속해서 run_inference에서 직접 Padding하는 로직을 넣어줄 수 있어요.

class CustomPyTorchModelHandler(PytorchModelHandlerKeyedTensor):
		...
    def run_inference(
        self,
        batch: Sequence[Dict[str, torch.Tensor]],
        model: torch.nn.Module,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> Iterable[PredictionResult]:
        inference_args = {} if not inference_args else inference_args

        with torch.inference_mode():
            # Model features (torch.nn.utils.rnn의 pad_sequence를 예시로 사용합니다.)
            inputs = pad_sequence([features["input_ids"] for features in batch], batch_first=True, padding_value=0)

            # To device
            inputs = _convert_to_device(inputs, self._device)

            # Inference
            predictions = model(inputs, **inference_args)

            # Inference Result to Apache Beam Compatable Results
            return [PredictionResult(inputs, logit) for logit in predictions]

이 글을 작성하는 시점에는 Apache Beam Repo의 master Branch에 머지된 Pull Request들 중, 위 방법과 같이 상속을 하지 않고도 Custom한 Inference 로직을 주입시킬 수 있도록 구현한 Add custom inference function support to the PyTorch model handler #24062 Pull Request를 찾을 수 있었고, 추후 2.44.0, 또는 그 이후 버전 릴리즈 이후부터는 아래와 같이 Custom Inference Logic을 넣어줄 수 있을 것으로 보여요.

model_handler = PytorchModelHandlerTensor(
    state_dict_path="path/to/model.pt",
    model_class=SomePyTorchModule,
    model_params={"some": "args"},
    device="GPU",
    inference_fn=custom_batching_logic,  # <--- Custom Inference Logic
)

위 내용을 종합하면, 아래 코드와 같이 PyTorch Model을 사용하여 Inference를 하는 파이프라인을 작성할 수 있어요.

model_handler = PytorchModelHandlerTensor(
    state_dict_path="path/to/model.pt",
    model_class=SomePyTorchModule,
    model_params={"some": "args"},
    device="GPU",
    inference_fn=custom_batching_logic,  # 추후 릴리즈에서 변경 가능성이 있어요!
)

with beam.Pipeline() as p:
    texts = (
        p
        | "ReadSentences" >> beam.io.ReadFromTextWithFilename("path/to/input.txt")
        | "PreprocessTexts" >> beam.ParDo(Preprocess())
    )
    processed = (
        texts
        | "Tokenize" >> beam.ParDo(RunTokenizer(pretrained_path))
        | "RunInference"
        >> RunInference(KeyedModelHandler(model_handler)).with_resource_hints(
            accelerator=f"type:nvidia-tesla-t4;count:1;install-nvidia-driver",
            min_ram="16GB",
        )
        | "Postprocess" >> beam.Map(lambda x: (x[0], postprocess(x[1].inference)))
    )
    (
        processed
        | "DropFilename" >> beam.Map(lambda x: x[1])
        | "WriteOutput" >> beam.io.WriteToText("path/to/output.txt")
    )

RunInference는 Model Inference와 관련된 metric들을 남기고 있기 때문에, Dataflow를 활용한다면 아래와 같이 파이프라인의 오른쪽 Panel에서 Model의 Input으로 들어가는 평균 Batch Size는 몇인지, Model에 Inference하는데 걸린 시간은 평균적으로 얼마나 걸렸는지 손쉽게 확인할 수 있어요!

Dataflow Metrics, Inference 수행시 발생한 다양한 Metric들을 확인할 수 있어요!

마치며

지금까지 Apache Beam의 RunInference API를 이용해서 손쉽게 Bulk Inference Pipeline을 작성하는 방법을 알아보았어요. 이렇게 만든 Bulk Inference Pipeline은 조직 내에서 다양하게 사용될 수 있어요! 사내의 대규모 데이터셋들을 GCP의 Dataflow를 사용하여 주기적으로 Preprocessing하는 파이프라인을 만들 수도 있고, ML Model을 Production에 보내기 전 평가용 데이터셋으로 Inference를 수행하여 모델의 정량평가 결과를 빠르게 확인할 수도 있어요. 이제는 RunInference PTransform으로 모든 ML Inference 관련 로직들이 추상화되었기 때문에 PyTorch, TensorFlow와 같은 ML Framework를 사용할 줄만 안다면 손쉽게 Scalable 한 ML Inference 파이프라인을 구축할 수 있어요!

MLOps의 여러 복잡한 문제들을 저희와 함께 풀어보고 싶으시다면, 언제든 채용 공고를 참고해주세요! 🚀

참고할 만한 페이지들

스캐터랩이 직접 전해주는
AI에 관한 소식을 받아보세요

능력있는 현업 개발자, 기획자, 디자이너가
지금 스캐터랩에서 하고 있는 일, 세상에 벌어지고 있는 흥미로운 일들을 알려드립니다.