https://habrahabr.ru/post/324244/- Программирование
- Машинное обучение
- Python
- PostgreSQL
- API
Сегодня предлагаю поразмышлять о том, как искать паттерны в биржевых данных и как их использовать для успешной торговли.
Будем получать биржевые данные Forex от одного из брокеров, сохраним в базу данных PostgreSQL и попробуем найти закономерности при помощи алгоритмов машинного обучения.
В статье есть несколько приятных бонусов в виде кода на Python — Вы сможете сами проанализировать любые (почти) биржевые данные (или значения индикаторов), запустить собственного торгового робота и проверить любую торговую стратегию.
Все условия и определения паттернов в статье приведены для примера, вы можете использовать любые критерии.
Что такое паттерн и как его использовать?
Паттерн — это устойчивая, повторяющаяся фигура последовательных биржевых данных, после возникновения которой цена с большой вероятностью изменится в нужную сторону.
Проанализировать статистику, для того, чтобы найти повторяющиеся закономерности — задача не из легких, но если зависимости удается найти, то предсказать движение цены удается достаточно точно. При помощи методов машинного обучения поиск паттернов сводится к выбору наилучшего классификатора — алгоритма, обучающегося на исторических данных и прогнозирующего движение цены с определенной вероятностью.
Такой механизм вполне может стать частью успешной торговой стратегии в совокупности с другими методами анализа рынка.
Подготовка
- Для того, чтобы получать исторические данные и выставлять заявки на Forex через RESTv20 API, нам потребуется demo счет у известного брокера. Регистрация занимает минуту, после чего Вы получаете token (уникальный ключ для доступа) и номер счета.
- Необходим Python версии 2.7 с установленными библиотеками: oandapyV20, sklearn, matplotlib, numpy, psycopg2. Их можно установить через pip.
- Необходим PostgreSQL, у меня версия 9.6.
Описание модели
Самое первое, что нужно описать — собственно, исторические данные.
Создадим класс Candle, который будет хранить информацию о каждой свече:
class Candle:
def __init__(self, datetime, ask, bid, volume):
self.datetime = datetime
self.ask = ask
self.bid = bid
self.volume = volume
Описание паттерна будет таким:
class Pattern:
result = ''
serie = list()
def __init__(self, serie, result):
self.serie = serie
self.result = result
Каждой серии данных будет соответствовать результат, в нашем случае, покупка или продажа.
Здесь нужно не забыть, что нас интересует
форма. Это значит, просто ценами паттерн описывать не верно, необходима их нормализация. Об этом ниже.
Введем еще два параметра:
- Длина серии (Length) — количество последовательных элементов в серии паттерна
- Ширина окна (window size) — количество последовательных элементов после серии, для хотя бы одного из которых выполняется условие выбора паттерна
Какими будут условия для выбора паттернов? — такими, чтобы получить прибыль:
Если мы покупаем по цене
ask = X, то продать должны по возросшей цене
bid > X. И наоборот, если мы продаем по цене
bid = Y, то купить должны по цене
ask < Y. В этом изменение цены будет больше спреда на момент покупки, и мы получим прибыль.
Сегодня я предлагаю использовать эти простые правила для отбора паттернов, но, на самом деле, чтобы все хорошо работало, к ним нужно добавить несколько фильтров. Это я предлагаю сделать Вам позже самостоятельно. Не забывайте, что выбор исходных данных (периода, рынка, инструмента и тп) очень важен — где-то паттерны есть, а где-то нет. Или нужно изменить условия их отбора.
Получаем данные
Получим данные от брокера и сохраним их в БД PostgreSQL.
Первым делом, создадим класс, который будет загружать данные:
import pandas
from oandapyV20.endpoints import instruments
class StockDataDownloader(object):
def get_data_from_finam(self, ticker, period, marketCode, insCode, dateFrom, dateTo):
"""Downloads data from FINAM.ru stock service"""
addres = 'http://export.finam.ru/data.txt?market=' + str(marketCode) + '&em=' + str(insCode) + '&code=' + ticker + '&df=' + str(dateFrom.day) + '&mf=' + str(dateFrom.month-1) + '&yf=' + str(dateFrom.year) + '&dt=' + str(dateTo.day) + '&mt=' + str(dateTo.month-1) + '&yt=' + str(dateTo.year) + '&p=' + str(period + 2) + 'data&e=.txt&cn=GAZP&dtf=4&tmf=4&MSOR=1&sep=1&sep2=1&datf=5&at=1'
return pandas.read_csv(addres)
def get_data_from_oanda_fx(self, API, insName, timeFrame, dateFrom, dateTo):
params = 'granularity=%s&from=%s&to=%s&price=BA' % (timeFrame, dateFrom.isoformat('T') + 'Z', dateTo.isoformat('T') + 'Z')
r = instruments.InstrumentsCandles(insName, params=params)
API.request(r)
return r.response
Бонус: я оставил в этом классе метод, который загружает любые исторические данные с Финама. Это очень удобно, потому что можно проанализировать как Forex, так и рынки ММВБ и ФОРТС. Минус только в том, что данные могут быть загружены с периодом не менее 1 минуты, в то время как второй метод может загрузить 5-секундные свечи.
Теперь сделаем простой скрипт, которые загружает данные в БД:
import psycopg2
from StockDataDownloader import StockDataDownloader
from Conf import DbConfig, Config
from datetime import datetime, timedelta
import oandapyV20
import re
step = 60*360 # download step, s
daysTotal = 150 # download period, days
dbConf = DbConfig.DbConfig()
conf = Config.Config()
connect = psycopg2.connect(database=dbConf.dbname, user=dbConf.user, host=dbConf.address, password=dbConf.password)
cursor = connect.cursor()
print 'Successfully connected'
cursor.execute("SELECT * FROM pg_tables WHERE schemaname='public';")
tables = list()
for row in cursor:
tables.append(row[1])
for name in tables:
cmd = "DROP TABLE " + name
print cmd
cursor.execute(cmd)
connect.commit()
tName = conf.insName.lower()
cmd = ('CREATE TABLE public."{0}" (' \
'datetimestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,' \
'ask FLOAT NOT NULL,' \
'bid FLOAT NOT NULL,' \
'volume FLOAT NOT NULL,' \
'CONSTRAINT "PK_ID" PRIMARY KEY ("datetimestamp"));' \
'CREATE UNIQUE INDEX timestamp_idx ON {0} ("datetimestamp");').format(tName)
cursor.execute(cmd)
connect.commit()
print 'Created table', tName
downloader = StockDataDownloader.StockDataDownloader()
oanda = oandapyV20.API(environment=conf.env, access_token=conf.token)
def parse_date(ts):
# parse date in UNIX time stamp
return datetime.fromtimestamp(float(ts))
date = datetime.utcnow() - timedelta(days=daysTotal)
dateStop = datetime.utcnow()
candleDiff = conf.candleDiff
if conf.candlePeriod == 'M':
candleDiff = candleDiff * 60
if conf.candlePeriod == 'H':
candleDiff = candleDiff * 3600
last_id = datetime.min
while date < dateStop - timedelta(seconds=step):
dateFrom = date
dateTo = date + timedelta(seconds=step)
data = downloader.get_data_from_oanda_fx(oanda, conf.insName, '{0}{1}'.format(conf.candlePeriod, conf.candleDiff),
dateFrom, dateTo)
if len(data.get('candles')) > 0:
cmd = ''
cmd = ('INSERT INTO {0} VALUES').format(tName)
cmd_bulk = ''
for candle in data.get('candles'):
id = parse_date(candle.get('time'))
volume = candle.get('volume')
if volume != 0 and id!=last_id:
cmd_bulk = cmd_bulk + ("(TIMESTAMP '{0}',{1},{2},{3}),\n"
.format(id, candle.get('ask')['c'], candle.get('bid')['c'],
volume))
last_id = id
if len(cmd_bulk) > 0:
cmd = cmd + cmd_bulk[:-2] + ';'
cursor.execute(cmd)
connect.commit()
print ("Saved candles from {0} to {1}".format(dateFrom, dateTo))
date = dateTo
cmd = "REINDEX INDEX timestamp_idx;"
print cmd
cursor.execute(cmd)
connect.commit()
connect.close()
Если вы внимательно посмотрите на данные от Oanda, то увидите, что некоторые свечи пропущены. Причем, чем меньше период загружаемых данных, тем больше пропусков. Это не ошибка, а связано с тем, что цена за время пропусков не изменилась. Поэтому есть два способа загрузки таких данных — сохранять как есть, или добавлять пропущенные свечи со значениями, аналогичными последней свече от брокера с нулевым объемом. В
репозитории на Github реализованы оба варианта, последний закомментирован. Так же, если Вы посчитаете нужным добавлять пропущенные свечи, есть скрипт DbCheck.py, проверяющий правильность последовательности свечей для этого случая.
Анализ данных
Сделаем простой класс, который будет содержать методы для поиска паттернов и преобразовывать их в векторы для алгоритмов машинного обучения:
import psycopg2
from Conf import DbConfig, Config
from Desc.Candle import Candle
from Desc.Pattern import Pattern
import numpy
def get_patterns_for_window_and_num(window, length, limit=None):
conf = Config.Config()
dbConf = DbConfig.DbConfig()
connect = psycopg2.connect(database=dbConf.dbname, user=dbConf.user, host=dbConf.address, password=dbConf.password)
cursor = connect.cursor()
print 'Successfully connected'
tName = conf.insName.lower()
cmd = 'SELECT COUNT(*) FROM {0};'.format(tName)
cursor.execute(cmd)
totalCount = cursor.fetchone()[0]
print 'Total items count {0}'.format(totalCount)
cmd = 'SELECT * FROM {0} ORDER BY datetimestamp'.format(tName)
if limit is None:
cmd = '{0};'.format(cmd)
else:
cmd = '{0} LIMIT {1};'.format(cmd, limit)
cursor.execute(cmd)
wl = list()
patterns = list()
profits = list()
indicies = list()
i = 1
for row in cursor:
nextCandle = Candle(row[0], row[1], row[2], row[3])
wl.append(nextCandle)
print 'Row {0} of {1}, {2:.3f}% total'.format(i, totalCount, 100*(float(i)/float(totalCount)))
if len(wl) == window+length:
# find pattern of 0..length elements
# that indicates price falls / grows
# in the next window elements to get profit
candle = wl[length-1]
ind = length + 1
# take real data only
if candle.volume != 0:
while ind <= window + length:
iCandle = wl[ind-1]
# define patterns for analyzing iCandle
if iCandle.volume != 0:
if iCandle.bid > candle.ask:
# buy pattern
p = Pattern(wl[:length],'buy')
patterns.append(p)
indicies.append(ind - length)
profits.append(iCandle.bid - candle.ask)
break
if iCandle.ask < candle.bid:
# sell pattern
p = Pattern(wl[:length],'sell')
patterns.append(p)
indicies.append(ind - length)
profits.append(candle.bid - iCandle.ask)
break
ind = ind + 1
wl.pop(0)
i = i + 1
print 'Total patterns: {0}'.format(len(patterns))
print 'Mean index[after]: {0}'.format(numpy.mean(indicies))
print 'Mean profit: {0}'.format(numpy.mean(profits))
connect.close()
return patterns
def pattern_serie_to_vector(pattern):
sum = 0
for candle in pattern.serie:
sum = sum + float(candle.ask + candle.bid) / 2;
mean = sum / len(pattern.serie)
vec = []
for candle in pattern.serie:
vec = numpy.hstack((vec, [ (candle.ask+candle.bid) / (2 * mean) ]))
return vec
def get_x_y_for_patterns(patterns, expected_result):
X = []
y = []
for p in patterns:
X.append(pattern_serie_to_vector(p))
if (p.result == expected_result):
y.append(1)
else:
y.append(0)
return X, y
В первом методе как раз описываются условия для выбора паттернов, а последний возвращает векторы для алгоритмов. Обратите внимание на метод
pattern_serie_to_vector, который нормализует данные. Как уже говорилось выше, цены могут быть разные, а
форма одинаковая (аналог в тех анализе — паттерн треугольник, неважно какие цены, важно взаимное расположение последовательных свечей).
А теперь самое интересное, проверим результат работы двух классификаторов — градиентного бустинга и линейной регрессии. Будем оценивать площать под ROC кривой (AUC_ROC) для кроссвалидации по 5 блокам, в зависимости от настроек алгоритма.
Напоминаю, что площадь под ROC кривой меняет свое значение от 0.5 (самый плохой классификатор) до 1 (самый лучший классификатор). Наша цель — получить хотя бы 0.8.
Проверим несколько классификаторов и выберем наилучший, а так же длину серии паттерна и окно.
Градиентный бустинг с возможным перебором по длине серии и окну (в хорошей модели с увеличением числа деревьев точность должна расти, поэтому надо выбрать подходящую длину серии и окно):
# gradient boosting
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold, cross_val_score
from sklearn.ensemble import GradientBoostingClassifier
from PatternsCollector import get_patterns_for_window_and_num, get_x_y_for_patterns
import seaborn
nums = [2,5,10]
i = 0
wrange = [1,5,10]
lrange = [5,10]
values = list()
legends = list()
for wnd in wrange:
for l in lrange:
scores = []
patterns = get_patterns_for_window_and_num(wnd, l)
X, y = get_x_y_for_patterns(patterns, 'buy')
for n in nums:
i = i+1
kf = KFold(n_splits=5, shuffle=True, random_state=100)
model = GradientBoostingClassifier(n_estimators=n, random_state=100)
ms = cross_val_score(model, X, y, cv=kf, scoring='roc_auc')
scores.append(np.mean(ms))
print 'Calculated {0}-{1}, num={2}, {3:.3f}%'.format(wnd, l, n, 100 * i/float((len(nums)*len(wrange)*len(lrange))))
values.append(scores)
legends.append('{0}-{1}'.format(wnd, l))
plt.xlabel('estimators count')
plt.ylabel('accuracy')
for v in values:
plt.plot(nums, v)
plt.legend(legends)
plt.show()
Аналогично, линейная регрессия с настройкой параметров:
# logistic regression
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from PatternsCollector import get_patterns_for_window_and_num, get_x_y_for_patterns
from sklearn.model_selection import KFold, cross_val_score
import numpy as np
import matplotlib.pyplot as plt
import seaborn
cr = [10.0 ** i for i in range(-3, 1)]
i = 0
wrange = [1,5,10]
lrange = [5, 10]
values = list()
legends = list()
for wnd in wrange:
for l in lrange:
scores = []
patterns = get_patterns_for_window_and_num(wnd, l)
X, y = get_x_y_for_patterns(patterns, 'buy')
sc = StandardScaler()
X_sc = sc.fit_transform(X)
for c in cr:
i = i+1
kf = KFold(n_splits=5, shuffle=True, random_state=100)
model = LogisticRegression(C=c, random_state=100)
ms = cross_val_score(model, X_sc, y, cv=kf, scoring='roc_auc')
scores.append(np.mean(ms))
print 'Calculated {0}-{1}, C={2}, {3:.3f}%'.format(wnd, l, c, 100 * i/float((len(cr)*len(wrange)*len(lrange))))
values.append(scores)
legends.append('{0}-{1}'.format(wnd, l))
plt.xlabel('C value')
plt.ylabel('accuracy')
for v in values:
plt.plot(cr, v)
plt.legend(legends)
plt.show()
Как я уже говорил, условия неполные. Поэтому получаем точность всего 0.52. Но если вы их дополните, то точность будет лучше. Можете попробовать другие алгоритмы — нейросети, random forest и многие другие. Нужно не забыть про проблему переобучения — например, при большом числе деревьев в градиентном бустинге.
Проверка на ошибки в коде: если вместо реальных данных в БД взять от них sin(), то для обоих классификаторов AUC_ROC на кроссвалидации будет 0.96.
Торговый робот
В заключение предлагаю вам код торгового робота, который может ставить заявки как на demo счете, так и на реальном. Самое главное — при закрытии сделок он строит гистограмму профитов по сделке, основываясь на информации, полученной от брокера. То есть, вы реально сможете проверить, как работает ваша торговая стратегия.
import datetime
from datetime import datetime
from os import path
import matplotlib.pyplot as plt
import oandapyV20
import oandapyV20.endpoints.orders as orders
import oandapyV20.endpoints.positions as positions
from oandapyV20.contrib.requests import MarketOrderRequest
from oandapyV20.contrib.requests import TakeProfitDetails, StopLossDetails
from oandapyV20.endpoints.accounts import AccountDetails
from oandapyV20.endpoints.pricing import PricingInfo
from Conf.Config import Config
import seaborn
config = Config()
oanda = oandapyV20.API(environment=config.env, access_token = config.token)
pReq = PricingInfo(config.account_id, 'instruments='+config.insName)
asks = list()
bids = list()
long_time = datetime.now()
short_time = datetime.now()
if config.write_back_log:
f_back_log = open(path.relpath(config.back_log_path + '/' + config.insName + '_' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))+'.log', 'a');
time = 0
times = list()
last_ask = 0
last_bid = 0
if config.write_back_log:
print 'Backlog file name:', f_back_log.name
f_back_log.write('DateTime,Instrument,ASK,BID,Price change,Status, Spread, Result \n')
def process_data(ask, bid, status):
global last_result
global last_ask
global last_bid
global long_time
global short_time
if status != 'tradeable':
print config.insName, 'is halted.'
return
asks.append(ask)
bids.append(bid)
times.append(time)
# --- begin strategy here ---
# --- end strategy here ---
if len(asks) > config.maxLength:
asks.pop(0)
if len(bids) > config.maxLength:
bids.pop(0)
if len(times) > config.maxLength:
times.pop(0)
if config.write_back_log:
f_back_log.write('%s,%s,%s,%s,%s,%s,%s \n' % (datetime.datetime.now(), config.insName, pReq.response.get('prices')[0].get('asks')[1].get('price'), pReq.response.get('prices')[0].get('bids')[1].get('price'), pChange, ask-bid, result))
def do_long(ask):
if config.take_profit_value!=0 or config.stop_loss_value!=0:
order = MarketOrderRequest(instrument=config.insName,
units=config.lot_size,
takeProfitOnFill=TakeProfitDetails(price=ask+config.take_profit_value).data,
stopLossOnFill=StopLossDetails(price=ask-config.stop_loss_value).data)
else:
order = MarketOrderRequest(instrument=config.insName,
units=config.lot_size)
r = orders.OrderCreate(config.account_id, data=order.data)
resp = oanda.request(r)
print resp
price = resp.get('orderFillTransaction').get('price')
print time, 's: BUY price =', price
return float(price)
def do_short(bid):
if config.take_profit_value!=0 or config.stop_loss_value!=0:
order = MarketOrderRequest(instrument=config.insName,
units=-config.lot_size,
takeProfitOnFill=TakeProfitDetails(price=bid+config.take_profit_value).data,
stopLossOnFill=StopLossDetails(price=bid-config.stop_loss_value).data)
else:
order = MarketOrderRequest(instrument=config.insName,
units=-config.lot_size)
r = orders.OrderCreate(config.account_id, data=order.data)
resp = oanda.request(r)
print resp
price = resp.get('orderFillTransaction').get('price')
print time, 's: SELL price =', price
return float(price)
def do_close_long():
try:
r = positions.PositionClose(config.account_id, 'EUR_USD', {"longUnits": "ALL"})
resp = oanda.request(r)
print resp
pl = resp.get('longOrderFillTransaction').get('pl')
real_profits.append(float(pl))
print time, 's: Closed. Profit = ', pl, ' price = ', resp.get('longOrderFillTransaction').get('price')
except:
print 'No long units to close'
def do_close_short():
try:
r = positions.PositionClose(config.account_id, 'EUR_USD', {"shortUnits": "ALL"})
resp = oanda.request(r)
print resp
pl = resp.get('shortOrderFillTransaction').get('tradesClosed')[0].get('realizedPL')
real_profits.append(float(pl))
print time, 's: Closed. Profit = ', pl, ' price = ', resp.get('shortOrderFillTransaction').get('price')
except:
print 'No short units to close'
def get_bal():
r = AccountDetails(config.account_id)
return oanda.request(r).get('account').get('balance')
plt.ion()
plt.grid(True)
do_close_long()
do_close_short()
real_profits = list()
while True:
try:
oanda.request(pReq)
ask = float(pReq.response.get('prices')[0].get('asks')[0].get('price'))
bid = float(pReq.response.get('prices')[0].get('bids')[0].get('price'))
status = pReq.response.get('prices')[0].get('status')
process_data(ask, bid, status)
plt.clf()
plt.subplot(1,2,1)
plt.plot(times, asks, color='red', label='ASK')
plt.plot(times, bids, color='blue', label='BID')
if last_ask!=0:
plt.axhline(last_ask, linestyle=':', color='red', label='curr ASK')
if last_bid!=0:
plt.axhline(last_bid, linestyle=':', color='blue', label='curr BID')
plt.xlabel('Time, s')
plt.ylabel('Price change')
plt.legend(loc='upper left')
plt.subplot(1, 2, 2)
plt.hist(real_profits, label='Profits')
plt.legend(loc='upper left')
plt.xlabel('Profits')
plt.ylabel('Counts')
plt.tight_layout()
except Exception as e:
print e
plt.pause(config.period)
time = time + config.period
Полный исходный код
здесь.
Я надеюсь, что сэкономил время тем, кому интересен алготрейдинг. Ведь теперь для проверки Ваших идей вам нужно лишь немного поменять код, запустить робота и получить статистику по Вашим сделкам от брокера. И вы можете проанализировать почти любые биржевые данные.
Отдельное спасибо хочу сказать авторам курса от Яндекс по машинному обучению на Coursera. А так же Andrew Ng за замечательные лекции на этом же ресурсе.