[튜토리얼3] 케라스를 사용한 다중 워커(Multi-worker) 훈련

이 튜토리얼에서는 tf.distribute.Strategy API를 사용하여 케라스 모델을 다중 워커로 분산 훈련하는 방법을 살펴보겠습니다. 다중 워커를 사용하여 훈련할 수 있도록 전략을 디자인했기 때문에, 단일 워커 훈련용으로 만들어진 케라스 모델도 코드를 조금만 바꾸면 다중 워커를 사용하여 훈련할 수 있습니다.

참고 : 현재 플랫폼에서는 GPU 기능이 지원되지 않습니다! 분산 훈련을 활용하는 방법에 대해 학습하는 목적으로 튜토리얼을 진행해주세요.

from __future__ import absolute_import, division, print_function, unicode_literals

import warnings
warnings.simplefilter('ignore')

import tensorflow_datasets as tfds
import tensorflow as tf

tfds.disable_progress_bar()

목차

  1. 데이터셋 준비하기
  2. 케라스 모델 만들기
  3. 다중 워커 구성
  4. 적절한 전략 고르기
  5. MuliWorkerMirroredStrategy로 모델 훈련하기
  6. 성능
  7. 내결함성

1. 데이터셋 준비하기

MNIST 데이터셋을 TensorFlow Datasets에서 받아옵시다. MNIST 데이터셋은 0-9 숫자를 손으로 쓴 28x28 픽셀 흑백 이미지입니다. 6만 개의 훈련 샘플과 만 개의 테스트 샘플이 들어있습니다.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

# MNIST 데이터를 (0, 255] 범위에서 (0., 1.] 범위로 조정
def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

datasets, info = tfds.load(name='mnist',
                           with_info=True,
                           as_supervised=True)

train_datasets_unbatched = datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
train_datasets = train_datasets_unbatched.batch(BATCH_SIZE)

2. 케라스 모델 만들기

tf.keras.Sequential API를 사용하여 간단한 합성곱 신경망 케라스 모델을 만들고 컴파일하도록 하겠습니다. 우리 MNIST 데이터셋으로 훈련시킬 모델입니다.

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model

먼저 단일 워커를 이용하여 적은 수의 에포크만큼만 훈련을 해보고 잘 동작하는지 확인해봅시다. 에포크가 넘어감에 따라 손실(loss)은 줄어들고 정확도는 1.0에 가까워져야 합니다.

single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(x=train_datasets, epochs=3)

3. 다중 워커 구성

자 이제 다중 워커 훈련의 세계로 들어가 봅시다. 텐서플로에서 여러 장비를 사용할 때는 TF_CONFIG 환경 변수를 설정해야 합니다. 하나의 클러스터를 구성하는 각 장비에 클러스터 구성을 알려주고 각각 다른 역할을 부여하기 위해 TF_CONFIG를 사용합니다.

TF_CONFIGclustertask 두 개의 부분으로 구성됩니다.

다중 워커 훈련에서는 보통 일반적인 워커보다 조금 더 많은 일을 하는 특별한 워커가 하나 필요합니다. 이 워커는 체크포인트를 저장하거나, 서머리(summary)를 쓰는 일 등을 추가로 담당하게 됩니다. 보통 치프(‘chief’) 워커라고 부르고, 관례적으로 index 번호가 0인 워커가 치프 워커가 됩니다(사실 tf.distribute.Strategy가 이렇게 구현되었습니다). 한편 task에는 현재 워커의 작업에 대한 정보를 지정합니다.

이 예에서는 작업(task) type"worker"로 지정하고, index0으로 지정하였습니다. 이 말은 이 장비가 첫 번째 워커이고, 따라서 치프 워커이며, 다른 워커보다 더 많은 일을 하게 된다는 뜻입니다. 물론 다른 장비들에도 TF_CONFIG 환경변수가 설정되어야 합니다. 다른 장비들에도 cluster에는 동일한 딕셔너리를 지정하겠지만, task에는 각 장비의 역할에 따라 다른 작업 type이나 index를 지정해야 합니다.

이 튜토리얼에서는 두 개의 워커를 localhost에 띄우는 방법을 보여드리겠습니다. 실제로는 각 워커를 다른 장비에서 띄우기 위해서는 실제 IP 주소와 포트를 할당하고, 그에 맞게 TF_CONFIG를 지정해야 합니다.

이 예제에서는 학습률을 바꾸지 않고 그대로 사용한 것에 주의하십시오. 실제로는 전역(global) 배치 크기에 따라 학습률을 조정해야 할 수 있습니다.

4. 적절한 전략 고르기

텐서플로의 분산 전략은 크게 각 훈련 단계가 워커들이 가진 복제본들끼리 동기화되는 동기 훈련 방식과, 동기화가 엄격하게 이루어지지 않는 비동기 훈련 방식이 있습니다.

이 튜토리얼에서 다루는 MultiWorkerMirroredStrategy는 동기 다중 워커 훈련에서 추천하는 전략입니다. 모델을 훈련하려면 tf.distribute.experimental.MultiWorkerMirroredStrategy 인스턴스를 하나 만듭니다. MultiWorkerMirroredStrategy는 모델의 레이어에 있는 모든 변수의 복사본을 각 워커의 장치마다 만듭니다. 그리고 수집 작업을 위한 텐서플로 연산인 CollectiveOps를 사용하여 그래디언트를 모으고, 각 변수의 값을 동기화합니다.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

참고: MultiWorkerMirroredStrategy.__init__()가 호출될 때, TF_CONFIG를 파싱하고 텐서플로 gRPC 서버가 구동됩니다. 따라서 TF_CONFIG 환경변수는 tf.distribute.Strategy 인스턴스를 만들기 전에 설정해야 합니다.

MultiWorkerMirroredStrategyCollectiveCommunication 매개변수로 선택할 수 있는 여러 가지 구현체를 제공합니다. RING(링)은 링 구조 기반의 수집 작업 구현체이고, 장비 간 통신을 위하여 gRPC를 사용합니다. NCCLNvidia의 NCCL로 수집 작업을 구현한 것입니다. AUTO를 지정하면, 런타임이 알아서 선택합니다. 어떤 수집 작업 구현체가 최적인지는 GPU의 종류와 수, 클러스터 내 네트워크 연결 등 여러 요소에 따라 달라집니다.

5. MultiWorkerMirroredStrategy로 모델 훈련하기

다중 워커 분산 훈련을 위하여 tf.distribute.Strategy API를 tf.keras와 함께 사용하려면, 딱 한 가지만 바꾸면 됩니다. 바로 모델 구성과 model.compile() 호출 코드를 strategy.scope() 안으로 넣는 것입니다. 분산 전략의 범위(scope)를 써서 변수를 어디에 어떻게 만들지 지정할 수 있습니다. MultiWorkerMirroredStrategy의 경우, 만들어지는 변수는 MirroredVariable이고, 각 워커에 복제본이 생깁니다.

참고: 아래 코드가 예상과 같이 동작하는 것처럼 보이겠지만, 사실은 단일 워커로 동작하는 것입니다. TF_CONFIG가 설정되어 있지 않기 때문입니다. 실제로 TF_CONFIG를 설정하고 아래 예제를 실행하면, 여러 장비를 활용하여 훈련 속도가 빨라지는 것을 볼 수 있습니다.

NUM_WORKERS = 2
# `tf.data.Dataset.batch`에는 전역 배치 크기를 지정해야 하기 때문에
# 여기서 배치 크기는 워커의 수를 곱한 크기로 늘려야 합니다. 
# 전에는 64였지만, 이제 128이 됩니다.
GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS
train_datasets = train_datasets_unbatched.batch(GLOBAL_BATCH_SIZE)
with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(x=train_datasets, epochs=3)

5.1 데이터셋 샤딩과 배치 크기

다중 워커 훈련에서는 수렴과 성능을 위하여 데이터를 여러 부분으로 샤딩(sharding)해야 합니다. 하지만, 위 코드 예에서는 데이터셋을 샤딩하지 않고 바로 model.fit()으로 보낸 것을 볼 수 있습니다. 이는 tf.distribute.Strategy API가 다중 워커 훈련에 맞게 자동으로 데이터셋을 샤딩해주기 때문입니다.

만약 훈련할 때 샤딩을 직접 하고 싶다면, tf.data.experimental.DistributeOptions API를 사용해서 자동 샤딩 기능을 끌 수 있습니다.

또 하나 주목할 점은 datasets의 배치 크기입니다. 앞서 코드에서 GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS로 지정하였습니다. 단일 워커일 때보다 NUM_WORKERS 배만큼 크게 지정한 것입니다. 이는 실제로 각 워커에 전달되는 배치 크기가 tf.data.Dataset.batch()에 매개변수로 전달된 전역 배치 크기를 워커의 수로 나눈 것이 되기 때문입니다. 즉, 이렇게 바꾸어야 실제로 워커가 처리하는 배치 크기가 단일 워커일 때와 동일한 값이 됩니다.

6. 성능

이제 케라스 모델이 완성되었습니다. MultiWorkerMirroredStrategy를 사용하여 여러 워커를 사용하여 훈련할 수 있습니다. 다중 워커 훈련의 성능을 더 높이려면 다음 기법들을 확인해 보십시오.

7. 내결함성

동기 훈련 방식에서는, 워커 중 하나가 죽으면 전체 클러스터가 죽어버리고, 복구 메커니즘이 따로 없습니다. 하지만 케라스와 tf.distribute.Strategy를 함께 사용하면, 워커가 죽거나 불안정해지는 경우에도 내결함성을 제공합니다. 이는 사용자가 선택한 분산 파일 시스템에 훈련 상태를 저장하는 기능을 제공하기 때문입니다. 기존 인스턴스가 죽거나 정지당해서 재시작되는 경우에도 훈련 상태를 복구할 수 있습니다.

모든 워커가 훈련 에포크 혹은 스텝에 따라 동기화되므로, 다른 워커들은 죽거나 정지당한 워커가 복구될 때까지 기다려야 합니다.

7.1 ModelCheckpoint 콜백

다중 워커 훈련의 내결함 기능을 사용하려면, tf.keras.Model.fit()를 호출할 때 tf.keras.callbacks.ModelCheckpoint의 인스턴스를 제공해야 합니다. 이 콜백이 체크포인트와 훈련 상태를 ModelCheckpointfilepath 매개변수에 지정한 디렉터리에 저장합니다.

# `filepath` 매개변수를 모든 워커가 접근할 수 있는 파일 시스템 경로로 바꾸십시오.
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='/tmp/keras-ckpt')]
with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(x=train_datasets, epochs=3, callbacks=callbacks)

워커가 정지당하면, 정지당한 워커가 다시 살아날 때까지 전체 클러스터가 잠시 멈춥니다. 워커가 클러스터에 다시 들어오면, 다른 워커도 재시작됩니다. 모든 워커가 이전에 저장한 체크포인트 파일을 읽고, 예전 상태를 불러오면 클러스터가 다시 일관된 상태가 됩니다. 그리고서 훈련이 재개됩니다.

ModelCheckpointfilepath가 위치한 디렉터리를 살펴보면, 임시로 생성된 체크포인트 파일들을 확인할 수 있을 것입니다. 이 파일들은 실패한 작업을 복구하는데 필요한 것들로, 다중 워커 훈련 작업을 성공적으로 마치고 나면 tf.keras.Model.fit() 함수가 끝날 때 라이브러리가 알아서 삭제할 것입니다.

Copyright 2019 The TensorFlow Authors.

#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.