В предыдущей статье я детально рассмотрел пример анализа временного ряда с помощью сверточной нейронной сети.
В этой статье я сделаю небольшое упрощение функции подготовки временного ряда для анализа сверточной нейронной сетью. Опущу теорию в части зачем выполняется такая подготовка, поскольку детали подробно разобраны в предыдущей статье.
Сформируем тестовую последовательность.
num = 20 xData = np.arange(0, num).reshape(num,1) print(xData) stepsForward = 1 xLen = 5 valLen = 3 xChannels = 0 print("xData.shape", xData.shape) print("Range:", xData.shape[0] - xLen + 1 - stepsForward)
[[ 0] [ 1] [ 2] ... [16] [17] [18] [19]] xData.shape (20, 1) Range: 15
Немного подправлю функцию для нормализации данных, добавив возможность указать тип нормализации -1, т.е. без нормализации. При этом функция просто возвращается столбцы отобранные по переданному списку Channels:
#data - Numpy array #Normalization = 0 - нормальное распределение, 1 - к отрезку [0;1], -1 - не нормализовывать def DataNormalization(data, Channels, Normalization): #Выбираем тип нормализации x #0 - нормальное распределение #1 - нормирование до отрезка 0-1 if (Normalization == 0): scaler = StandardScaler() if (Normalization == 1): scaler = MinMaxScaler() #Берём только те каналы, которые указаны в аргументе функции resData = data[:,Channels] if (len(resData.shape) == 1): #Если размерность входного массива - одномерный вектор, print("Add one dimension") resData = np.expand_dims(resData, axis=1) #то добавляем размерность if (Normalization == -1): scaler = np.zeros(resData.shape[0]) return (resData, scaler) #Обучаем нормировщик scaler.fit(resData) #Нормируем данные resData = scaler.transform(resData) return (resData, scaler)
Теперь сама функция по «раскусыванию» временного ряда для подготовки к передаче на сверточную сеть:
#Функция "раскусывания" данных для временных рядов #data - данные #xLen - размер "окна", по которому предсказываем #xChannels - лист, номера каналов, по которым делаем анализ #yChannels - лист, номера каналов, которые предсказываем #stepsForward - на сколько шагов предсказываем в будущее # если 1 - то на 1 шаг, можно использовать только при одном канале, указанном в yChannels #xNormalization - нормализация входных каналов, 0 - нормальное распределение, 1 - к отрезку [0;1], -1 - не нормализовывать #yNormalization - нормализация прогнозируемых каналов, 0 - нормальное распределение, 1 - к отрезку [0;1], -1 - не нормализовывать #returnFlatten - True - если на выходе получить одномерный вектор для Dense сетей #valLen - сколько примеров брать для проверочной выборки (количество для обучающей посчитается автоматиески) #convertToDerivative - bool, преобразовывали ли входные сигналы в производную def getTrainSeq(data, xLen, xChannels, yChannels, stepsForward, xNormalization, yNormalization, returnFlatten, valLen, convertToDerivative): #Если указано превращение данных в производную #То вычитаем поточечно из текущей точки предыдущую if (convertToDerivative): data = np.array([(d[1:]-d[:-1]) for d in data.T]).copy().T else: if isinstance(data,(pd.core.series.Series, pd.core.frame.DataFrame)): #Проверяем, если на входе Pandas - то, берем values, для получения numpy Array print("Convert Pandas.Series to Numpy array") data = data.values #Нормализуем данные (xData, xScaler) = DataNormalization(data, xChannels, xNormalization) (yData, yScaler) = DataNormalization(data, yChannels, yNormalization) valLen = valLen + xLen - 1 + stepsForward #Вычисляем сколько данных с конца нужно взять, чтобы размерность xVal/yVal была valLen #Разбивка входного ряда до обработки xTrain = xData[:xData.shape[0]-valLen] yTrain = yData[:yData.shape[0]-valLen] xVal = xData[xData.shape[0]-valLen:] yVal = yData[yData.shape[0]-valLen:] xTrain = np.array([xTrain[i:i + xLen, xChannels] for i in range(xTrain.shape[0] - xLen + 1 - stepsForward)]) yTrain = np.array([yTrain[i:i + stepsForward, yChannels] for i in range(xLen, yTrain.shape[0] + 1 - stepsForward)]) xVal = np.array([xVal[i:i + xLen, xChannels] for i in range(xVal.shape[0] - xLen + 1 - stepsForward)]) yVal = np.array([yVal[i:i + stepsForward, yChannels] for i in range(xLen, yVal.shape[0] + 1 - stepsForward)]) #Если в функцию передали вернуть flatten сигнал (для Dense сети), то xTrain и xVal превращаем в flatten if (returnFlatten == True): xTrain = np.array([x.flatten() for x in xTrain]) xVal = np.array([x.flatten() for x in xVal]) return (xTrain, yTrain), (xVal, yVal), (xScaler, yScaler)
Протестируем работу функции на тестовом numpy.array:
xNormalization = -1 yNormalization = -1 xChannels = [0] yChannels = [0] convertToDerivative = False returnFlatten = False stepsForward = 1 xLen = 5 valLen = 3 (xTrain, yTrain), (xVal, yVal), (xScaler, yScaler) = getTrainSeq(xData, xLen, xChannels, yChannels, stepsForward, xNormalization, xNormalization, returnFlatten, valLen, convertToDerivative) print("xTrain:\r\n", xTrain) print("yTrain:\r\n", yTrain) print("xVal:\r\n", xVal) print("yVal:\r\n", yVal)
Как и указывалось, valLen = 3, т.е. на проверочную выборку отводится 3, а остальное на обучающую выборку.
xTrain: [[ 0 1 2 3 4] [ 1 2 3 4 5] [ 2 3 4 5 6] [ 3 4 5 6 7] [ 4 5 6 7 8] [ 5 6 7 8 9] [ 6 7 8 9 10]] yTrain: [[ 5] [ 6] [ 7] [ 8] [ 9] [10] [11]] xVal: [[12 13 14 15 16] [13 14 15 16 17] [14 15 16 17 18]] yVal: [[17] [18] [19]]
Нейронная сеть будет пытаться найти закономерности между каждой серией X длиной xLen и Y длиной stepsForward. Если
xLen = 300 #Анализируем по 300 прошедшим точкам stepsForward = 10 #Тренируем сеть для предсказания на 10 шагов вперед xChannels = range(data.shape[1]) #Используем все входные каналы yChannels = 0 #Предказываем только open канал xNormalization = 0 #Нормируем входные каналы стандартным распределением yNormalization = 0 #Нормируем выходные каналы стандартным распределением valLen = 30000 #Используем 30.000 записей для проверки0 returnFlatten = False #Если True, то вернуть одномерные векторы, если False, то двумерные convertToDerivative = False #Не True, то превращать в производную (xTrain, yTrain), (xVal, yVal), (xScaler, yScaler) = getTrainSeq(data, xLen, xChannels, yChannels, stepsForward, xNormalization, yNormalization, returnFlatten, valLen, convertToDerivative) #Выводим размеры данных для проверки print("xTrain.shape:", xTrain.shape) print("yTrain.shape:", yTrain.shape) print("xVal.shape:", xVal.shape) print("yVal.shape:", yVal.shape)
Convert Pandas.Series to Numpy array Add one dimension xTrain.shape: (136745, 300, 1) yTrain.shape: (136745, 10) xVal.shape: (30000, 300, 1) yVal.shape: (30000, 10)
Прогноз курса GBPUSD по данным с bloomberg
Подгружаем данные спарсенные с bloomberg по курсам валют в csv файл.
#Считываем данные с помощью pandas base_data = pd.read_csv('bloomberg.txt', sep=';') print(base_data.shape)
Отбираем данные только по курсам GBPUSD:
GBPUSD = base_data[base_data['ticker'] == 'GBPUSD']
Парсим данные из колонки date_time в соответствии с шаблоном и пересохраняем колонку ‘date_time’.
GBPUSD['date_time'] = pd.to_datetime(GBPUSD['date_time'], format='%d.%m.%Y %H:%M:%S') GBPUSD.info()
Теперь в этой колонке не строка, а тип datetime64.
Int64Index: 167363 entries, 4 to 500413 Data columns (total 3 columns): date_time 167363 non-null datetime64[ns] ticker 167363 non-null object value 167363 non-null float64 dtypes: datetime64[ns](1), float64(1), object(1)
Поскольку после фильтрации данных в index выпады, переиндексируем массив, присвоив колонку с типом дата в индекс и попутно удалив колонки ‘date_time’ и ‘ticker’ за ненадобностью.
data = GBPUSD data = data.drop(columns=['date_time','ticker'], axis=1) data.index = GBPUSD.date_time
Отрисуем временной ряд:
fig, ax = plt.subplots(1, 1, figsize=(15, 8)) data.plot(ax=ax, lw=.5) #ax.set_ylim(min, max) ax.set_xlabel('Date') ax.set_ylabel('GBPUSD rate')

Отмечу, что по оси x отображаются даты, а не номера отсчетов.
Если сделать прогноз временного ряда курсов GBPUSD взятых с bloomberg на 10 шагов вперед, то визуально видно, что прогноз довольно плотно огибает исходный ряд.

Scatter оригинального временного ряда и прогноза на 10 шаговпо проверочной выборке.

Чтобы нейронка смогла выявлять какие-то сезонные закономерности нужно в качестве каналов добавить параметры даты и/или времени. Как это сделать? Подавать параметры даты лучше по отдельности. Например, 22.12.2019 11:59:55 -> 5 каналов: 22;12;11;59;55. Нужно ли подавать год — это вопрос.
При наличии ежегодного цикла (можно выявить с помощтю преобразования Фурье) нейронке достаточно информации из столбца месяц, чтобы вытащить информацию о годовой сезонности.
Если во временном ряде присутствует сезонность с периодом больше, чем 1 год, возможно, в качестве отдельного канала стоит добавить год.
Пример добавления в Pandas.DataFrame колонок с датами с последующей сортировкой колонок, чтобы можно было исключить колонку с годом.
GBPUSD['Month'] = GBPUSD['date_time'].dt.month GBPUSD['Day'] = GBPUSD['date_time'].dt.day GBPUSD['Hour'] = GBPUSD['date_time'].dt.hour GBPUSD['Minute'] = GBPUSD['date_time'].dt.minute GBPUSD['Second'] = GBPUSD['date_time'].dt.second GBPUSD['Year'] = GBPUSD['date_time'].dt.year GBPUSD = GBPUSD.drop(columns=['date_time'], axis=1) GBPUSD.columns = ['GBPUSD', 'Month', 'Day', 'Hour', 'Minute', 'Second', 'Year']
День недели/месяц, некоторые флаги событий (например, праздник, выходной или обычный день), можно подавать как числовые метрики: 1/0 — праздник/не праздник; 1-7 день недели.