量化交易学习(三)backtrader使用掘金量化实时数据源

接入实时数据是仿真和实盘的基础,如果不能仿真和实盘,那用backtrader写的策略代码就都只能做研究了。我搜遍了全网都没有找到backtrader如何自定义实时数据源及券商接口的解决方案,backtrader支持的实盘券商不多,在官网上也没找到自定义接口的文档。在研究了backtrader内置的盈透实盘接口的文档和代码后,发现其实实盘与回测差别也就是两点:实时数据源、券商交易接口。

前一篇文章讲了怎么通过掘金量化的history接口获取数据提供给backtrader使用。本文将介绍怎么利用管道通过掘金量化的on_bar、on_tick回调函数获取实时数据。

整体思路是这样的:首先自定义一个 MyQuantData 类用来表示从掘金量化获取的实时数据源:

MyQuantData继承了bt.feed.DataBase类,conn 是用来接收从掘金量化那里传来的行情数据,islive函数用来表示这个数据源是不是实时数据源,这里一定要让它返回True,不然在运行程序的时候会卡住。获取数据的函数是_load函数,通过bar = self.conn.recv()获取管道中传来的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class MyQuantData(bt.feed.DataBase):
conn = None

def islive(self):
return True

def start(self):
super(MyQuantData, self).start()
if self.conn is None:
raise Exception("Invalid conn!")

def stop(self):
super(MyQuantData, self).stop()
if self.conn is not None:
self.conn.close()
self.conn = None

def _load(self):
if self.conn is None:
return False
try:
bar = self.conn.recv()
self.lines.datetime[0] = bt.utils.date2num(bar['eob'])
self.lines.open[0] = bar['open']
self.lines.high[0] = bar['high']
self.lines.low[0] = bar['low']
self.lines.close[0] = bar['close']
self.lines.volume[0] = bar['volume']
self.lines.openinterest[0] = -1
return True
except:
return False

然后再定义一个获取数据源的函数,把管道传给数据源:

1
2
3
4
def get_my_quant_data(conn):
data = MyQuantData()
data.conn = conn
return data

接下来是调用backtrader的函数,这里策略还是用上篇文章里的单均线策略。

1
2
3
4
5
6
7
8
9
def backtrader(conn):
cerebro = bt.Cerebro()
cerebro.addstrategy(SingleMAStrategy)
cerebro.adddata(get_my_quant_data(conn))
cerebro.broker.setcash(100000)
print('初始时资金持仓:%.2f' % cerebro.broker.getvalue())
cerebro.run()
print('结束时资金持仓:%.2f' % cerebro.broker.getvalue())
cerebro.plot()

在掘金量化的初始化函数中,创建一个新的进程,订阅行情:

1
2
3
4
5
6
7
8
def init(context):
parent_conn, child_conn = Pipe()
# 一定要把parent_conn挂到context对象中,之后就可以调用它
context.parent_conn = parent_conn
p = Process(target=backtrader, args=(child_conn,))
p.start()
subscribe(symbols='SHSE.600000', frequency='1d')
pass

on_bar和on_tick函数就是行情订阅的回调函数,这里以on_bar为例:

当行情数据更新时,会触发on_bar回调函数,在on_bar里把行情数据通过管道传给backtrader对应的进程即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def on_bar(context, bars):
origin_bar = bars[0]
bar = {
'symbol': origin_bar['symbol'],
'frequency': origin_bar['frequency'],
'open': origin_bar['open'],
'high': origin_bar['high'],
'low': origin_bar['low'],
'close': origin_bar['close'],
'volume': origin_bar['volume'],
'amount': origin_bar['amount'],
'pre_close': origin_bar['pre_close'],
'position': origin_bar['position'],
'bob': origin_bar['bob'],
'eob': origin_bar['eob']
}
context.parent_conn.send(bar)

回测时当行情数据结束时,要把管道关闭,不然backtrader进程会一直等待新的数据

1
2
def on_backtest_finished(context, indicator):
context.parent_conn.close()

最后是掘金量化的主程序入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if __name__ == '__main__':
'''
strategy_id策略ID, 由系统生成
filename文件名, 请与本文件名保持一致
mode运行模式, 实时模式:MODE_LIVE回测模式:MODE_BACKTEST
token绑定计算机的ID, 可在系统设置-密钥管理中生成
backtest_start_time回测开始时间
backtest_end_time回测结束时间
backtest_adjust股票复权方式, 不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST
backtest_initial_cash回测初始资金
backtest_commission_ratio回测佣金比例
backtest_slippage_ratio回测滑点比例
'''
run(strategy_id='xxxx',
filename='main.py',
mode=MODE_BACKTEST,
token='xxxx',
backtest_start_time='2023-01-01 08:00:00',
backtest_end_time='2024-01-01 16:00:00',
backtest_adjust=ADJUST_PREV,
backtest_initial_cash=10000000,
backtest_commission_ratio=0.0001,
backtest_slippage_ratio=0.0001)

执行程序后backtrader会画出本次回测的结果:
图片

这一篇简单介绍了怎么将掘金量化的实时数据源接入到backtrader中,文章中只演示了回测的结果,这种方式也支持仿真和实盘的实时数据获取。

这一篇就到这里啦。欢迎大家点赞、转发、私信。

江达小记