Я работаю дата-сайентистом 5 лет и до сих пор испытываю боль, когда нужно сделать MVP по временным рядам. Начиная с того, как построить несколько графиков одновЯ работаю дата-сайентистом 5 лет и до сих пор испытываю боль, когда нужно сделать MVP по временным рядам. Начиная с того, как построить несколько графиков однов

Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna

Я работаю дата-сайентистом 5 лет и до сих пор испытываю боль, когда нужно сделать MVP по временным рядам. Начиная с того, как построить несколько графиков одновременно без «слипшихся» меток по осям, заканчивая поиском подходящего метода очистки ряда от аномалий. И всё это венчает цикл по каждому ряду с бесконечным жонглированием данными между numpy, pandas, sklearn, yet_another_library.

Если вы DS, и тоже, как и я, устали от вот этого всего, добро пожаловать под кат. Я покажу, как написать production-ready код для прогноза 200+ временных рядов от EDA до результата. Разберем на практике, как бороться с аномалиями, ловить смены тренда и в итоге – получить масштабируемое решение, а не очередной «велосипед».

b1260d14c9bf618e9b3cc64ad8978735.png

Какую вершину нам предстоит покорить?

Привет, Habr! Меня зовут Саша Солин, я ML-инженер в MAGNIT TECH. Наша команда занимается развитием и поддержкой моделей промо-прогноза. В первом приближении прогнозирование промо может показаться задачей временных рядов, но по ряду причин, мы подходим к ней как к задаче классической регрессии. За счет своей простоты, некоторые из моделей получалось успешно переиспользовать в других проектах, не связанных с промо-прогнозом, например в ценообразовании или доступности товара на полке.

Однако этому пришел конец, когда к нам пришли с задачей… прогнозирования роста журнала изменений БД. По предоставленным временным рядам 200+ сервисов необходимо сделать понедельный прогноз на несколько лет вперед для планирования инфраструктуры БД.

Достаем наш альпинистский набор Data Scientist’a и отправляемся покорять гору рядов!

Базовый лагерь: готовим снаряжение и планируем маршрут

Спойлер: данные представляют собой нестационарные временные ряды без выраженной сезонности, поэтому сразу отмели Prophet & ARIMA-like модели и решили смотреть сторону time-series библиотек с менее требовательными к рядам моделями. Выбор пал на фреймворк Etna - он предоставляет набор удобных инструментов для работы с множественными временными рядами.

Перейдем от слов к делу, посмотрим на часть данных, с которыми будем работать:

import pandas as pd from etna.datasets import TSDataset # Читаем данные и преобразуем в "широкий формат Этна" df = pd.read_csv('data.csv', parse_dates=['timestamp']) ts = TSDataset(df, freq='W-MON') tscb7eabff94de60f012b14783d742a233.png

Данных минимум – четыре временных ряда с временными метками и таргетом (размер журнала изменений). Давайте взглянем на графики:

ts.plot()Настоящий джекпот: нестационарные временные ряды со сменой тренда (1, 2), вкраплениями аномалий (3) и резкими изменениями уровня таргета (4).

Настоящий джекпот: нестационарные временные ряды со сменой тренда (1, 2), вкраплениями аномалий (3) и резкими изменениями уровня таргета (4).
91b1f936c6d8b9380a0b197a22da1416.png

Для начала построим бейзлайн – наивный прогноз, где прогнозное значение равно последнему известному наблюдению. Он послужит отправной точкой для оценки более сложных моделей.

Перед этим разобьем наш датасет на обучающую и тестовую выборки и создадим Pipeline. Он предназначен для последовательного выполнения цепочки создания /преобразования признаков и работы с моделью.

В данном примере использование пайплайна может показаться overkill’ом, но это позволит в дальнейшем проводить все эксперименты по единой схеме и поскорее освоиться с API фреймворка:

from etna.models import NaiveModel from etna.pipeline import Pipeline from etna.analysis import plot_forecast # Формируем тестовую выборку из 20 последних наблюдений HORIZON = 20 train_ts, test_ts = ts.train_test_split(test_size=HORIZON) # Объявляем модель naive_model = NaiveModel() # Объявляем пайплайн naive_pipeline = Pipeline(model=naive_model, horizon=HORIZON) # Обучаем модель и делаем прогноз naive_pipeline.fit(train_ts) naive_forecast_ts = naive_pipeline.forecast() # Визуализируем прогноз plot_forecast(forecast_ts=naive_forecast_ts, test_ts=test_ts, train_ts=train_ts)50315ef5ada09dd6d498666afcb85edc.png

Для оценки прогноза воспользуемся метрикой взвешенной абсолютной процентной ошибки WAPE:

\text{WAPE} = \frac{\sum\limits_{i=1}^{n} |y_i - \hat{y}_i|}{\sum\limits_{i=1}^{n} |y_i|}

from etna.metrics import WAPE wape = WAPE() naive_wape = wape(y_true=test_ts, y_pred=naive_forecast_ts) df_metric = pd.DataFrame.from_dict( data=naive_wape, orient='index', columns=['naive'] ) df_metric.round(2)6eaaa1ba123edf7f800f66f3bf7871f8.png

Бейзлайн готов, ошибка WAPE по каждому из рядов посчитана. Теперь попробуем с наскока снизить ошибку, заменив модель на линейную регрессию.

b8cd965889a41b0baf877393bac9fab4.png

Первый и самый простой признак, который мы создадим для неё — линейный тренд. Это разница в днях между датой каждого наблюдения и началом временного ряда. Для этого нам потребуется преобразование, которое переведет разность дат в число:

from etna.transforms import LambdaTransform def date_to_int(column: pd.DataFrame) -> pd.DataFrame: """ Получаем дату из индекса и вычисляем разницу в днях от константной даты """ mix_date = column.index.min() column = column \ .apply(lambda x: (x.index - mix_date).days) \ .div(7) \ .add(1) \ .astype('float') return column # Объявим лямбда преобразование, применяющее пользовательскую функцию к столбцу date_to_int_transform = LambdaTransform( in_column='target', # столбец, к которому применяется преобразование inplace=False, # результат преобразования не перезаписывает in_column out_column='step', # столбец с результатом выполнения функции transform_func=date_to_int, # функция преобразования inverse_transform_func=lambda x: x, # обратное преобразование не требуется ) date_to_int_transform.fit_transform(train_ts) train_ts.head(3)da2655008efb2e0b76077c3fb19c68de.png

Убедившись, что преобразование работает корректно, добавим его в пайплайн для линейной модели и сделаем новый прогноз. Здесь пайплайн наконец-то раскрывает свой потенциал: он создает новый признак для тренировочной и тестовой выборок, выполняет его стандартизацию и передает в модель:

from etna.models import LinearPerSegmentModel from etna.transforms import StandardScalerTransform # Объявляем линейную модель обучаемую посегментно model = LinearPerSegmentModel() # Объявляем последовательность преобразований base_transforms = [ date_to_int_transform, StandardScalerTransform() ] base_pipeline = Pipeline( model=model, transforms=base_transforms, horizon=HORIZON ) # Явно указываем, что столбец step это регрессор, # Иначе модель его "не увидит" train_ts.regressors.append('step') base_pipeline.fit(train_ts) base_forecast_ts = base_pipeline.forecast() plot_forecast( train_ts=train_ts, test_ts=test_ts, forecast_ts={ 'naive':naive_forecast_ts, 'linear_base':base_forecast_ts } )a6babc8ff04fd5ed7ded6d76538b7201.png

Проверим метрики:

linear_base_wape = wape(y_true=test_ts, y_pred=base_forecast_ts) df_metric['linear_base'] = pd.Series(linear_base_wape) df_metric.round(2)2daac640ea5757b1789035c282bd4ee5.png

Линейной регрессии с простым трендом удалось снизить ошибку относительно бейзлайна лишь в половине случаев. В остальных случаях ошибка, наоборот, выросла — в два и в четыре раза.

Просчитались, но где?

Первый штурм не увенчался успехом. Посмотрим, какое еще снаряжение есть в нашем альпинистском наборе дата-сайентиста.

Обходим трещины и лавины: обнаруживаем и исключаем аномалии

Etna из коробки предлагает готовые методы для детектирования аномалий вместе с удобными средствами визуализации. Ниже приведены примеры некоторых из них.

Метод гистограмм – данные аппроксимируются с помощью гистограммы, где точки, удаление которых снижает ошибку аппроксимации, считаются аномалиями:

from etna.analysis import get_anomalies_hist from etna.analysis import plot_anomalies anomaly_dict = get_anomalies_hist(train_ts, bins_number=4) plot_anomalies(train_ts, anomaly_dict)fc97a25396ee80f7710a8730fe10d0e5.png

Вишенкой на торте является интерактивная визуализация, которая позволяет подбирать параметры детектора и сразу видеть, какие точки будут отмечены как аномальные.

Рассмотрим это на примере метода медианы. Его логика проста: для каждой точки вычисляется медиана и стандартное отклонение (std) внутри скользящего окна заданного размера (window_size). Точка считается аномалией, если её значение отклоняется от медианы больше, чем на alpha * std. Параметр alpha регулирует чувствительность, а window_size задаёт локальность оценки:

from etna.analysis import get_anomalies_median from etna.analysis import plot_anomalies_interactive plot_anomalies_interactive( ts=train_ts, segment='db_service_2', method=get_anomalies_median, params_bounds={'window_size': (10, 30, 10), 'alpha':(2, 3, 0.5)} )53eadf556b4f658869af8c45de2a2889.gif

Универсального рецепта для выбора метода детекции аномалий не существует - всё зависит от природы временного ряда и решаемой задачи.

В Etna работа с аномалиями выстроена в два логических этапа:

Первый этап - обнаружение и разметка. С помощью соответствующего преобразования заменим аномальные значения на NaN:

import copy from etna.transforms import MedianOutliersTransform anomalies_median_best_params = {'window_size': 20, 'alpha': 2.5} outliers_remover_transform = MedianOutliersTransform( in_column="target", **anomalies_median_best_params ) train_ts_wo_anomaly = copy.deepcopy(train_ts) train_ts_wo_anomaly.fit_transform([outliers_remover_transform]) train_ts_wo_anomaly.plot()2435624dde5a42eae35bfbffd0cf8562.png

Второй этап - заполнение пропусков. Образовавшиеся пропуски заполним с помощью TimeSeriesImputerTransform. После этого визуализируем ряды, чтобы убедиться в адекватности замены:

from etna.transforms import TimeSeriesImputerTransform from etna.analysis import plot_imputation outliers_imputer_transform = TimeSeriesImputerTransform(in_column="target", strategy="forward_fill") plot_imputation(imputer=outliers_imputer_transform, ts=train_ts_wo_anomaly) train_ts_wo_anomaly.fit_transform([outliers_imputer_transform])027d90dfa574a9f28b72e834a27b3e63.png

Теперь соберем новый пайплайн, включив в него этапы обработки аномалий, и проверим, удалось ли улучшить прогноз:

anomaly_transforms = [ outliers_remover_transform, outliers_imputer_transform, date_to_int_transform, StandardScalerTransform() ] anomaly_pipeline = Pipeline( model=model, transforms=anomaly_transforms, horizon=HORIZON ) anomaly_pipeline.fit(train_ts) anomaly_forecast_ts = anomaly_pipeline.forecast() plot_forecast( train_ts=train_ts_wo_anomaly, test_ts=test_ts, forecast_ts={ 'naive':naive_forecast_ts, 'linear_base':base_forecast_ts, 'linear_anomaly':anomaly_forecast_ts, } )42e9d3740f659a40910761aba9a2f0e2.png

С аномалиями на ряде db_service_3 мы справились, однако прогноз для рядов db_service_1 и db_service_4 по-прежнему страдает от исторических изменений тренда и сдвигов таргета.

Первая мысль — отбросить «нестабильную» историю и оставить для прогноза только последний период с более-менее постоянным уровнем тренда / таргета. Но как его точно найти? И что, если этот период окажется слишком зашумлённым или коротким, чтобы на нем можно было обнаружить потенциальную аномалию?

Что, если не отбрасывать данные, а, наоборот, обогатить их, разметив все участки с изменениями? Идея в следующем: выделить отрезки между точками смены тренда или уровня и передать эти метки модели в качестве признаков. Тогда у неё появится шанс научиться обобщать закономерности внутри каждого участка.

Находим перевалы: детектируем точки изменения тренда

Для обнаружения точек изменения тренда в Etna есть готовый функционал, включая инструменты для их визуализации. Под капотом используется библиотека Ruptures, специализирующаяся на обнаружении точек разладки (changepoints) в нестационарных временных рядах.

37ae8500842f66a209c1efac14aa5bd2.png

Работа строится по знакомой схеме, похожей на обработку аномалий:

import ruptures as rpt from etna.analysis import find_change_points from etna.analysis import plot_time_series_with_change_points change_points = find_change_points( ts=train_ts_wo_anomaly, in_column="target", change_point_model=rpt.Pelt(model='normal'), pen=10 ) plot_time_series_with_change_points( ts=train_ts_wo_anomaly, change_points=change_points )09c9187fcf5629b3c35d6d062ebf4fb0.png

И, конечно, здесь тоже есть возможность интерактивного подбора параметров детектора точек разладки:

from etna.analysis import plot_change_points_interactive params_bounds = { "min_size": [0, 20, 10], # мин. размер сегмента "pen": [0, 40, 10], # размер штрафа "jump": [1, 10, 1], # шаг поиска сегментов } plot_change_points_interactive( ts=train_ts_wo_anomaly, change_point_model=rpt.Pelt, model='normal', params_bounds=params_bounds, model_params=["min_size" ,"jump"], predict_params=["pen"], figsize=(6, 3) )34d071c1d7bab1c1da60bba54b864ece.gif

Как и в случае с детектированием аномалий, универсального решения для поиска точек разладки не существует. В библиотеке Ruptures реализовано несколько алгоритмов поиска, что позволяет выбрать наиболее подходящий вариант для решаемой задачи.

Встраивание детектора точек разладки в пайплайн состоит из двух этапов:

Первый этап - создание Etna-совместимой обёртки. Инициализириуем класс-адаптер для модели Ruptures, чтобы она могла работать внутри экосистемы Etna:

from etna.transforms.decomposition import RupturesChangePointsModel change_points_model_best_params = {'min_size': 10, 'jump':1} change_points_predict_best_params = {'pen': 30} change_points_model = RupturesChangePointsModel( rpt.Pelt(model='normal', **change_points_model_best_params), **change_points_predict_best_params )

Второй этап - применение преобразования ChangePointsSegmentationTransform. Оно использует ранее созданную обёртку для разметки ряда на сегменты и создания категориального признака:

from etna.transforms import ChangePointsSegmentationTransform change_points_transform = ChangePointsSegmentationTransform( in_column='target', change_points_model=change_points_model, out_column='subsegment_id' )

Так как линейная модель интерпретирует категориальный признак subsegment_id как обычное число (что бессмысленно), его необходимо преобразовать с помощью one-hot encoding (OHE):

from etna.transforms import OneHotEncoderTransform ohe_transform = OneHotEncoderTransform( in_column='subsegment_id', out_column='subsegment', return_type='numeric' ) train_ts_wo_trend = copy.deepcopy(train_ts_wo_anomaly) train_ts_wo_trend.fit_transform([change_points_transform, ohe_transform]) train_ts_wo_trend[: ,'db_service_2', :]f4475a33360e79ae98a845b3ec70ffca.png

Важный нюанс: для прогнозного периода это преобразование будет проставлять флаги, соответствующие последней известной дате, поскольку мы не можем предсказать будущие точки изменения.

На вершине: сводим всё воедино и любуемся панорамой прогноза

Наконец-то мы готовы собрать наш финальный пайплайн!

Рейнджеры Преобразования, объединяемся!
Рейнджеры Преобразования, объединяемся!

from etna.transforms import FilterFeaturesTransform subsegment_transforms = [ outliers_remover_transform, outliers_imputer_transform, change_points_transform, ohe_transform, FilterFeaturesTransform(exclude=['subsegment_id']), date_to_int_transform, StandardScalerTransform(in_column=['step']) ] subsegment_pipeline = Pipeline( model=model, transforms=subsegment_transforms, horizon=HORIZON ) subsegment_pipeline.fit(train_ts) subsegment_forecast_ts = subsegment_pipeline.forecast() plot_forecast( train_ts=train_ts_wo_anomaly, test_ts=test_ts, forecast_ts={ 'naive':naive_forecast_ts, 'linear_base':base_forecast_ts, 'linear_anomaly':anomaly_forecast_ts, 'linear_subsegment':subsegment_forecast_ts, } )6b7970b2ae1a01d82657f6402358b102.png

subsegment_wape = wape(y_true=test_ts, y_pred=subsegment_forecast_ts) df_metric['linear_subsegment'] = pd.Series(subsegment_wape) df_metric.round(2)895d48fa73b35bbbb74b746f7067e9a2.png

Временной ряд db_service_1 покорен! Но настоящий вызов ждет впереди – проверка на оставшихся 200+ рядах!

d69d36968e587abd93988971a4b3ec98.png

Справится ли наш подход с таким масштабом?

Теперь проявляется вся мощь пайплайна: для масштабирования достаточно передать новые данные в уже настроенные цепочки преобразований:

df_all = pd.read_csv('data_all.csv', parse_dates=['timestamp']) ts_all = TSDataset(df_all, freq='W-MON') train_all_ts, test_all_ts = ts_all.train_test_split(test_size=HORIZON) train_all_ts.regressors.append('step') num_segments = test_all_ts.describe()['num_segments'][0] pipelines = [ ['naive', naive_pipeline], ['linear_base', base_pipeline], ['linear_anomaly', anomaly_pipeline], ['linear_subsegment', subsegment_pipeline] ] wape_macro = WAPE(mode='macro') metrics = {} for alias, test_pipeline in pipelines: test_pipeline.fit(train_all_ts) test_forecast_ts = test_pipeline.forecast() metrics[alias] = wape_macro( y_true=test_all_ts, y_pred=test_forecast_ts ) pd.Series(metrics, name=f'WAPE of {num_segments} segments').round(2)87b2be0f72acd1d0289ed727987502b0.png

Финальный пайплайн уверенно берёт вершину — он снижает ошибку WAPE на 4 процентных пункта относительно наивного бейзлайн-прогноза.

Заключение: с вершины к новым горизонтам

Наше восхождение на гору временных рядов подошло к концу. Мы разбили базовый лагерь (подготовка данных, наивная модель), осторожно обошли трещины аномалий, отыскали ключевые перевалы – точки смены трендов. Наконец, мы взошли на вершину, отмасштабировав решение на 200+ рядов, добившись улучшения метрик.

Библиотека Etna в этом путешествии стала надежным шерпой. Она не только помогла взглянуть на данные под новым углом благодаря удобной визуализации, но и автоматизировала рутину - подготовку данных и прогноз для 200+ рядов без единого цикла for.

Однако стоит упомянуть, что Etna не всегда будет лучшим выбором, ведь её основное предназначение - прогноз множества бизнес-рядов средней длины. За этими рамками её сильные стороны могут нивелироваться:

  • Прогноз для одного-двух рядов. Весь пайплайновый аппарат Etna будет похож на стрельбу из пушки по воробьям. Проще использовать, например, Prophet.

  • Миллионы рядов или последовательности длиной в сотни тысяч наблюдений. Производительность может не удовлетворить SLA, здесь стоит смотреть в сторону оптимизированных под такой масштаб решений, например PySpark.

  • Эксперименты с экзотическими моделями. Если ваш фокус — research и тестирование хайповых нейросетевых архитектур, то фреймворки вроде PyTorch Forecasting или Darts дадут больше свободы.

На какие еще инструменты прогноза временных рядов также стоит обратить внимание:

  • Prophet – мощный инструмент для обработки сезонности и трендов с учетом праздников.

  • Sktime – фреймворк с унифицированным интерфейсом для всестороннего анализа временных рядов, включая прогнозирование, классификацию, кластеризацию и регрессию, с поддержкой современных алгоритмов.

  • Greykite – библиотека от LinkedIn, способная учитывать сложные паттерны, такие как множественные сезонности и эффекты взаимодействий, с помощью собственного алгоритма прогноза Silverkite.

Теперь пора ставить новые цели и использовать добытые знания. Удачи в покорении ваших данных!

При подготовке данной статьи использовались материалы из документации Etna.

P.S. С какими неожиданными «горными препятствиями» сталкивались вы в своих временных рядах? Делитесь в комментариях.

Источник

Отказ от ответственности: Статьи, размещенные на этом веб-сайте, взяты из общедоступных источников и предоставляются исключительно в информационных целях. Они не обязательно отражают точку зрения MEXC. Все права принадлежат первоисточникам. Если вы считаете, что какой-либо контент нарушает права третьих лиц, пожалуйста, обратитесь по адресу [email protected] для его удаления. MEXC не дает никаких гарантий в отношении точности, полноты или своевременности контента и не несет ответственности за любые действия, предпринятые на основе предоставленной информации. Контент не является финансовой, юридической или иной профессиональной консультацией и не должен рассматриваться как рекомендация или одобрение со стороны MEXC.