量化交易学习(十六)掘金小市值选股策略

今天这篇介绍小市值选股策略。

小市值选股策略是指以市值为标准,选取市场中市值较小的股票进行投资的策略。该策略基于小市值股票具有以下优势:

  • 估值相对较低:小市值股票往往被市场低估,具有较大的超额收益潜力。
  • 成长潜力高:小市值公司通常处于发展初期,具有较高的成长空间。
  • 流动性溢价低:小市值股票的流动性溢价较低,投资者可以以更接近真实价值的价格买入。

小市值选股策略的具体实施方法如下:

  1. 确定市值范围:根据市场情况,确定小市值的具体范围。例如,可以将市值在100亿元以下的股票定义为小市值股票。
  2. 筛选股票池:根据选定的市值范围,筛选出符合条件的股票池。
  3. 选股指标:根据投资目标和风险偏好,选取合适的选股指标。常见的选股指标包括市盈率、市净率、股息率、营业收入增长率、净利润增长率等。
  4. 构建投资组合:根据选股指标,选取符合条件的股票,构建投资组合。

小市值选股策略需要注意以下风险:

  • 流动性风险:小市值股票的流动性较差,在买入或卖出时可能面临较大的冲击成本。
  • 市场风险:小市值股票的风险普遍较高,在市场波动较大的情况下,可能会遭受较大的损失。
  • 信息不对称风险:小市值公司的信息披露往往不够充分,投资者可能难以获得足够的信息进行投资决策。

以下是一些小市值选股策略的案例:

  • 市值最小策略:该策略简单有效,直接选取市值最小的股票进行投资。
  • 市盈率动量策略:该策略选取市盈率较低且近期市盈率快速上升的股票进行投资。
  • 成长因子策略:该策略选取营业收入增长率和净利润增长率较高的股票进行投资。

投资者在采用小市值选股策略时,应充分考虑自身的风险承受能力,并做好风险管理。

接下来就用代码介绍怎么实现小市值选股策略。这套代码是之前券商发给我的,我之前跑仿真盘,跑了一段时间,收益还行。因为没弄懂原理一直也不敢拿来实盘,另外在仿真过程中也出现过一些bug。

今天就好好读读它的源码,看看这个策略是怎么实现的。

首先是导入 python 库

1
2
3
4
5
6
# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
from gm.api import *
import datetime
import pandas as pd
import numpy as np

在初始化函数中,定义一些变量,并注册选股、卖出、买入三个定时任务,这三个任务每天运行一次。在每天的9:15执行选股任务,9:30执行卖出任务,9:31执行买入任务。

1
2
3
4
5
6
7
8
9
10
11
def init(context):
# 定义股票池数量
context.num = 20
# 波动率周期
context.volatility = 252
# 双周标记
context.twoweekflg=True
# 定时任务
schedule(schedule_func=algo, date_rule='1d', time_rule='09:15:00')
schedule(schedule_func=sell_algo, date_rule='1d', time_rule='09:30:00')
schedule(schedule_func=buy_algo, date_rule='1d', time_rule='09:31:30')

接下来定义选股函数,选股逻辑是这样的:
在每个双周周初(除一月、 四月和六月), 在基础股票池中选择市值最小的 100 个股票,然后选择波动率最低的 50 个股票。调仓时将处于跌停板的股票以打开跌停当天收盘价格卖出。
在一月、四月、六月将小市值策略空仓,可以显著提高策略表现。 一月是业绩预告的主要时间段,在此期间发生的业绩暴雷、财务异常等事件会对小市值股票造成一定负面影响。 四月份为年报的集中披露期,期间小市值股票有较高的戴帽风险。相比而言,六月空仓没有对应的经济逻辑解释,而只是基于历史统计。在代码中没有对6月进行空仓处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 分割数据的辅助函数
def Arrsplits(item, n):
return [item[i:i+n] for i in range(0, len(item), n)]

def algo(context):
context.trading_status = False# 交易状态,TRUE时启动交易,FALSE关闭
# 月份
month = context.now.month
# 过滤一月和四月(有持仓则平仓)
if month==1 or month==4:
order_close_all()
return None
# 上一个交易日
last_date = get_previous_trading_date(exchange='SHSE', date=context.now)
last_last_date = get_previous_trading_date(exchange='SHSE', date=last_date)
next_date = get_next_trading_date(exchange='SHSE', date=context.now)
if context.now.weekday()<=datetime.datetime.strptime(last_date, '%Y-%m-%d').weekday():# 周一
context.twoweekflg=not context.twoweekflg
if context.twoweekflg:
return None

# 获取A股代码(剔除停牌股、ST股、次新股(365天),科创版)
all_stock,all_stock_str = get_normal_stocks(context.now)
all_stock_array=Arrsplits(all_stock,200)
all_stock=[]
# 加强:ROE>0
for arr in all_stock_array:
fundamental=get_fundamentals(table='prim_finance_indicator', symbols=arr, start_date=context.now, end_date=context.now, fields='ROEWEIGHTED', limit=40000, df=True)
stocks=list(fundamental[fundamental['ROEWEIGHTED']>0]['symbol'])
all_stock.extend(stocks)
# 获取所有股票市值,并按升序排序
all_stock_array=Arrsplits(all_stock,200)
fundamental=None
for arr in all_stock_array:
if fundamental is None:
fundamental = get_fundamentals(table='trading_derivative_indicator', symbols=arr, start_date=context.now, end_date=context.now, fields='TOTMKTCAP', limit=40000, df=True).sort_values('TOTMKTCAP')
else:
fundamental_loc = get_fundamentals(table='trading_derivative_indicator', symbols=arr, start_date=context.now, end_date=context.now, fields='TOTMKTCAP', limit=40000, df=True).sort_values('TOTMKTCAP')
pd.concat([fundamental,fundamental_loc])
# 获取前2N只股票
to_buy = list(fundamental.iloc[:context.num*2,:]['symbol'])
# # 计算波动率
# 开始日期(计算日频收益率)
start_date = get_previous_N_trading_date(context.now,counts=context.volatility+1)
# 获取收盘价
close = history_new(context, security=to_buy,frequency='1d',start_time=start_date,end_time=last_date,fields='eob,symbol,close',skip_suspended=True,fill_missing=None,adjust=ADJUST_PREV,adjust_end_time=last_date, df=True)
# 日频收益率
ret = (close/close.shift(1)-1).iloc[1:,:]
# 波动率
volatility = ret.std().sort_values()
# 目标股票
context.to_buy = list(volatility.iloc[:context.num].index)
print('{}:本次股票池有{}只:{}'.format(context.now,len(context.to_buy),context.to_buy))
context.trading_status = True

平不在标的池的股票:

1
2
3
4
5
6
7
8
9
10
11
12
def sell_algo(context):
if context.trading_status:
positions = context.account().positions()
# 平不在标的池的股票
for position in positions:
symbol = position['symbol']
if symbol not in context.to_buy:
lower_limit = get_history_instruments(symbol, start_date=context.now, end_date=context.now, df=True)
new_price = current(symbol)
if symbol not in context.to_buy and (len(new_price)==0 or len(lower_limit)==0 or lower_limit['lower_limit'][0]!=round(new_price[0]['price'],2)):
# new_price为空时,是开盘后无成交的现象,此处忽略该情况,可能会包含涨跌停的股票
order_target_percent(symbol=symbol, percent=0, order_type=OrderType_Limit, position_side=PositionSide_Long, price=new_price[0]['price'])

买在标的池中的股票:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def buy_algo(context):
if context.trading_status:
# 买在标的池中的股票
for symbol in context.to_buy:
upper_limit = get_history_instruments(symbol, start_date=context.now, end_date=context.now, df=True)
new_price = current(symbol)
if len(new_price)==0 or len(upper_limit)==0 or upper_limit['upper_limit'][0]!=round(new_price[0]['price'],2):
order_target_percent(symbol=symbol, percent=1/context.num, position_side=PositionSide_Long, order_type=OrderType_Market)
# new_price为空时,是开盘后无成交的现象,此处忽略该情况,可能会包含涨跌停的股票
# nav = context.account().cash['nav']
# trade_volume = int(np.floor(nav/context.num/new_price[0]['price']/100)*100)# 将下单金额换算成下单数量
# order_volume(symbol=symbol, volume=trade_volume, side=OrderSide_Buy, order_type=OrderType_Market, position_effect=PositionEffect_Open, price=new_price[0]['price'])
```
打印当前订单状态:
```py
def on_order_status(context, order):
# 标的代码
symbol = order['symbol']
# 委托价格
price = order['price']
# 委托数量
volume = order['volume']
# 目标仓位
target_percent = order['target_percent']
# 查看下单后的委托状态,等于3代表委托全部成交
status = order['status']
# 买卖方向,1为买入,2为卖出
side = order['side']
# 开平仓类型,1为开仓,2为平仓
effect = order['position_effect']
# 委托类型,1为限价委托,2为市价委托
order_type = order['order_type']
if status == 3:
if effect == 1:
if side == 1:
side_effect = '开多仓'
elif side == 2:
side_effect = '开空仓'
else:
if side == 1:
side_effect = '平空仓'
elif side == 2:
side_effect = '平多仓'
order_type_word = '限价' if order_type==1 else '市价'
print('{}:标的:{},操作:以{}{},委托价格:{},委托数量:{}'.format(context.now,symbol,order_type_word,side_effect,price,volume))

辅助函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def get_previous_N_trading_date(date,counts=1,exchange='SHSE'):
"""
获取end_date前N个交易日,end_date为datetime格式,不包括date日期
:param date:目标日期
:param counts:历史回溯天数,默认为1,即前一天
"""
date = pd.Timestamp(date)
previous_N_trading_date = get_trading_dates(exchange=exchange, start_date=date-datetime.timedelta(days=max(counts+30,counts*3)), end_date=date)[-counts-1]
return previous_N_trading_date

def get_normal_stocks(date,new_days=365):
"""
获取目标日期date的A股代码(剔除停牌股、ST股、次新股(365天))
:param date:目标日期
:param new_days:新股上市天数,默认为365天
"""
if isinstance(date,str) and len(date)==10:
date = datetime.datetime.strptime(date,"%Y-%m-%d")
elif isinstance(date,str) and len(date)>10:
date = datetime.datetime.strptime(date,"%Y-%m-%d %H:%M:%S")
# 先剔除退市股、次新股和B股
df_code = get_instrumentinfos(sec_types=SEC_TYPE_STOCK, fields='symbol, listed_date, delisted_date', df=True)
all_stocks = [code for code in df_code[(df_code['listed_date']<=date-datetime.timedelta(days=new_days))&(df_code['delisted_date']>date+datetime.timedelta(days=31))].symbol.to_list() if code[:6]!='SHSE.9' and code[:6]!='SZSE.2']
# 再剔除当前的停牌股和ST股
history_ins = get_history_instruments(symbols=all_stocks, start_date=date, end_date=date, fields='symbol,sec_level, is_suspended', df=True)
all_stocks = list(history_ins[(history_ins['sec_level']==1) & (history_ins['is_suspended']==0)]['symbol'])
all_stocks = [code for code in all_stocks if code[:8]!='SHSE.688']
all_stocks_str = ','.join(all_stocks)
return all_stocks,all_stocks_str

def history_new(context,security,frequency,start_time,end_time,fields,skip_suspended=True,fill_missing=None,adjust=ADJUST_PREV, adjust_end_time='backtest_end_time', df=True):
"""
分区间获取数据(以避免超出数据限制)(start_time和end_date为字符串,fields需包含eob和symbol,单字段)
:param :参数同history()参数一致,adjust_end_time默认为回测结束时间:context.backtest_end_time
"""
if adjust_end_time=='backtest_end_time':
adjust_end_time = context.backtest_end_time
Data = pd.DataFrame()
if frequency=='1d':
trading_date = pd.Series(get_trading_dates(exchange='SZSE', start_date=start_time, end_date=end_time))
else:
trading_date = history('SHSE.000300', frequency=frequency, start_time=start_time, end_time=end_time, fields='eob', skip_suspended=skip_suspended, fill_missing=fill_missing, adjust=adjust, adjust_end_time=adjust_end_time, df=df)
trading_date = trading_date['eob']
space = 5
if len(trading_date)<=space:
Data = history(security, frequency=frequency, start_time=start_time, end_time=end_time, fields=fields, skip_suspended=skip_suspended, fill_missing=fill_missing, adjust=adjust, adjust_end_time=adjust_end_time, df=df)
else:
for n in range(int(np.ceil(len(trading_date)/space))):
start = n*space
end = start+space-1
if end>=len(trading_date):
data = history(security, frequency=frequency, start_time=trading_date.iloc[start], end_time=trading_date.iloc[-1], fields=fields, skip_suspended=skip_suspended, fill_missing=fill_missing, adjust=adjust, adjust_end_time=adjust_end_time, df=df)
else:
data = history(security, frequency=frequency, start_time=trading_date.iloc[start], end_time=trading_date.iloc[end], fields=fields, skip_suspended=skip_suspended, fill_missing=fill_missing, adjust=adjust, adjust_end_time=adjust_end_time, df=df)
if len(data)>=33000:
print('请检查返回数据量,可能超过系统限制,缺少数据!!!!!!!!!!')
Data = pd.concat([Data,data])
Data.drop_duplicates(keep='first',inplace=True)
if len(Data)>0:
Data = Data.set_index(['eob','symbol'])
Data = Data.unstack()
Data.columns = Data.columns.droplevel(level=0)
return Data


def on_backtest_finished(context, indicator):
print('*'*50)
print('回测已完成,请通过右上角“回测历史”功能查询详情。')

掘金主程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if __name__ == '__main__':
'''
strategy_id策略ID,由系统生成
filename文件名,请与本文件名保持一致
mode实时模式:MODE_LIVE回测模式:MODE_BACKTEST
token绑定计算机的ID,可在系统设置-密钥管理中生成
backtest_start_time回测开始时间
backtest_end_time回测结束时间
backtest_adjust股票复权方式不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST
backtest_initial_cash回测初始资金
backtest_commission_ratio回测佣金比例
backtest_slippage_ratio回测滑点比例
'''
run(strategy_id='xxxxx',
filename='main.py',
mode=MODE_BACKTEST,
token='xxxx',
backtest_start_time='2023-01-01 08:00:00',
backtest_end_time='2024-01-1 16:00:00',
backtest_adjust=ADJUST_PREV,
backtest_initial_cash=1000000,
backtest_commission_ratio=0.0001,
backtest_slippage_ratio=0.0001)

江达小记