쿠버네티스 Argo workflow 소개

쿠버네티스 Argo workflow 소개

이번 포스트에서는 쿠버네티스 위에서 동작하는 workflow engine인 Argo workflow에 대해 소개하고 어떻게 활용할 수 있는지에 대해서 살펴보도록 하겠습니다.

What is Argo workflow?

01.png

Argo workflow는 Argo Project에서 만든 컨테이너 기반 워크플로우 엔진입니다. Airlfow와 비슷하게 원하는 Job을 실행할 수 있으고 각 Job간의 종속성을 부여하여 순서대로 실행할 수 있으며 병렬로 여러 Job을 동시에 실행할 수 있습니다. 차이점은 Job의 단위가 프로세스가 아닌 컨테이너 단위입니다. 또한 Airflow에서는 DAG의 표현을 파이썬 스크립트를 통해서 표현했다면 Argo workflow에서는 쿠버네티스 선언형 명령 스타일로 CustomResourceDefinition을 정의하고 YAML 파일을 만들어 쿠버네티스에 호출합니다. 그러면 Argo workflow controller가 그것을 받아서 DAG를 정의한대로 스케줄링합니다. 쿠버네티스 자체 Job과의 가장 큰 차이점은 Job간의 종속성을 부여하여 workflow를 실행할 수 있다는 점입니다.

Airflow vs Argo workflow 비교

구분 Airflow Argo workflow
UI webserver Argo UI
스케줄러 scheduler kube-scheduler
메세지큐 Celery Argo Controller
메타데이터 DB postgreSQL etcd
Worker Worker Node
DAG 정의 Python script YAML
Job 단위 Operator Container

위의 그래프는 Airflow와 Argo workflow를 비교한 표입니다. 정확하게 1대1 매핑 시킬 수는 없겠지만 이해하기 쉬운 선에서 비교하였습니다. 보시다시피 Argo workflow는 Job을 실행하기 위한 여러 컴포넌트들을 쿠버네티스 자체 컴포넌트에 의존하는 것을 확인할 수 있습니다. 결국 Argo workflow도 여러 컨테이너를 batch 프로세스 형식으로 오케스트레이션하기 때문에 쿠버네티스 플랫폼 위에서 동작하기 보다는 쿠버네티스 컴포넌트들을 직접 활용하여 workflow engine을 개발한 것으로 보입니다.

02.png

동작방법

  1. 사용자가 YAML DAG 명세를 작성하여 쿠버네티스 마스터에 요청을 합니다.
  2. 쿠버네티스 API 서버가 명세를 받아 etcd DB에 workflow 정보를 저장합니다.
  3. Argo controller가 reconcilation loop에서 etcd DB의 새로운 정보를 확인하고 kube-scheduler에 필요한 Pod를 요청합니다.
  4. kube-scheduler는 Pod를 적절한 노드에 스케줄링합니다.
  5. Argo controller가 다음번 reconcilation loop에서 다음 dependency가 걸려 있는 Job을 요청합니다.

Argo workflow의 장점

  1. 실행의 단위가 컨테이너 레벨에서 이루어지기 때문에 고립성이 높습니다. 그렇기 때문에 개별 Job마다 실행환경이 다양한 경우 실행환경이 서로 뒤엉키지 않고 각각의 단독적인 환경을 제공할 수 있습니다.
  2. 하나의 역할만을 담당하는 Job을 단일하게 개발할 수 있기 때문에 재사용성을 무척 높일 수 있습니다. 데이터의 입출 인터페이스만 잘 맞춰 놓는다면 단일한 역할을 담당하는 Job을 여러개 만들어 놓고 마치 레고 블럭처럼 쌓아 올릴 수 있는 강점이 있습니다.

Argo workflow의 단점

  1. Pod를 생성하고 삭제하는 비용이 작지 않기 때문에 (이미지를 다운받고 가상 네트워크 디바이스를 연결하고 IP를 부여하고 컨테이너를 실행하는 등) 작은 일을 처리하는 많은 Job을 생성할 경우 오히려 성능 저하가 일어납니다. 작업이 간단하고 리소스가 많이 필요하지 않은 경우에는 차라리 프로세스 혹은 쓰레드 레벨에서 처리하는 것이 더 효율적인 경우도 있습니다.
  2. 각 스탭마다 개별적인 컨테이너를 실행하기 때문에 Job간의 데이터를 빠르게 공유하는 것이 비교적 힘듭니다. 물론 쿠버네티스의 emtpyDir volume의 memory 미디어를 활용한다면 메모리간 데이터 이동을 할 수 있겠지만 Host 서버의 메모리 크기에 제한을 받기에 유념하여 사용해야 합니다. (정정합니다. Sunghoon Kang님이 댓글로 바로 잡아주셨는데 Job간에는 emtpyDir volume을 사용할 수 없고 Pod 내부의 Container간에만 volume을 공유할 수 있습니다. 이것은 argo workflow의 제약사항이 아닌 쿠버네티스의 emptyDir volume의 제약사항입니다. 내부적으로 NFS volume을 통해 Pod간 데이터를 공유하고 있는데 해당 내용과 혼동하여 작성하였습니다.)

설치 방법

Argo workflow를 설치하는 방법은 비교적 간단합니다. 다음 사이트에 나와 있는 방법을 그대로 작성하였습니다. (2021년 10월 기준)

kubectl create ns argo
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml

해당 명령을 실행하면 쿠버네티스에 CustomResourceDefinition, Argo UI, Argo Controller 등과 같은 리소스를 생성하게 됩니다.

사용 방법

Argo workflow를 사용하는 것도 굉장히 쉽습니다. 쿠버네티스 리소스를 한번이라도 작성해 본 경험이 있으시다면 Workflow YAML 명세를 보는 것만으로도 바로 이해가 가능합니다. 아래에 나와 있는 예제를 위주로 간단하게 설명을 드리겠습니다. https://github.com/argoproj/argo-workflows/blob/master/examples/README.md

1. 첫 Workflow 작성

아래는 가장 간단한 workflow 정의입니다. 거의 쿠버네티스의 Job 리소스와 동일하다고 보시면 됩니다. 위와 같이 YAML 파일을 정의한 후 쿠버네티스 마스터로 호출을 합니다.

# hello-world.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow                  # CRD - Workflow
metadata:
  generateName: hello-world-    # workflow 이름
spec:
  entrypoint: whalesay          # workflow 시작점
  templates:
  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]
kubectl create -f hello-world.yaml

명령 호출 후 Argo UI 화면으로 들어가시면 hello-world-xxxx 라는 workflow가 새롭게 생성된 것을 보실 수 있습니다. 해당 Job을 클릭하시면 조금 더 자세한 정보들을 보실 수 있는데요. 특히 Logs 버튼을 누리게 되면 실제 Workflow가 실행한 stdout 로그 화면을 보실 수 있습니다.

03.png 04.png

2. 여러 step이 있는 workflow

두번째 workflow는 약간 복잡합니다. 먼저 크게 template을 정의하는 부분과 (윗 부분) 정의한 template을 사용하는 부분 (아래 부분)으로 나뉩니다. template 정의부에서 어떤 컨테이너를 사용할지 전달 받은 파라미터를 어디에 사용할지 등을 미리 지정합니다. template을 활용하는 부분에서는 어떤 순서를 가지고 workflow를 실행할지, 이때 어떤 파라미터 등을 넘길지를 결정합니다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello    # workflow 시작점 위치
  templates:
  #########################
  # Template을 만듭니다.
  #########################
  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: [""]   # 전달 받은 파라미터를 args에서 사용합니다.
  #############################################
  # 위에서 만든 template을 step에서 활용합니다.
  #############################################
  - name: hello-hello-hello         # <-- 여기서부터 시작합니다.
    steps:
    - - name: hello1                # 1번째 step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello1"         # message 파라미터를 hello1이라고 넘깁니다.
    - - name: hello2a               # 더블 대시: 1번째 step 이후에 실행됩니다.
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2a"        # 또 다른 파라미터 전달
      - name: hello2b               # 싱글 대시: 2번째 step과 동시에 실행됩니다.
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2b"

위의 workflow를 마찬가지로 쿠버네티스에 호출하면 아래와 같이 삼각형 모양의 workflow가 그려지게 됩니다. 이때 각 job 마다 다르게 넘긴 파라미터에 따라 다른 메세지를 출력하게 됩니다. (hello1, hello2a, hello2b)

05.png

  • 초록색은 완료 표시 / 파란색은 현재 실행 중인 상태를 나타냅니다.

3. DAG 작성

마지막으로 가장 복잡하지만 표현력이 뛰어난 DAG 형식의 workflow를 정의해 보겠습니다. 위의 step 형식과는 다르게 DAG 형식에서는 단순한 순차 / 병렬 실행 이외에 다양한 트리구조를 표현할 수 있습니다. dependencies라는 키워드를 이용하여 Job간의 앞뒤 종속성을 부여함으로 복잡한 DAG를 YAML로 표현할 수 있게 만들어 줍니다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond             # 시작점 위치
  templates:
  #####################
  # template을 정의합니다.
  #####################
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, ""]
  ####################################
  # 위에서 정의한 template을 사용합니다.
  # 여기에서 DAG 모양을 표현합니다.
  ####################################
  - name: diamond                 # <-- 여기서부터 시작합니다.
    dag:
      tasks:
      - name: A                   # 먼저 A라는 job을 실행합니다.
        template: echo            # 위에서 정의한 echo template을 이용하여
        arguments:                # 이때 A라는 파라미터를 전달합니다.
          parameters: [{name: message, value: A}]
      - name: B                   # B라는 job을 실행하는데
        dependencies: [A]         # A가 완료된 이후에 실행합니다.
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C                   # C도 마찬가지로
        dependencies: [A]         # A가 완료된 이후에 실행합니다.
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D                   # D는
        dependencies: [B, C]      # B,C가 완료된 이후에 실행합니다.
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

위의 DAG를 실행하면 아래와 같이 diamond 형식의 DAG를 표현할 수 있습니다. 06.png


위에서 설명드린 3가지 workflow 표현 뿐만 아니라 volume을 연결하는 방법, conditional한 workflow 작성방법, timeout 설정, workflow 실행 이후 post-execution 기능 등 다양한 workflow 지원 기능들을 제공하고 있고 사용하는 방법이 비교적 직관적이어서 예제에 나와있는 workflow들을 한번씩 확인해 보시기를 추천드립니다. 예제만 따라해도 대충 전반적인 내용을 다 이해할 수 있습니다.

Argo workflow 활용 방법

07.png

지금까지 Argo workflow가 어떤 녀석이고 어떻게 사용하는지 알아봤는데요. 그렇다면 과연 이 컨테이너 기반의 workflow engine을 어디에 활용할 수 있을까요?

저는 개인적으로 Argo workflow를 데이터 파이프라인 및 기계학습 모델훈련에 활용하고 있습니다. 데이터 파이프라인과 기계학습에는 다양한 라이브러리를 사용합니다. 데이터타입에 따라, 데이터를 추출하는 소스에 따라 상이한 실행환경을 가지는데요. 예를 들면, 데이터추출에 Apache Sqoop을 사용하게 되면 가장 먼저 자바 runtime이 필요하고 하둡 라이브러리가 필요합니다. 반대로 S3에서 데이터를 추출하는 경우에는 aws cli나 python boto 라이브러리 등이 필요하게 됩니다.

기계학습 실행환경을 살펴보면 더 다양해집니다. 가장 분석 언어로 R을 사용할 수도, python을 사용할 수도 있습니다. 각 분석 언어 안에서도 버전에 따라 호환되지 않는 API들이 존재하기도 합니다. 분석 패키지를 살펴보게 되면 엄청 파편화되어 같은 실행환경 아래에서 다른 모델을 돌리게 되면 재앙과 같은 일이 벌어지게 됩니다. 이렇듯 데이터 파이프라인 및 기계학습 실행환경은 고립성이 굉장히 중요하게 되는데 Argo workflow가 이런 문제점을 해결하기에 딱 적절한 엔진이 아닌가 싶습니다. 또한 앞에서 언급한대로 각 Job간의 데이터 입출 인터페이스만 잘 맞춰 놓게되면 여러가지 조합을 통하여 빠르게 새로운 데이터 파이프라인 및 기계학습 실행환경을 구축할 수 있어 잘 활용한다면 정말 좋은 선택이 될 수 있어 보입니다.