5 ключевых факторов, которые следует учитывать при оптимизации Apache Spark в AWS (часть 2)

Это вторая часть ключевых факторов, о которых следует помнить при оптимизации Apache Spark. Если вы не читали первую, загляните здесь.

Эта статья призвана помочь опытным разработчикам справиться с некоторыми узкими местами, с которыми сталкиваются при работе с огромным объемом данных с ограниченными ресурсами. Речь идет не об основах или методах теоретической оптимизации, которые часто обсуждаются. Предлагаемые решения (или приемы оптимизации) основаны на выводах, сделанных при решении практических задач по оптимизации Apache Spark.

Присоединиться к операциям

Во время объединения, если у вас есть большая таблица и относительно небольшая таблица (таблица поиска или размерная таблица), рекомендуется транслировать небольшую таблицу. При трансляции копия транслируемой таблицы отправляется каждому узлу кластера. Таким образом, при объединении часть более крупной таблицы в узле соединяется с транслируемой таблицей, поэтому данные не перемещаются между узлами и сокращаются операции ввода-вывода, что повышает производительность.

Уловка оптимизации:

если вы присоединяетесь к большому столу с маленьким, то рекомендуется транслировать меньший стол. Но имейте в виду, что меньшая таблица должна быть достаточно маленькой, чтобы поместиться в памяти исполнителя. Если обе таблицы, которые вы пытаетесь объединить, большие и одинаковые по размеру, убедитесь, что обе таблицы не перекошены и не распределены по большему количеству разделов. Если нет, выполните повторное разбиение, чтобы увеличить количество разделов перекошенной таблицы. Из двух таблиц, если вы обнаружите, что одна из таблиц не похожа по размеру с другой таблицей и недостаточно мала для широковещательной передачи, вы можете кэшировать (или сохранить) меньшую таблицу и убедиться, что большая таблица правильно разделена перед выполнение Joins.

Максимальный параллелизм

Один из способов повысить параллелизм обработки искры - увеличить количество исполнителей в кластере. Ниже приведены 2 важных свойства, которые контролируют количество исполнителей.

spark.executor.memory # Amount of memory to use per executor process
spark.executor.cores # number of cores to use on each executor

Давайте рассмотрим пример, чтобы понять, как эти два свойства используются, чтобы решить, сколько исполнителей должно быть создано.

Рассмотрим оборудование с 5 узлами, каждый с 16 ядрами и 32 ГБ. Прежде чем подсчитать количество исполнителей, нужно иметь в виду несколько вещей.

  • Узел может иметь несколько исполнителей, но не наоборот.
  • У Executor может быть несколько ядер.
  • Свойству spark.executor.core следует давать только целые значения.
  • Свойство spark.executor.memory может иметь целые или десятичные значения с точностью до 1 знака после запятой.
  • Не рекомендуется иметь более 5 ядер на каждого исполнителя. Это основано на исследовании, в котором любое приложение с более чем 5 параллельными потоками начало снижать производительность.

Некоторые ресурсы необходимы для ОС и демонов Hadoop, например, необходимо выделить около 1 ядра и 1 ГБ памяти. у нас осталось 15 ядер и 31 ГБ. Поскольку мы не можем учитывать долю ядер для исполнителей, при максимальном мы можем 15 исполнителей, т.е. 1 для ядра для каждого исполнителя. Каждому исполнителю также потребуется некоторая память для накладных расходов, таких как накладные расходы виртуальной машины, интернированная строка и т. Д. При взаимодействии с мастером (Yarn в случае AWS). Обычно это 10% памяти исполнителя с минимум 384 МБ. Таблица ниже даст нам некоторое представление о том, как количество исполнителей может варьироваться в зависимости от разных параметров.

Из приведенной выше таблицы, если у нас есть 1 ядро ​​на исполнителя, у нас может быть 15 исполнителей, каждый с 1,7 ГБ памяти. Большее количество исполнителей означает лучший параллелизм, а больший объем памяти на каждого исполнителя означает, что каждый исполнитель может обрабатывать больший объем данных.

Уловка оптимизации:

Необходимо поддерживать баланс между количеством ядер для каждого исполнителя и объемом памяти исполнителя. Хотя понимание данных и сложности алгоритмов является движущей силой для определения баланса, в большинстве случаев оптимальным будет выбор чего-то из середины таблицы. из приведенной выше таблицы 5 исполнителей (в строке 3) 3 ядра и 5,7 ГБ на исполнителей будут хорошей идеей. Если наши алгоритмы сложны и итеративны (в большинстве алгоритмов машинного обучения), хорошо выбрать что-то в конце таблицы (либо 4 ядра, либо 5 ядер на исполнителя), с другой стороны, если объем данных очень велик, а алгоритмы - нет. этот сложный и итеративный подход, чем выбор чего-то в середине таблицы (3 ядра), должен работать лучше.

Пользовательские функции

Это для людей, которые имеют опыт работы с R / Python. Они пишут функции, которые принимают / возвращают фреймы данных. Хотя это синтаксически работает и возвращает результаты, это снижает производительность системы. Давайте возьмем пример и поймем, как не писать UDF в spark.

Не рекомендуемый способ написания UDF в PySpark

## A function that accepts a dataframe and converts a specific column of a dataframe into upper case.
def myfunc(df, sub_str):
 df['col_2'] = df['col_1'].apply(lambda x: upper(x), axis=1)
 return df

Правильный способ написания UDF в PySpark

## A function that accepts a dataframe and a substring as a parameter and does some string operation
def myfunc(str):
 return upper(str)
myfuncUdf = udf(myfunc, StringType())
# Call the function in spark dataframe
df = df.withColumn('col_2', myfunc(df.col_1))

Несколько моментов, которые следует отметить из приведенных выше двух способов написания UDF в искре.

  • Первый - это способ Python, а второй - способ Spark (или PySpark).
  • В языке Python в качестве аргумента используется фрейм данных. В способе Spark функция принимает в качестве аргумента одну запись.
  • В способе Spark функция работает распределенно и параллельно выполняется всеми исполнителями.

Уловка оптимизации

При записи UDF предположим, что функция принимает одну строку, возвращает одну строку. мы можем ввести несколько столбцов, но только одну строку. Если у вас более двух аргументов (столбцов) в UDF, я бы посоветовал создать один массив, используя все ваши аргументы, и передать его в UDF.

Мониторинг показателей кластера

Amazon EMR предоставляет встроенные инструменты для мониторинга показателей кластера, которые можно выбрать для установки при запуске кластера.

  • Spark Web UI - он доступен по умолчанию, когда мы выбираем Spark и Hadoop в конфигурациях программного обеспечения EMR. Используйте это для справки. Используя Spark UI, мы можем просматривать все запланированные задачи и конфигурации.
  • Ganglia - Его можно выбрать при создании кластера в таблице Create Cluster - ›Advanced -› Шаг настройки программного обеспечения. Это полезно для понимания использования ресурсов кластера, таких как ЦП, память и т. Д.
  • Пользовательский интерфейс диспетчера ресурсов Yarn - Yarn является мастером по умолчанию для Spark в EMR. Пользовательский интерфейс Yarn предоставляет много информации о ресурсах кластера, включая количество исполнителей, ЦП и память на каждого исполнителя.

Уловка оптимизации

Отслеживайте показатели кластера с помощью любого из вышеперечисленных инструментов и заранее устраняйте любые проблемы с производительностью. Такие симптомы, как зависание одной задачи на значительное время или сбой задачи из-за искровых исключений, явным образом указывают на нездоровые состояния, которые можно определить с помощью пользовательского интерфейса Spark. Низкий процент использования ЦП, большее количество простаивающих ЦП или всплески памяти - это симптомы нездорового состояния, идентифицированного с помощью Ganglia. Фактическое количество исполнителей меньше ожидаемого или выделенная память / ЦП меньше ожидаемого - это признаки нездорового состояния, идентифицированные с помощью пользовательского интерфейса Yarn Resource Manager.

Объясните план

Другой способ выявления потенциальных узких мест в Spark - использование плана запроса Explain.

df.explain()
df.explain(True)

объяснение () печатает физический план, тогда как объяснение (истина) печатает логический, проанализированный, оптимизированный и физический план запроса. Логический план - это дерево, представляющее схему и данные трех типов.

  • Разобранный логический план
  • Проанализированный логический план
  • Оптимизированный логический план

Оптимизированный логический план преобразован в физический план для исполнения.

Уловка оптимизации

Изучите все вышеперечисленные планы и определите возможности для оптимизации. По возможности избегайте полного сканирования таблицы, применяйте фильтры как можно раньше на этапах обработки и убедитесь, что происхождение не задолго до выполнения объединений.

Заключение

Хотя в Spark уже есть множество встроенных оптимизаций, необходимо разумно использовать их все, чтобы извлечь из этого максимальную пользу.

Вот первая часть истории, пожалуйста, прочтите ее и поделитесь своими мыслями.



Больше контента на plainenglish.io