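I recently came across this article on a WeChat official account, so I decided to reproduce it and make a few modifications on top of it.
The core idea of the paper is to use the Market Profile indicator (TPO) as the input image and to vary its grayscale so that the CNN pays more attention to the most recent time periods. It is a trick (some would say dark magic) for applying CNNs to time series.
First, import the required packages.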
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
print('numpy :', np.__version__)
print('pandas :', pd.__version__)
print('tensorflow:', tf.__version__)
numpy : 1.20.3
pandas : 1.3.4
tensorflow: 2.8.0
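The data comes from this website, a free source of cryptocurrency data. It is not very clean, though, and needs preprocessing.
After preprocessing it is standard OHLCV. Any data in the same format will run; columns such as date and open are not required.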
df = pd.read_csv('BTCUSDT-1h.csv', index_col=0)
close = np.array(df['close'])
high = np.array(df['high'])
low = np.array(df['low'])
df
BTC hourly data
Generate the images one at a time following the Market Profile (TPO) principle.
def market_profile(high, low, t, size, x_split):
    # Build a size x size TPO matrix from `size` bars, split into x_split blocks.
    # Note: `t` is accepted but not used inside this function.
    cut_list = np.linspace(0, size, x_split + 1).astype(int)
    split_len = int(size / x_split)
    matrix = np.array([[]])
    # Price levels from the window's lowest low to its highest high (row 0 = lowest)
    price_list = np.linspace(np.min(low[cut_list[0]:cut_list[-1]]),
                             np.max(high[cut_list[0]:cut_list[-1]]),
                             size)
    for c in range(1, len(cut_list)):
        h = high[cut_list[c - 1]:cut_list[c]]
        l = low[cut_list[c - 1]:cut_list[c]]
        out_array = np.full((size, split_len), 0)
        time_list = np.linspace(0, split_len, split_len + 1).astype(int)
        for i in range(1, len(time_list)):
            # Price levels touched by bar i of this block
            index = np.where((l[i-1] <= price_list) & (price_list <= h[i-1]))[0]
            for row in index:
                # Write the bar number into the first empty column of the row
                for column in range(split_len):
                    if out_array[row][column] == 0:
                        out_array[row][column] = i
                        break
        # Stack the blocks side by side
        if c == 1:
            matrix = out_array
        else:
            matrix = np.hstack((matrix, out_array))
    return matrix.astype(np.float32)
T = 8
SIZE = 32
SPLIT = 4
tpo = market_profile(high, low, T, SIZE, SPLIT)
plt.matshow(tpo)
plt.show()
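To make the stacking logic easier to follow, here is a minimal sketch on a toy window of three bars, assuming a single block with no horizontal split. The helper name `tpo_block` and the toy prices are made up for illustration; it mirrors the inner loops of `market_profile` and is not a replacement for it.

```python
import numpy as np

def tpo_block(high, low, n_levels):
    """Hypothetical single-block TPO: rows are price levels, columns are stack depth."""
    n_bars = len(high)
    price_list = np.linspace(np.min(low), np.max(high), n_levels)
    out = np.zeros((n_levels, n_bars), dtype=int)
    for i in range(n_bars):
        # Price levels touched by bar i
        rows = np.where((low[i] <= price_list) & (price_list <= high[i]))[0]
        for row in rows:
            # Put the 1-based bar number into the first free column of this row
            free = np.where(out[row] == 0)[0]
            out[row, free[0]] = i + 1
    return out

# Toy bars (illustrative values only)
high = np.array([3.0, 4.0, 2.0])
low = np.array([1.0, 2.0, 1.0])
block = tpo_block(high, low, n_levels=4)
print(block)
```

Each row fills from the left, so the width of a row counts how many bars traded at that price level. Row 0 is the lowest price, which means `plt.matshow` draws the profile upside down compared with a normal price chart.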
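Roll over the series to generate one x for every 32-hour window, and turn each into a gradient grayscale image.
x = []
gray_scale = np.linspace(0.6, 1, SIZE)
for i in range(SIZE, len(close)-T):
    # Build the TPO matrix for the window ending just before bar i
    tpo = market_profile(high[i-SIZE:i], low[i-SIZE:i], T, SIZE, SPLIT)
    # Binarise, then scale each column so later columns are brighter
    tpo[tpo > 0] = 1
    # Use a separate name for the column index so it does not shadow the outer i
    for col in range(len(tpo[0])):
        tpo[:, col] = tpo[:, col] * gray_scale[col]
    x.append(tpo)
x = np.array(x)
np.save('x.npy', x)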
x = np.load('x.npy')
x.shape
(39897, 32, 32)
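The column loop for the grayscale gradient can probably be replaced by NumPy broadcasting. The sketch below checks that on a random 0/1 matrix standing in for a binarised TPO image; the random data is only an illustration and not the real input.

```python
import numpy as np

SIZE = 32
gray_scale = np.linspace(0.6, 1, SIZE)

# Stand-in for a binarised TPO image (random 0/1 values, for illustration only)
rng = np.random.default_rng(0)
tpo = (rng.random((SIZE, SIZE)) > 0.5).astype(np.float32)

# Column-by-column version, as in the loop
looped = tpo.copy()
for col in range(SIZE):
    looped[:, col] = looped[:, col] * gray_scale[col]

# Broadcasting version: gray_scale has shape (SIZE,), so it lines up with the columns
broadcast = tpo * gray_scale

same = np.allclose(looped, broadcast)
print(same)
```

The leftmost column is scaled by 0.6 and the rightmost by 1, so the most recent block is the brightest. Note that the broadcast result is float64, so cast it back with `.astype(np.float32)` if memory matters.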
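The images look like this:
Generate y with three classes: long, short, and neutral. The threshold r is there to make the long and short conditions more clear-cut; without it there would be mislabelled samples. The condition for y can be set however you like. Reversals, double bottoms, or ranging markets should all work in theory.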
r = 0.02
y = []
for i in range(SIZE, len(close)-T):
    if (1-r)*close[i-T] > close[i] > (1+r)*close[i+T]:
        y.append(0)
    elif (1+r)*close[i-T] < close[i] < (1-r)*close[i+T]:
        y.append(2)
    else:
        y.append(1)
y = np.array(y)
np.save('y.npy', y)
y = np.load('y.npy')
y.shape
(39897,)
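Before training it is worth checking how the three classes are distributed. The sketch below wraps the same labelling rule in a function and counts the labels on a synthetic price path (rise, flat, fall); the function name `make_labels` and the synthetic prices are my own illustration, not part of the original notebook.

```python
import numpy as np

def make_labels(close, size, t, r):
    """Sketch of the labelling rule: 0 = short, 1 = neutral, 2 = long."""
    labels = []
    for i in range(size, len(close) - t):
        if (1 - r) * close[i - t] > close[i] > (1 + r) * close[i + t]:
            labels.append(0)   # fell over the last t bars and keeps falling
        elif (1 + r) * close[i - t] < close[i] < (1 - r) * close[i + t]:
            labels.append(2)   # rose over the last t bars and keeps rising
        else:
            labels.append(1)
    return np.array(labels)

# Synthetic price path: steady rise, flat, then steady fall
close = np.concatenate([np.linspace(100, 200, 50),
                        np.full(50, 200.0),
                        np.linspace(200, 100, 50)])
labels = make_labels(close, size=8, t=8, r=0.02)
counts = np.bincount(labels, minlength=3)
print(counts)  # number of short, neutral, long samples
```

On the real data, `np.bincount(y, minlength=3)` gives the same breakdown and shows how dominant the neutral class is.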
Convert x and y into the format the model expects.
x = x.reshape(x.shape[0], SIZE, SIZE, 1)
print(x.shape)
onehot_y = np.zeros((y.size, 3))
onehot_y[np.arange(y.size), y] = 1
print(onehot_y.shape)
(39897, 32, 32, 1)
(39897, 3)
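As a side note, the one-hot step can also be written by indexing an identity matrix. This is a small sketch on made-up labels showing that the two forms agree.

```python
import numpy as np

y = np.array([0, 2, 1, 1, 2])  # made-up labels for illustration

# Index-assignment version, as in the cell above
onehot_a = np.zeros((y.size, 3))
onehot_a[np.arange(y.size), y] = 1

# Equivalent: pick rows of the 3x3 identity matrix
onehot_b = np.eye(3)[y]

print(np.array_equal(onehot_a, onehot_b))
# argmax along the class axis recovers the integer labels
print(onehot_b.argmax(axis=1))
```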
Split x and y into training and test sets at a 6:4 ratio, in chronological order.
split = int(0.6 * len(close))
x_train = x[:split]
y_train = onehot_y[:split]
x_test = x[split:]
y_test = onehot_y[split:]
print('x_train: ', x_train.shape)
print('y_train: ', y_train.shape)
print('x_test: ', x_test.shape)
print('y_test: ', y_test.shape)
x_train:  (23962, 32, 32, 1)
y_train:  (23962, 3)
x_test:  (15935, 32, 32, 1)
y_test:  (15935, 3)
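One detail worth noting: `split` is computed from `len(close)`, while x has SIZE + T fewer rows than close, so the training share is slightly above 60%. The arithmetic below reproduces the shapes printed above; the value of `n_close` is inferred from those shapes rather than read from the file.

```python
SIZE, T = 32, 8
n_samples = 39897                 # rows in x, from x.shape
n_close = n_samples + SIZE + T    # inferred length of close

split_close = int(0.6 * n_close)      # what the notebook uses
split_samples = int(0.6 * n_samples)  # alternative: split on the sample count

print(split_close, n_samples - split_close)
print(split_samples, n_samples - split_samples)
print(round(split_close / n_samples, 4))
```

The difference is only a couple of dozen samples, so it barely matters here, but using `len(x)` keeps the ratio exact if SIZE or T grows.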
This is the LeNet-5 model; I changed the original 10-class output to 3 classes.
batch_size = 128
shape = (SIZE,SIZE,1)
model = keras.Sequential([
    keras.layers.Conv2D(6, 5),
    keras.layers.MaxPooling2D(pool_size=2, strides=2),
    keras.layers.ReLU(),
    keras.layers.Conv2D(16, 5),
    keras.layers.MaxPooling2D(pool_size=2, strides=2),
    keras.layers.ReLU(),
    keras.layers.Conv2D(120, 5),
    keras.layers.Flatten(),
    keras.layers.Dense(84, activation='relu'),
    keras.layers.Dense(3, activation='softmax')
])
model.build(input_shape=(batch_size, shape[0], shape[1], shape[2]))
model.summary()
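To see why a 32x32 input fits this stack, the spatial size can be traced by hand. The sketch below assumes the Keras defaults used above (no padding, stride 1 for the convolutions); the helper `conv_out` is just for illustration.

```python
def conv_out(n, kernel, stride=1):
    """Output length of an unpadded convolution or pooling along one axis."""
    return (n - kernel) // stride + 1

n = 32
trace = [n]
# (kernel, stride) for conv5, pool2, conv5, pool2, conv5
for kernel, stride in [(5, 1), (2, 2), (5, 1), (2, 2), (5, 1)]:
    n = conv_out(n, kernel, stride)
    trace.append(n)
print(trace)  # [32, 28, 14, 10, 5, 1]
```

The last convolution reduces the feature map to 1x1 with 120 channels, so `Flatten` hands a 120-dimensional vector to the dense layers. With a different SIZE the final 5x5 kernel may no longer fit, and the layer sizes would need adjusting.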
Train the model.
model.compile(optimizer=keras.optimizers.Adam(), loss=keras.losses.CategoricalCrossentropy(), metrics=['accuracy'])
history = model.fit(x_train, y_train, epochs=50, validation_data=(x_test, y_test))
# model.save('model.h5')
# model = keras.models.load_model('model.h5')
Predict. The accuracy here is not very informative, because a large share of the labels are 1 (neutral).
loss, acc = model.evaluate(x_test, y_test)
print('Loss :', loss)
print('Accuracy:', acc)
pred = model.predict(x_test)
pred
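Since overall accuracy is dominated by the neutral class, a per-class view is more telling. Here is a minimal NumPy sketch of a confusion matrix and per-class recall; the labels and predictions are made up for illustration, and the helper `confusion_matrix` is my own rather than a library call.

```python
import numpy as np

def confusion_matrix(y_true, y_pred, n_classes=3):
    """Rows are true classes, columns are predicted classes."""
    cm = np.zeros((n_classes, n_classes), dtype=int)
    np.add.at(cm, (y_true, y_pred), 1)  # count each (true, predicted) pair
    return cm

# Made-up labels and predictions, for illustration only
y_true = np.array([1, 1, 1, 1, 1, 1, 0, 0, 2, 2])
y_hat = np.array([1, 1, 1, 1, 1, 1, 1, 0, 1, 2])

cm = confusion_matrix(y_true, y_hat)
accuracy = np.trace(cm) / cm.sum()
recall = np.diag(cm) / cm.sum(axis=1)
print(cm)
print(accuracy, recall)
```

In this toy case accuracy is 0.8 even though only half of the short and long samples are caught. With the real model the inputs would be `y_test.argmax(axis=1)` and `pred.argmax(axis=1)`.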
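The CNN uses softmax to turn its output into a probability distribution that sums to 1, so we can treat these probabilities as the position size for each hour: the long probability minus the short probability. The return is then computed from close. (Note: I simply take the cumulative sum of the T-bar forward returns at every hour. In practice each position has to be held for T = 8 hours, so the reported profit is partly overstated.)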
price = np.array(close[SIZE+split:])
y_pred = np.round(pred[:,2] - pred[:, 0], 2)
return_log = np.cumsum(y_pred * ((price[T:] / price[:-T]) - 1)) + 1
print('Return:', return_log[-1])
Return: 3.55166
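To get a feel for how much overlapping positions can inflate the number, here is a rough sketch comparing a trade opened every bar with one opened every T bars. It uses a synthetic price that grows 1% per bar and a constant full long position; the helper `cumulative_return` and its `step` parameter are assumptions of mine, not the method used above.

```python
import numpy as np

def cumulative_return(price, position, t, step):
    """Sum position * t-bar forward return, opening a new trade every `step` bars."""
    fwd = price[t:] / price[:-t] - 1      # t-bar forward return at each bar
    idx = np.arange(0, len(fwd), step)    # bars at which a trade is opened
    return np.cumsum(position[idx] * fwd[idx]) + 1

# Synthetic example: price grows 1% per bar, always fully long
price = 100 * 1.01 ** np.arange(41)
position = np.ones(len(price) - 8)

overlapping = cumulative_return(price, position, t=8, step=1)[-1]
non_overlapping = cumulative_return(price, position, t=8, step=8)[-1]
buy_and_hold = price[-1] / price[0]
print(overlapping, non_overlapping, buy_and_hold)
```

With `step=1` the same 8-bar move is counted up to eight times, so the total ends up far above buy and hold, while `step=8` stays close to it. Applied to the real predictions, something like `cumulative_return(price, y_pred, T, T)` would give a more conservative figure, still ignoring fees and slippage.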
A cumulative return of about 3.55 looks decent. Finally, let's plot it.
plot_df = pd.DataFrame({'price': price[:-T], 'return': return_log})
fig = plt.figure(figsize=(16,6))
ax1 = fig.add_subplot(111)
plot_df['price'].plot(ax=ax1,grid=True,alpha=0.8,style='b',label='Price')
plt.xlabel('Hour')
ax1.set_ylabel('Price')
plt.plot(np.where(y_pred>0)[0], price[np.where(y_pred>0)[0]], '^', markersize=10, color='g', alpha=0.4, label='Long')
plt.plot(np.where(y_pred<0)[0], price[np.where(y_pred<0)[0]], 'v', markersize=10, color='r', alpha=0.4, label='Short')
plt.legend(loc=2)
ax2 = ax1.twinx()
plot_df['return'].plot(ax=ax2,label='Return',style='y',alpha=0.7)
ax2.set_ylabel('Profit')
plt.legend(loc=1)
plt.title('Predicted returns and changes in Bitcoin price')
plt.savefig('result.png', dpi=400, bbox_inches='tight')
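In most runs the model's predictions are profitable, but it comes down to luck; in other words, the results are not stable enough. Corrections and suggestions from more experienced readers are welcome.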