导航菜单
首页 » 问答 » 正文

基于Market Profile的CNN(Lenet)

最近在公众号上看到这篇文章,故决定复现下,并再次基础上做些修改

其论文的核心思想是将市场轮廓指标(TPO)作为输入,通过改变灰度使CNN更重视最近的时间段。算是CNN应用在时间序列上的技巧(邪术)。

首先导入相关包

import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
print('numpy     :', np.__version__)
print('pandas    :', pd.__version__)
print('tensorflow:', tf.__version__)

:

numpy: 1.20.3

: 1.3.4

: 2.8.0

数据是从这个网站弄来的,免费的数字货币数据来源。但是数据不怎么干净,需要预处理。

预处理过后是标准的OHLCV,只要数据处理成一样的格式都可以运行,date, open, 都不是必须的。

df = pd.read_csv('BTCUSDT-1h.csv', index_col=0)
close = np.array(df['close'])
high = np.array(df['high'])
low = np.array(df['low'])
df

BTC 小时数据

根据 的原理生成一张张图。

def market_profile(high, low, t, size, x_split):
    cut_list = np.linspace(0, size, x_split + 1).astype(int)
    split_len = int(size / x_split)
    matrix = np.array([[]])
    price_list = np.linspace(np.min(low[cut_list[0]:cut_list[-1]]),
                             np.max(high[cut_list[0]:cut_list[-1]]),
                             size)
    for c in range(1, len(cut_list)):
        h = high[cut_list[c - 1]:cut_list[c]]
        l = low[cut_list[c - 1]:cut_list[c]]
        out_array = np.full((size, split_len), 0)
        time_list = np.linspace(0, split_len, split_len + 1).astype(int)
        for i in range(1, len(time_list)):
            index = np.where((l[i-1] <= price_list) & (price_list <= h[i-1]))[0]
            for row in index:
                for column in range(split_len):
                    if out_array[row][column] == 0:
                        out_array[row][column] = i
                        break
        if c == 1:
            matrix = out_array
        else:
            matrix = np.hstack((matrix, out_array))
    return matrix.astype(np.float32)
T = 8
SIZE = 32
SPLIT = 4
tpo = market_profile(high, low, T, SIZE, SPLIT)
plt.matshow(tpo)
plt.show()

滚动对每32个小时生成x,并将其改为渐变灰度图。

x = []
gray_scale = np.linspace(0.6, 1, SIZE)
for i in range(SIZE, len(close)-T):
    # 生成TPO
    tpo = market_profile(high[i-SIZE:i], low[i-SIZE:i], T, SIZE, SPLIT)
    # 改变灰度值
    tpo[tpo > 0] = 1
    for i in range(len(tpo[0])):
        tpo[:, i] = tpo[:, i] * gray_scale[i]
    x.append(tpo)
x = np.array(x)
np.save('x.npy', x)
x = np.load('x.npy')
x.shape

(39897, 32, 32)

像是这样的图:

生成y, 有多,空和中性三个分类, 这里设置一个r是让多空条件明确一些,不然会有误判。y的条件可以随便设置。反转,双底,震荡理论上来说都行。

r = 0.02
y = []
for i in range(SIZE, len(close)-T):
    if (1-r)*close[i-T] > close[i] > (1+r)*close[i+T]:
        y.append(0)
    elif (1+r)*close[i-T] < close[i] < (1-r)*close[i+T]:
        y.append(2)
    else:
        y.append(1)
y = np.array(y)
np.save('y.npy', y)
y = np.load('y.npy')
y.shape

(39897,)

将x和y转换为输入需要的格式

x = x.reshape(x.shape[0], SIZE, SIZE, 1)
print(x.shape)
onehot_y = np.zeros((y.size, 3))
onehot_y[np.arange(y.size), y] = 1
print(onehot_y.shape)

(39897, 32, 32, 1)

(39897, 3)

按6-4的比例分割x,y为训练集和测试集

split = int(0.6 * len(close))
x_train = x[:split]
y_train = onehot_y[:split]
x_test = x[split:]
y_test = onehot_y[split:]
print('x_train: ', x_train.shape)
print('y_train: ', y_train.shape)
print('x_test: ', x_test.shape)
print('y_test: ', y_test.shape)

: (23962, 32, 32, 1)

: (23962, 3)

: (15935, 32, 32, 1)

: (15935, 3)

这是lenet-5的模型,我这里将原来的10分类改为了3分类

batch_size = 128
shape = (SIZE,SIZE,1)
model = keras.Sequential([
    keras.layers.Conv2D(6, 5),
    keras.layers.MaxPooling2D(pool_size=2, strides=2),
    keras.layers.ReLU(),
    keras.layers.Conv2D(16, 5),
    keras.layers.MaxPooling2D(pool_size=2, strides=2),
    keras.layers.ReLU(),
    keras.layers.Conv2D(120, 5),
    keras.layers.Flatten(),
    keras.layers.Dense(84, activation='relu'),
    keras.layers.Dense(3, activation='softmax')
])
model.build(input_shape=(batch_size, shape[0], shape[1], shape[2]))
model.summary()

训练模型

model.compile(optimizer=keras.optimizers.Adam(), loss=keras.losses.CategoricalCrossentropy(), metrics=['accuracy'])
history = model.fit(x_train, y_train, epochs=50, validation_data=(x_test, y_test))
# model.save('model.h5')
# model = keras.models.load_model('model.h5')

预测,这里的准确率没有什么参考价值因为有很多标签为1(中性)

loss, acc = model.evaluate(x_test, y_test)
print('Loss    :', loss)
print('Accuracy:', acc)
pred = model.predict(x_test)
pred

CNN使用了将结果转换为了一个总和为1的概率分布,那么我们可以将这个概率当成每天的持仓大小。并用close计算每天的收益率。(注意: 我这里只是对每小时收益率进行了简单的累积加法,在真实情况下仓位需要持有T=8天,因此实际盈亏会部分夸大)

price = np.array(close[SIZE+split:])
y_pred = np.round(pred[:,2] - pred[:, 0], 2)
return_log = np.cumsum(y_pred * ((price[T:] / price[:-T]) - 1)) + 1
print('Return:', return_log[-1])

: 3.55166

加起来看起来还可以,最后画个图

plot_df = pd.DataFrame({'price': price[:-T], 'return': return_log})
fig = plt.figure(figsize=(16,6))
ax1 = fig.add_subplot(111)
plot_df['price'].plot(ax=ax1,grid=True,alpha=0.8,style='b',label='Price')
plt.xlabel('Hour')
ax1.set_ylabel('Price')
plt.plot(np.where(y_pred>0)[0], price[np.where(y_pred>0)[0]], '^', markersize=10, color='g', alpha=0.4, label='Long')
plt.plot(np.where(y_pred<0)[0], price[np.where(y_pred<0)[0]], 'v', markersize=10, color='r', alpha=0.4, label='Short')
plt.legend(loc=2)
ax2 = ax1.twinx()
plot_df['return'].plot(ax=ax2,label='Return',style='y',alpha=0.7)
ax2.set_ylabel('Profit')
plt.legend(loc=1)
plt.title('Predicted returns and changes in Bitcoin price')
plt.savefig('result.png', dpi=400, bbox_inches='tight')

虽然在大多数运行情况下模型的预测结果是赚钱的,但是看运气,也就是不够稳定。欢迎大佬们来指正或是提点建议。

评论(0)

二维码