Как максимизировать пропускную способность данных и сэкономить деньги

В предыдущих постах (таких как здесь и здесь) мы обсуждали различные варианты потоковой передачи данных с Amazon S3 в тренировочную сессию TensorFlow. В этом посте мы возвращаемся к теме обучения из S3, на этот раз с акцентом на обучение PyTorch.

Как и в наших предыдущих сообщениях, сценарий, к которому мы обращаемся, — это сценарий, в котором наш набор данных настолько велик, что он либо:

  1. не может быть полностью загружен на учебный экземпляр, или
  2. его загрузка привела бы к значительной задержке нашего обучения.

Существуют разные подходы к решению этой задачи. Один из подходов, который выходит за рамки этого поста, заключается в настройке какой-либо формы постоянной системы хранения, которая отражает данные S3 и которая легко доступна и доступна, как только запускается наш учебный экземпляр. Один из способов реализовать это с Amazon FSx. Несмотря на то, что у этого подхода есть преимущества (как обсуждалось здесь), он может потребовать значительного дополнительного обслуживания и затрат. Подход, который мы используем в этом посте, заключается в потоковой передаче данных непосредственно из S3 в наш цикл обучения.

Потоковые данные из Amazon S3

Хотя потоковая передача данных из Amazon S3 непосредственно в цикл обучения может показаться простой задачей, при неправильном проектировании она может стать узким местом в конвейере обучения. В этой нежелательной ситуации вычислительные ресурсы вашей системы будут бездействовать, ожидая поступления данных от S3. Наша цель — максимально использовать системные ресурсы и, как следствие, скорость обучения. Существует ряд факторов, которые контролируют степень, в которой потоковые данные из S3 могут повлиять на время шага обучения, в том числе следующие:

  1. Размер входной пропускной способности сети для вашего типа инстанса. Это свойство выбранного вами типа экземпляра обучения.
  2. Общий объем данных (в байтах), необходимых для этапа обучения. Вы должны стремиться уменьшить размер каждой выборки данных, передавая только те данные, которые необходимы для обучения, и учитывая различные методы сжатия. (Хотя вы также должны учитывать дополнительные вычисления, которые потребуются для распаковки.)
  3. Выбор формата файла, в котором хранятся данные. Например, вам может быть лучше использовать последовательный формат файлов, такой как форматы WebDataset или TFRecord, а не формат, который требует загрузки целых файлов, чтобы иметь возможность открывать и анализировать их.
  4. Размеры отдельных файлов, в которых хранятся ваши данные, также могут влиять на производительность потоковой передачи данных. Например, хранение каждой выборки данных в отдельном файле размером в один мегабайт увеличит накладные расходы на транзакции с S3. Мы стремимся хранить наши данные в нескольких файлах размером в несколько сотен мегабайт, каждый из которых содержит последовательности из нескольких выборок данных. Следует отметить, что хранение ваших данных таким образом создает другие проблемы, которые мы рассмотрели в предыдущем посте.
  5. Инструменты, используемые для выполнения потоковой передачи данных, будут обсуждаться в этом посте.

В этом посте мы рассмотрим несколько способов обучения с S3. Пожалуйста, не интерпретируйте наше упоминание или отсутствие того или иного метода как одобрение или отклонение. Мы считаем, что важно знать несколько методов. У каждого есть свои сильные и слабые стороны, и лучший вариант, вероятно, будет зависеть от деталей вашего проекта.

По мере того, как ландшафт машинного обучения продолжает развиваться, развиваются и многие вспомогательные платформы и библиотеки. Имейте в виду, что некоторые из упомянутых нами API и инструментов могут устареть к тому времени, когда вы будете читать этот пост. Мы настоятельно рекомендуем вам пользоваться самыми современными инструментами, поскольку они могут включать в себя существенные улучшения и оптимизации.

Хотя наше внимание будет сосредоточено на обучении с использованием PyTorch (версии 1.10 и 1.11) и из Amazon S3, многое из того, что мы говорим, будет так же актуально для других учебных сред и других служб хранения объектов.

Особая благодарность Ицхаку Леви за помощь в создании этого поста.

Измерение пропускной способности

В следующих разделах мы рассмотрим различные методы потоковой передачи данных из S3. Мы сравним методы, рассчитав производительность обучения, измеряемую количеством выборок данных, поступающих в цикл обучения в секунду. Чтобы сосредоточиться только на производительности потоковой передачи данных, мы измерим пропускную способность в случае пустого шага обучения, как показано в блоке кода ниже.

import torch, time
from statistics import mean, variance
dataset=get_dataset()
dl=torch.utils.data.DataLoader(dataset, batch_size=4, num_workers=4)
stats_lst = []
t0 = time.perf_counter()
for batch_idx, batch in enumerate(dl, start=1):
    if batch_idx % 100 == 0:
        t = time.perf_counter() - t0
        print(f'Iteration {batch_idx} Time {t}')
        stats_lst.append(t)
        t0 = time.perf_counter()
mean_calc = mean(stats_lst[1:])
var_calc = variance(stats_lst[1:])
print(f'mean {mean_calc} variance {var_calc}')

Важно иметь в виду, что, хотя это сравнительное измерение может дать нам хорошее представление о максимальной пропускной способности, которую может поддерживать каждый метод, оно может не дать хорошего прогноза того, как выбранный вами метод повлияет на фактическую пропускную способность обучения для двух основных причины:

  1. Выбранный вами метод может не повлиять на общее время тренировочного шага. Например, если ваш этап обучения требует интенсивных вычислений, может не иметь значения, занимает ли извлечение файла из S3 1 секунду или 10 секунд.
  2. Типичный этап обучения будет включать в себя множество дополнительных операций, которые могут повлиять на реальную производительность обучения. В частности, некоторые операции могут конкурировать за те же ресурсы, которые передают данные из S3.

Другой метод, который мы часто используем для измерения влияния потоковой передачи данных на общую скорость обучения, заключается в измерении того, как изменяется время шага при работе с образцами кэшированных данных, а не с образцами потоковых данных, как показано ниже.

import torch, time
from statistics import mean, variance
dataset=get_dataset()
dl=torch.utils.data.DataLoader(dataset, batch_size=4, num_workers=4)
batch = next(iter(dl))
t0 = time.perf_counter()
for batch_idx in range(1,1000):
    train_step(batch)
    if batch_idx % 100 == 0:
        t = time.perf_counter() - t0
        print(f'Iteration {batch_idx} Time {t}')
        t0 = time.perf_counter()

Пример игрушки — WebDataset

В целях демонстрации мы будем использовать синтетический набор данных, состоящий из случайных изображений и сегментов изображений, сохраненных в формате файла WebDataset, формате, основанном на файлах tar, специально разработанном для обучения с большими наборами данных. В частности, мы сгенерировали несколько 400-мегабайтных tar-файлов, используя следующий блок кода:

import webdataset as wds
import numpy as np
from PIL import Image
import io
out_tar = 'wds.tar'
sink = wds.TarWriter(out_tar)
im_width = 1024
im_height = 1024
num_classes = 256
for i in range(100):
    image = Image.fromarray(np.random.randint(0, high=256,
                  size=(im_height,im_width,3), dtype=np.uint8))
    label = Image.fromarray(np.random.randint(0, high=num_classes,
                  size=(im_height,im_width), dtype=np.uint8))
    image_bytes = io.BytesIO()
    label_bytes = io.BytesIO()
    image.save(image_bytes, format='PNG')
    label.save(label_bytes, format='PNG')
    sample = {"__key__": str(i),
              f'image': image_bytes.getvalue(),
              f'label': label_bytes.getvalue()}
    sink.write(sample)

Потоковая передача с Amazon S3

В этом разделе мы рассмотрим ряд инструментов и методов потоковой передачи данных из Amazon S3. Обзор ни в коем случае не является исчерпывающим; есть много дополнительных инструментов, которые мы здесь не рассматриваем. Мы продемонстрируем некоторые параметры на примере WebDataset, который мы показали выше. В целом мы делим решения на два типа: решения, в которых мы явно извлекаем данные из S3 в среду обучения, и решения, которые предоставляют приложению интерфейс в стиле файловой системы.

Загрузка файлового объекта

Ряд решений для обучения от Amazon S3 предполагает явную загрузку данных в локальную среду обучения.

Скачивание объекта с помощью интерфейса командной строки AWS.
Один из самых простых способов получить файл из S3 — использовать инструмент Интерфейс командной строки AWS. Команда ниже загрузит объектный файл, хранящийся в S3:

aws s3 cp s3://<path in s3>/wds0.tar -

Замена дефиса на локальный путь приведет к сохранению файла на локальный диск. Подробнее об использовании этого инструмента смотрите здесь.

Библиотека WebDataset поддерживает конвейерную передачу в потоке байтов из файла, полученного с помощью команды cp AWS S3. Мы демонстрируем, как создать набор данных PyTorch таким образом в блоке кода ниже:

import io, webdataset
def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    # add awscli command to urls
    urls = [f'pipe:aws s3 cp {url} -' for url in urls]
    dataset = (
           webdataset.WebDataset(urls, shardshuffle=True)
            .shuffle(10)
    )
    return dataset

Загрузка объектов с помощью Boto3:
Boto3 — это библиотека Python, которая позволяет загружать объектные файлы из S3. Приведенная ниже функция демонстрирует, как извлечь содержимое данных файла в поток байтов в локальной памяти.

import boto3, io, webdataset, re
client = boto3.client("s3")
def get_bytes_io(path):
    byte_io = io.BytesIO()
    _, bucket, key, _ = re.split("s3://(.*?)/(.*)$", path)
    client.download_fileobj(bucket, key, byte_io)
    byte_io.seek(0)
    return byte_io

Хотя WebDataset не включает встроенную поддержку такого использования, мы можем легко добавить ее, переопределив функцию url_opener в webdataset.tariterators:

import webdataset
from webdataset.handlers import reraise_exception
def url_opener(data, handler=reraise_exception, **kw):
    for sample in data:
        url = sample["url"]
        try:
            stream = get_bytes_io(url)
            sample.update(stream=stream)
            yield sample
        except Exception as exn:
            exn.args = exn.args + (url,)
            if handler(exn):
                continue
            else:
                break
webdataset.tariterators.url_opener = url_opener
def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]                                     
    dataset = (
           webdataset.WebDataset(urls, shardshuffle=True)
            .shuffle(10)
    )
    return dataset

Режим SageMaker Pipe Mode:
Amazon SageMaker – это управляемая служба, предоставляемая AWS для выполнения облачного машинного обучения в масштабе. Многочисленные служебные инструменты, которые он предлагает, включают специальные API для взаимодействия с обучающими данными, хранящимися в Amazon S3. В Документации SageMaker подробно описаны различные поддерживаемые режимы ввода данных. Мы подробно рассказали о некоторых свойствах SageMaker Pipe Mode в предыдущем посте. Pipe Mode — это еще один способ явного извлечения и потоковой передачи данных из S3 в локальную среду обучения. При использовании конвейерного режима обучающие данные поступают через выделенный канал Linux FIFO.

with open(fifo_path, ‘rb’, buffering=0) as fifo:
    # read and parse data stream to yield samples

Недостатком конвейерного режима является то, что он требует разбора входящего потока данных. Это означает, что его использование ограничено форматами файлов, которые поддерживают синтаксический анализ таким образом. В предыдущем посте мы продемонстрировали построение обучающего входного пайплайна таким образом.

Решения доступа S3

Хотя приложения обычно программируются для работы с файловыми системами, Amazon S3 — это объектное хранилище, а не файловая система. Ряд решений предназначен для преодоления этого разрыва путем предоставления файловой системы, подобной интерфейсу, для Amazon S3.

S3Fs:
S3Fs — это одно из нескольких решений Python на основе FUSE для монтирования корзины S3 в качестве файловой системы. Хотя WebDataset не включает встроенную поддержку использования S3F, мы можем переопределить функцию url_opener в webdataset.tariterators, чтобы использовать ее. Обратите внимание, что для использования S3F с PyTorch нам нужно установить для метода запуска многопроцессорной обработки значение "spawn".

torch.multiprocessing.set_start_method('spawn')
import s3fs, webdataset
from webdataset.handlers import reraise_exception
fs = s3fs.S3FileSystem()
def url_opener(data, handler=reraise_exception, **kw):
    for sample in data:
        url = sample["url"]
        try:
            stream = fs.open(url.replace("s3://", ""), mode='rb')
            sample.update(stream=stream)
            yield sample
        except Exception as exn:
            exn.args = exn.args + (url,)
            if handler(exn):
                continue
            else:
                break
webdataset.tariterators.url_opener = url_opener
def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]                                     
    dataset = (
           webdataset.WebDataset(urls, shardshuffle=True)
            .shuffle(10)
    )
    return dataset

Ознакомьтесь с этим классным постом, чтобы узнать больше о методах извлечения данных S3 на основе FUSE, включая альтернативы S3F, такие как goofys и rclone.



Плагин Amazon S3 PyTorch:
В прошлом году AWS объявила о выпуске специальной библиотеки для извлечения данных из S3 в учебную среду PyTorch. Подробности об этом плагине, включая инструкции по использованию, можно найти в этом проекте github. Следует отметить, что недавно авторы объявили об устаревании этой библиотеки, а также о планах заменить ее поддержкой S3 IO в библиотеке TorchData. (Подробнее об этом ниже.) Блок кода ниже демонстрирует создание итерируемого набора данных PyTorch с нашими игрушечными файлами WebDataset с помощью подключаемого модуля S3 PyTorch.

from awsio.python.lib.io.s3.s3dataset import S3IterableDataset
class S3_Dataset(torch.utils.data.IterableDataset):
    def __init__(self, urls):
        self._s3_iter_dataset = S3IterableDataset(urls, True)
    def data_generator(self):
        try:
            while True:
                image_fname, image_fobj = next(self._s3_iter)
                label_fname, label_fobj = next(self._s3_iter)
                yield {
                    'image': image_fobj,
                    'label': label_fobj
                }
        except StopIteration:
            return
    def __iter__(self):
        self._s3_iter = iter(self._s3_iter_dataset)
        return self.data_generator()
def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]                                     
    dataset = S3_Dataset(urls)
    return dataset

Быстрый файловый режим SageMaker:
Amazon SageMakerпредлагает дополнительное решение на основе FUSE для доступа к файлам в S3-вызове Быстрый файловый режим ( ФФМ). Когда вы программируете задание SageMaker для использования режима быстрого ввода файлов, путь S3 монтируется в предопределенный локальный путь к файлу. В недавнем посте мы расширили этот параметр режима ввода, продемонстрировали его использование и обсудили его плюсы и минусы. Адаптировать наш WebDataset для использования FFM очень просто:

import os, webdataset
def get_dataset():
    ffm = os.environ['SM_CHANNEL_TRAINING']
    urls = [os.path.join(ffm, f'{i}.tar') for i in range(num_files)]
    dataset = (
           webdataset.WebDataset(urls, shardshuffle=True)
            .shuffle(10)
    )
    return dataset

Обратите внимание, что на момент написания этой статьи производительность FFM может зависеть от количества файлов, а также от количества разделов в предопределенном пути S3.

Полученные результаты

В приведенной ниже таблице указано среднее время шага, которое мы получили при запуске пустого цикла обучения, описанного выше, на разных наборах данных в экземпляре EC2 c5.xlarge. Эти результаты представлены в качестве примера того, какие сравнительные результаты производительности вы можете получить. Мы предостерегаем от того, чтобы делать какие-либо выводы из этих результатов в отношении вашего собственного проекта, поскольку производительность, вероятно, будет сильно зависеть от деталей модели обучения и данных.

Потоковая передача с использованием конвейеров TorchData

TorchData — это новая интересная библиотека PyTorch для создания наборов данных PyTorch. В настоящее время он выпущен как бета-версия продукта и требует версии PyTorch 1.11 (или выше). Официальный релиз ожидается в ближайшие месяцы. Страница проекта TorchData gihub содержит информацию о дизайне библиотек, а также документацию по API и примеры. TorchData включает в себя множество модулей стандартных блоков для создания конвейеров данных, включая модули для загрузки набора данных, хранящегося в формате WebDataset, и модули для извлечения данных из S3.

В разделах ниже мы демонстрируем несколько решений, поддерживаемых библиотекой TorchData. Имейте в виду, что некоторые API могут быть изменены до официального выпуска библиотеки.

Использование IoPathFileOpener:
IoPathFileOpener поддерживает загрузку файлов непосредственно из облачного хранилища. Это зависит от библиотеки абстракции ввода-вывода iopath. Блок кода ниже демонстрирует его использование:

from torchdata.datapipes.iter import IoPathFileOpener, IterableWrapper
def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    urls = IterableWrapper(urls).shuffle().cycle()
    tars = IoPathFileOpener(urls, mode="rb").load_from_tar()
    samples = tars.groupby(lambda x:     
                               os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x: 
                  {'image': x[0][1].read(),
                   'label': x[0][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
return dataset

Использование FSSpecFileOpener:
FSSpecFileOpener поддерживает аналогичную функциональность, на этот раз основанную на библиотеке S3Fs. Блок кода ниже демонстрирует его использование:

from torchdata.datapipes.iter import FSSpecFileOpener, IterableWrapper
def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    urls = IterableWrapper(urls).shuffle().cycle()
    tars = FSSpecFileOpener(urls, mode="rb").load_from_tar()
    samples = tars.groupby(lambda x:     
                               os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x: 
                  {'image': x[0][1].read(),
                   'label': x[0][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
return dataset

С SageMaker FFM:
При обучении с использованием Amazon SageMaker мы также можем использовать стандартный класс FileOpener, используя FFM и указывая на монтирование локального файла.

import os
from torchdata.datapipes.iter import FileOpener, IterableWrapper
def get_dataset():
    ffm = os.environ['SM_CHANNEL_TRAINING']
    urls = [os.path.join(ffm, f'{i}.tar') for i in range(num_files)]       
    tars = FileOpener(urls, mode="rb").load_from_tar()
    samples = tars.groupby(lambda x:     
                               os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x: 
                  {'image': x[0][1].read(),
                   'label': x[0][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
return dataset

Использование S3FileLoader.
Реинкарнация подключаемого модуля Amazon S3 PyTorch, описанного выше, S3FileLoader — это канал данных, специально созданный AWS для загрузки данных из S3. На момент написания этой статьи он не входит в стандартный пакет torchdata и требует некоторых шагов по установке, как описано здесь.

from torchdata.datapipes.iter import S3FileLoader, IterableWrapper
def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    urls = IterableWrapper(urls).shuffle().cycle()
    tars = S3FileLoader(urls).load_from_tar()
    samples = tars.groupby(lambda x:     
                               os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x: 
                  {'image': x[0][1].read(),
                   'label': x[0][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
return dataset

Полученные результаты

И снова мы делим среднее время шагов при запуске одного и того же пустого цикла обучения, описанного выше, для разных наборов данных, созданных с помощью новой библиотеки TorchData, и для экземпляра EC2 c5.xlarge.

Хотя мы по-прежнему осторожно относимся к каким-либо значимым выводам из этих результатов, кажется, что новый TorchData предлагает обновление не только в предлагаемых функциях, но и в скорости потоковой передачи с S3.

Краткое содержание

В этом посте мы рассмотрели несколько вариантов потоковой передачи данных из Amazon S3 в среду обучения. Этот список ни в коем случае не является исчерпывающим; имеется множество дополнительных инструментов и методов.

Мы считаем, что знакомство с несколькими методами имеет решающее значение, поскольку вы можете обнаружить, что ваш лучший выбор либо не имеет отношения к одному из ваших проектов, либо что его производительность неудовлетворительна.

Пожалуйста, не стесняйтесь обращаться ко мне с любыми комментариями, исправлениями или вопросами.