接入实时数据是仿真和实盘的基础,如果不能仿真和实盘,那用backtrader写的策略代码就都只能做研究了。我搜遍了全网都没有找到backtrader如何自定义实时数据源及券商接口的解决方案,backtrader支持的实盘券商不多,在官网上也没找到自定义接口的文档。在研究了backtrader内置的盈透实盘接口的文档和代码后,发现其实实盘与回测差别也就是两点:实时数据源、券商交易接口。
前一篇文章讲了怎么通过掘金量化的history接口获取数据提供给backtrader使用。本文将介绍怎么利用管道通过掘金量化的on_bar、on_tick回调函数获取实时数据。
整体思路是这样的:首先自定义一个 MyQuantData 类用来表示从掘金量化获取的实时数据源:
MyQuantData继承了bt.feed.DataBase类,conn 是用来接收从掘金量化那里传来的行情数据,islive函数用来表示这个数据源是不是实时数据源,这里一定要让它返回True,不然在运行程序的时候会卡住。获取数据的函数是_load函数,通过bar = self.conn.recv()获取管道中传来的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| class MyQuantData(bt.feed.DataBase): conn = None
def islive(self): return True
def start(self): super(MyQuantData, self).start() if self.conn is None: raise Exception("Invalid conn!")
def stop(self): super(MyQuantData, self).stop() if self.conn is not None: self.conn.close() self.conn = None
def _load(self): if self.conn is None: return False try: bar = self.conn.recv() self.lines.datetime[0] = bt.utils.date2num(bar['eob']) self.lines.open[0] = bar['open'] self.lines.high[0] = bar['high'] self.lines.low[0] = bar['low'] self.lines.close[0] = bar['close'] self.lines.volume[0] = bar['volume'] self.lines.openinterest[0] = -1 return True except: return False
|
然后再定义一个获取数据源的函数,把管道传给数据源:
1 2 3 4
| def get_my_quant_data(conn): data = MyQuantData() data.conn = conn return data
|
接下来是调用backtrader的函数,这里策略还是用上篇文章里的单均线策略。
1 2 3 4 5 6 7 8 9
| def backtrader(conn): cerebro = bt.Cerebro() cerebro.addstrategy(SingleMAStrategy) cerebro.adddata(get_my_quant_data(conn)) cerebro.broker.setcash(100000) print('初始时资金持仓:%.2f' % cerebro.broker.getvalue()) cerebro.run() print('结束时资金持仓:%.2f' % cerebro.broker.getvalue()) cerebro.plot()
|
在掘金量化的初始化函数中,创建一个新的进程,订阅行情:
1 2 3 4 5 6 7 8
| def init(context): parent_conn, child_conn = Pipe() context.parent_conn = parent_conn p = Process(target=backtrader, args=(child_conn,)) p.start() subscribe(symbols='SHSE.600000', frequency='1d') pass
|
on_bar和on_tick函数就是行情订阅的回调函数,这里以on_bar为例:
当行情数据更新时,会触发on_bar回调函数,在on_bar里把行情数据通过管道传给backtrader对应的进程即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| def on_bar(context, bars): origin_bar = bars[0] bar = { 'symbol': origin_bar['symbol'], 'frequency': origin_bar['frequency'], 'open': origin_bar['open'], 'high': origin_bar['high'], 'low': origin_bar['low'], 'close': origin_bar['close'], 'volume': origin_bar['volume'], 'amount': origin_bar['amount'], 'pre_close': origin_bar['pre_close'], 'position': origin_bar['position'], 'bob': origin_bar['bob'], 'eob': origin_bar['eob'] } context.parent_conn.send(bar)
|
回测时当行情数据结束时,要把管道关闭,不然backtrader进程会一直等待新的数据
1 2
| def on_backtest_finished(context, indicator): context.parent_conn.close()
|
最后是掘金量化的主程序入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| if __name__ == '__main__': ''' strategy_id策略ID, 由系统生成 filename文件名, 请与本文件名保持一致 mode运行模式, 实时模式:MODE_LIVE回测模式:MODE_BACKTEST token绑定计算机的ID, 可在系统设置-密钥管理中生成 backtest_start_time回测开始时间 backtest_end_time回测结束时间 backtest_adjust股票复权方式, 不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST backtest_initial_cash回测初始资金 backtest_commission_ratio回测佣金比例 backtest_slippage_ratio回测滑点比例 ''' run(strategy_id='xxxx', filename='main.py', mode=MODE_BACKTEST, token='xxxx', backtest_start_time='2023-01-01 08:00:00', backtest_end_time='2024-01-01 16:00:00', backtest_adjust=ADJUST_PREV, backtest_initial_cash=10000000, backtest_commission_ratio=0.0001, backtest_slippage_ratio=0.0001)
|
执行程序后backtrader会画出本次回测的结果:

这一篇简单介绍了怎么将掘金量化的实时数据源接入到backtrader中,文章中只演示了回测的结果,这种方式也支持仿真和实盘的实时数据获取。
这一篇就到这里啦。欢迎大家点赞、转发、私信。
