量化交易学习(二十五)backtrader使用mysql数据源

上一篇文章,介绍了怎么通过掘金量化的接口下载股票数据到本地mysql数据库。这一篇文章就来介绍一下怎么使用我们下载好的数据。

首先导入各种库:

1
2
3
4
5
6
7
8
9
10
11
import decimal
from backtrader.feed import DataBase
from backtrader import date2num
from backtrader import TimeFrame

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column,String,BigInteger,Numeric,Double,DateTime

import datetime

定义好访问mysql数据库的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Base = declarative_base()
class Daily(Base):
__tablename__ = 'daily'

id = Column(BigInteger,primary_key=True,autoincrement=True)
symbol = Column(String(20))
frequency = Column(String(10))
open=Column(Numeric(precision=20, scale=5))
high=Column(Numeric(precision=20, scale=5))
low=Column(Numeric(precision=20, scale=5))
close=Column(Numeric(precision=20, scale=5))
pre_close=Column(Numeric(precision=20, scale=5))
volume=Column(BigInteger)
amount=Column(Double)
position=Column(BigInteger)
bob=Column(DateTime)
eob=Column(DateTime)
adj_factor=Column(Double)
name=Column(String(20))

连接数据库:

1
2
3
4
5
6
# 替换为你的数据库连接信息
DATABASE_URI = "mysql+pymysql://root:quant@localhost/quant"
engine = create_engine(DATABASE_URI)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

接下来就要定义数据源了。

在一篇文章中,我们下载了不复权的数据,及累计后复权因子,现在我们要通过公式计算出前、后复权的数据。

后复权价格的计算公式:

1
复权价格计算:T日后复权价格 = T日不复权价格 * T日累计后复权因子 

前复权价格的计算公式:

1
2
T日前复权因子 = T日累计后复权因子/复权基准日累计后复权因子
T日前复权价格 = T日不复权价格 * T日前复权因子

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 支持复权各种,支持
class MySQLData(DataBase):
# 不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST
params = dict(adj_mode='ADJUST_PREV',
adj_base_date=datetime.date.today(),
)
def __init__(self):
super(MySQLData, self).__init__()
self.frequency='1d'
if self.p.timeframe==TimeFrame.Days:
self.frequency='1d'
elif self.p.timeframe==TimeFrame.Minutes:
self.frequency='60s'
self.idx=0
# 数据源在启动时,获取回测时间段的股票数据
def start(self):
self.data=session.query(Daily).filter(Daily.symbol==self.p.dataname,
Daily.frequency==self.frequency,
Daily.bob>=self.p.fromdate,
Daily.eob<=self.p.todate).all()

# 获取前复权基准日的复权因子
self.base_day_stock=session.query(Daily).filter(Daily.symbol==self.p.dataname,
Daily.frequency=='1d',
Daily.eob==self.p.adj_base_date).first()

if self.base_day_stock==None:
self.base_day_stock=session.query(Daily).filter(Daily.symbol==self.p.dataname,
Daily.frequency=='1d',
).order_by(Daily.eob.desc()).first()

def stop(self):
pass

def _load(self):
if self.idx>=len(self.data):
return False

stock=self.data[self.idx]
self.idx=self.idx+1
self.lines.datetime[0] = date2num(stock.eob)
# 这里要计算复权价格
# 复权价格计算: T日后复权价格 = T日不复权价格 * T日累计后复权因子
# T日前复权价格 = T日不复权价格 * T日前复权因子
# T日前复权因子=T日累计后复权因子/复权基准日累计后复权因子
adj_pre_factor=stock.adj_factor/self.base_day_stock.adj_factor

if self.p.adj_mode=='ADJUST_NONE':
pass
elif self.p.adj_mode=='ADJUST_PREV':
stock.open=stock.open*decimal.Decimal(adj_pre_factor)
stock.high=stock.high*decimal.Decimal(adj_pre_factor)
stock.low=stock.low*decimal.Decimal(adj_pre_factor)
stock.close=stock.close*decimal.Decimal(adj_pre_factor)
elif self.p.adj_mode=='ADJUST_POST':
stock.open=stock.open*decimal.Decimal(stock.adj_factor)
stock.high=stock.high*decimal.Decimal(stock.adj_factor)
stock.low=stock.low*decimal.Decimal(stock.adj_factor)
stock.close=stock.close*decimal.Decimal(stock.adj_factor)

self.lines.open[0] = stock.open
self.lines.high[0] = stock.high
self.lines.low[0] = stock.low
self.lines.close[0] = stock.close
self.lines.volume[0] = stock.volume
self.lines.openinterest[0] = 0

# Say success
return True

接下来用一个示例来说明怎么使用这个数据源:
首先把上面数据源的代码放在mysqlDataFeed.py文件中,策略类的代码放在strategy.py文件中,这样在使用时,可以通过import来导入相关的类,以精简main.py的代码量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import backtrader as bt
import datetime

import mysqlDataFeed
import strategy

cerebro = bt.Cerebro()

cerebro.addstrategy(strategy.MACD1Strategy)

data=mysqlDataFeed.MySQLData(
dataname='SHSE.510300',
timeframe=bt.TimeFrame.Days,
fromdate=datetime.datetime(2022,1,1),
todate=datetime.datetime(2024,2,22),
adj_base_date=datetime.datetime(2024,2,22))

cerebro.adddata(data)

策略采用macd金叉买入,死叉卖出策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class MACD1Strategy(bt.Strategy):

# 打印日志的函数
def log(self, txt, dt=None):
dt = dt or self.datas[0].datetime.date(0)
print('%s, %s' % (dt.isoformat(), txt))

# 策略初始化,设置一些参数
def __init__(self):
# 标第的收盘价
self.dataclose = self.data.close
self.order = None
self.buyprice = 0
self.buycomm = None
self.macd=bt.indicators.MACD()
self.cross=bt.indicators.CrossOver(self.macd.macd,self.macd.signal)
self.buyflg=False

# 记录交易收益情况
def notify_trade(self, trade):
if trade.isclosed:
print('毛收益 %0.2f, 扣佣后收益 % 0.2f, 佣金 %.2f, 市值 %.2f, 现金 %.2f' %
(trade.pnl, trade.pnlcomm, trade.commission, self.broker.getvalue(), self.broker.getcash()))

def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
# 订单状态 submitted/accepted,无动作
return

# 订单完成
if order.status in [order.Completed]:
if order.isbuy():
self.log('买单执行,%s, %.2f, %i' % (order.data._name,
order.executed.price, order.executed.size))

elif order.issell():
self.log('卖单执行, %s, %.2f, %i' % (order.data._name,
order.executed.price, order.executed.size))

else:
self.log('订单作废 %s, %s, isbuy=%i, size %i, open price %.2f' %
(order.data._name, order.getstatusname(), order.isbuy(), order.created.size, order.data.open[0]))


# 每条k线都会执行这个函数
def next(self):
# macd 金叉
if self.cross==1:
vol = self.broker.getvalue() / self.dataclose[0] // 100 * 100
self.log('创建买单,%.2f' % self.dataclose[0])
self.log('vol:%.2f' % vol)
self.order = self.buy(size=vol)
self.buyflg=True

elif self.cross==-1:
if self.buyflg:
self.log('创建卖单,%.2f' % self.dataclose[0])
self.order = self.sell(size=self.position.size)
self.buyflg=False

b48d7e824d5927096849d1cec860b24b.png
57adfd911eacb252b32ba57e4d97ef4b.png


这一篇就到这里啦。欢迎大家点赞、转发、私信。还没有关注我的朋友可以关注 江达小记

江达小记