严格聚焦 "4S 仓位管理算法:总资金按评分高低自动分配个股持仓权重"
4S 仓位管理算法:基于评分的个股资金分配
一、实际应用场景描述
在量化选股系统中,选股(Stock Selection) 和 仓位管理(Position Sizing) 是两个独立问题。即使 AI 模型给出了评分,如果不科学地分配资金,依然会:
典型场景
场景 问题
5 只候选股评分接近 平均分配 → 好公司和一般公司拿一样多的钱
评分差距很大(90 分 vs 60 分) 等权 → 浪费资金在弱信号上
资金量不同(10 万 vs 1000 万) 固定手数 → 小资金过度分散
新资金流入 不知道如何在现有持仓基础上增量分配
回测中 仓位管理缺失 → 收益被稀释、波动被放大
核心矛盾:AI 模型给出了"谁好谁差",但没有回答"好多少,该给多少"。
二、引入痛点(问题结构化)
层级 痛点 后果
数据层 评分分布未知,不做归一化 极端值主导资金分配
算法层 等权分配,浪费评分信息 策略区分度丧失
工程层 不考虑个股市值 / 流动性约束 小盘股分到大资金 → 冲击成本爆炸
风控层 单只个股无上限 → 过度集中 一只炸雷 → 全盘崩
教学层 讲义只讲"选股",不讲"分钱" 学生以为选出来就结束了
本质结论:选股是"找好标的",仓位管理是"把好钢用在刀刃上"。两者缺一不可。
三、核心逻辑讲解
3.1 仓位管理的核心问题
给定:
- 总资金 M
- N 只候选股,每只有一个评分 s_i ∈ [0, 1]
- 每只股票的市值 cap_i(可选约束)
求:
每只股票分配多少资金 w_i,使得:
1. Σ w_i = M(全部分配)
2. w_i 与 s_i 正相关(评分越高拿越多)
3. w_i ≤ cap_i(不超过个股市值约束)
4. 某种"公平性"或"集中度"约束
3.2 四种分配算法对比
算法 公式 特点 适用场景
等权(Equal Weight) w_i = M/N 最简单,忽略评分 基准对比
线性加权(Linear) w_i = M × s_i / Σs_j 评分直接映射资金 评分分布均匀时
指数加权(Exponential) w_i = M × e^(α·s_i) / Σe^(α·s_j) 放大头部差异 评分差距大时
凯利改进(Kelly-Modified) w_i ∝ s_i² / σ_i² 高分 + 低波动 → 更多资金 有波动率数据时
指数加权的 α 是核心超参数:
- α = 0 → 退化为等权
- α = 1 → 温和放大
- α = 3 → 极度集中(冠军拿大头)
3.3 工程化增强:三层约束
┌─────────────────────────────────────────────────────┐
│ 仓位分配三大约束 │
├─────────────────────────────────────────────────────┤
│ │
│ 约束 1:单只上限 │
│ ───────────────────────────────────────── │
│ w_i ≤ M × max_single_pct(如 30%) │
│ → 防止"全仓一只"的灾难性风险 │
│ │
│ 约束 2:最小持仓门槛 │
│ ───────────────────────────────────────── │
│ w_i ≥ M × min_single_pct(如 5%) │
│ → 防止"分 0.5% 给某只"的过度分散 │
│ │
│ 约束 3:流动性约束(可选) │
│ ───────────────────────────────────────── │
│ w_i ≤ daily_amount_i × liquidity_pct(如 10%) │
│ → 确保分配的资金"能买得进" │
│ │
└─────────────────────────────────────────────────────┘
3.4 增量分配(新资金流入)
当已有持仓,新资金到来时,不是重新分配全部资金,而是只在现有持仓 + 新候选中做增量调整:
新增资金 ΔM 的分配:
1. 保留现有持仓不变
2. 对 ΔM 执行仓位分配算法
3. 叠加到现有持仓上
4. 若超上限 → 溢出到次优标的
四、项目结构
position_manager/
├── README.md
├── requirements.txt
├── config.yaml
├── data/
│ ├── scores.csv # 个股评分数据
│ └── liquidity.csv # 成交额数据(可选)
├── src/
│ ├── allocator.py # ★ 核心:仓位分配算法
│ ├── constraint_engine.py # ★ 约束引擎(上限/下限/流动性)
│ ├── incremental_allocator.py # 增量分配
│ ├── position_backtester.py # 仓位管理回测
│ └── visualizer.py # 可视化
├── main.py
└── output/
└── position_allocation.csv
五、完整代码(模块化 + 清晰注释)
"requirements.txt"
pandas>=1.5
numpy>=1.21
matplotlib>=3.5
seaborn>=0.12
scipy>=1.9
pyyaml>=6.0
"config.yaml"
# 4S 仓位管理配置
# ★ 分配算法
allocator:
method: "exponential" # equal / linear / exponential / kelly
exponential_alpha: 2.0 # 指数加权参数(0=等权,越大越集中)
# 等权模式不需要额外参数
# ★ 约束
constraints:
max_single_pct: 0.30 # 单只最多 30%
min_single_pct: 0.05 # 单只最少 5%(防止过度分散)
enforce_min: true # 是否执行最小持仓约束
liquidity_cap_pct: 0.10 # 单只持仓不超过日成交额的 10%
# 回测
backtest:
total_capital: 1000000 # 总资金 100 万
n_candidates: 10 # 每次选 10 只
commission_rate: 0.0003
stamp_tax_rate: 0.001
# 输出
output:
path: "output/position_allocation.csv"
"src/allocator.py"(★ 核心模块)
"""
allocator.py
★ 仓位分配算法:按评分分配资金
支持四种算法:
1. equal — 等权分配
2. linear — 评分线性加权
3. exponential — 指数加权(放大头部)
4. kelly — 凯利改进(评分² / 波动率²)
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
class PositionAllocator:
"""
★ 仓位分配器
核心方法:allocate(capital, scores, volatilities=None)
→ 返回每只股票的分配权重
"""
VALID_METHODS = ['equal', 'linear', 'exponential', 'kelly']
def __init__(
self,
method: str = 'exponential',
exponential_alpha: float = 2.0
):
"""
参数:
method: 分配算法(equal / linear / exponential / kelly)
exponential_alpha: 指数加权参数,越大头部越集中
"""
if method not in self.VALID_METHODS:
raise ValueError(f"未知算法: {method},支持: {self.VALID_METHODS}")
self.method = method
self.alpha = exponential_alpha
logger.info(f"仓位分配器初始化: 算法={method}")
if method == 'exponential':
logger.info(f" 指数加权 α={exponential_alpha}")
def allocate(
self,
capital: float,
scores: pd.Series,
volatilities: Optional[pd.Series] = None
) -> pd.Series:
"""
★ 核心方法:将总资金按评分分配到各标的
参数:
capital: 总资金(元)
scores: 个股评分 Series(index=code, value=0~1)
volatilities: 波动率 Series(仅 kelly 模式需要)
返回:
pd.Series: index=code, value=分配金额(元)
"""
if len(scores) == 0:
return pd.Series(dtype=float)
scores = scores.dropna()
if len(scores) == 0:
return pd.Series(dtype=float)
# 归一化评分到 [0, 1]
s_min, s_max = scores.min(), scores.max()
if s_max - s_min < 1e-10:
# 所有评分相同 → 等权
return self._equal(capital, scores)
norm_scores = (scores - s_min) / (s_max - s_min)
if self.method == 'equal':
weights = self._equal(capital, scores)
elif self.method == 'linear':
weights = self._linear(capital, norm_scores)
elif self.method == 'exponential':
weights = self._exponential(capital, norm_scores)
elif self.method == 'kelly':
weights = self._kelly(capital, norm_scores, volatilities)
else:
raise ValueError(f"未实现算法: {self.method}")
# 记录分配详情
self._log_allocation(scores, weights)
return weights
def _equal(self, capital: float, scores: pd.Series) -> pd.Series:
"""等权分配:w_i = M / N"""
n = len(scores)
return pd.Series(capital / n, index=scores.index)
def _linear(self, capital: float, norm_scores: pd.Series) -> pd.Series:
"""线性加权:w_i = M × s_i / Σs_j"""
total = norm_scores.sum()
if total < 1e-10:
return self._equal(capital, norm_scores)
return capital * norm_scores / total
def _exponential(self, capital: float, norm_scores: pd.Series) -> pd.Series:
"""
★ 指数加权:w_i = M × e^(α·s_i) / Σe^(α·s_j)
关键性质:
- α → 0 时,退化为等权
- α 越大,"第一名"分到的资金越多
"""
exp_scores = np.exp(self.alpha * norm_scores)
total = exp_scores.sum()
if total < 1e-10:
return self._equal(capital, norm_scores)
return capital * exp_scores / total
def _kelly(
self,
capital: float,
norm_scores: pd.Series,
volatilities: Optional[pd.Series]
) -> pd.Series:
"""
★ 凯利改进:w_i ∝ s_i² / σ_i²
逻辑:
- 评分越高 → 越值得重仓
- 波动率越高 → 越应该少配
- 两者平衡 → 夏普比率最大化方向
"""
if volatilities is None:
logger.warning("凯利模式未提供波动率,退化为线性加权")
return self._linear(capital, norm_scores)
# 对齐 index
vol = volatilities.reindex(norm_scores.index).fillna(0.2) # 缺失 → 默认 20% 波动
vol = vol.replace(0, 0.01) # 防止除零
# w_i ∝ score² / vol²
score_sq = norm_scores ** 2
vol_sq = vol ** 2
raw = score_sq / vol_sq
total = raw.sum()
if total < 1e-10:
return self._equal(capital, norm_scores)
return capital * raw / total
def _log_allocation(self, scores: pd.Series, weights: pd.Series):
"""打印分配详情"""
logger.debug(f"\n 仓位分配明细({self.method}):")
logger.debug(f" {'代码':<8} {'评分':<8} {'分配金额':<15} {'占比':<8}")
logger.debug(f" {'─'*50}")
sorted_idx = weights.sort_values(ascending=False).index
for code in sorted_idx:
pct = weights[code] / weights.sum() * 100
logger.debug(
f" {code:<8} {scores.get(code, 0):.3f} "
f"¥{weights[code]:>12,.0f} {pct:>5.1f}%"
)
# 集中度指标:赫芬达尔指数 HHI
shares = weights / weights.sum()
hhi = (shares ** 2).sum()
logger.debug(f" 集中度 HHI: {hhi:.4f}(1/N={1/len(scores):.4f})")
def allocate_batch(
self,
capital: float,
scores_df: pd.DataFrame,
score_col: str = 'score'
) -> pd.DataFrame:
"""
批量分配(多日期)
参数:
capital: 总资金
scores_df: DataFrame,含 date, code, score 列
score_col: 评分列名
返回:
DataFrame: 新增 allocation 列
"""
results = []
for date, group in scores_df.groupby('date'):
scores = pd.Series(
group[score_col].values,
index=group['code'].values
)
alloc = self.allocate(capital, scores)
for code, amount in alloc.items():
results.append({
'date': date,
'code': code,
'score': scores[code],
'allocation': amount,
'allocation_pct': amount / capital * 100
})
return pd.DataFrame(results)
"src/constraint_engine.py"(★ 约束引擎)
"""
constraint_engine.py
★ 仓位约束引擎:上限 / 下限 / 流动性约束
"""
import pandas as pd
import numpy as np
from typing import Dict, Optional
import logging
logger = logging.getLogger(__name__)
class ConstraintEngine:
"""
★ 仓位约束引擎
三层约束:
1. 单只上限(max_single_pct)
2. 单只下限(min_single_pct)
3. 流动性上限(liquidity_cap_pct × 日均成交额)
"""
def __init__(
self,
max_single_pct: float = 0.30,
min_single_pct: float = 0.05,
enforce_min: bool = True,
liquidity_cap_pct: Optional[float] = 0.10
):
"""
参数:
max_single_pct: 单只最多占总资金的百分比
min_single_pct: 单只最少占总资金的百分比
enforce_min: 是否执行最小持仓约束
liquidity_cap_pct: 持仓不超过日成交额的百分比(None = 不限制)
"""
self.max_pct = max_single_pct
self.min_pct = min_single_pct
self.enforce_min = enforce_min
self.liq_pct = liquidity_cap_pct
logger.info(f"约束引擎初始化:")
logger.info(f" 单只上限: {max_single_pct*100:.0f}%")
logger.info(f" 单只下限: {min_single_pct*100:.0f}%({'开' if enforce_min else '关'})")
logger.info(f" 流动性约束: {f'{liquidity_cap_pct*100:.0f}%' if liquidity_cap_pct else '关'}")
def apply(
self,
capital: float,
raw_allocations: pd.Series,
daily_amounts: Optional[pd.Series] = None
) -> pd.Series:
"""
★ 核心方法:对原始分配施加约束
参数:
capital: 总资金
raw_allocations: 原始分配金额
daily_amounts: 日均成交额(可选,用于流动性约束)
返回:
约束后的分配金额
"""
alloc = raw_allocations.copy()
if len(alloc) == 0:
return alloc
# === 约束 1:单只上限 ===
max_amount = capital * self.max_pct
capped = (alloc > max_amount).sum()
if capped > 0:
logger.debug(f" 约束1: {capped} 只超过上限 {self.max_pct*100:.0f}%,已截断")
alloc = alloc.clip(upper=max_amount)
# === 约束 2:最小持仓(可选)===
if self.enforce_min:
min_amount = capital * self.min_pct
# 小于下限的 → 提升到下限(从超额的里面匀)
under_min = alloc < min_amount
if under_min.sum() > 0:
# 简单处理:设为零(不分配),多余资金再分配
alloc[under_min] = 0
logger.debug(f" 约束2: {under_min.sum()} 只低于下限,已归零")
# === 约束 3:流动性上限 ===
if self.liq_pct is not None and daily_amounts is not None:
liq_cap = daily_amounts * self.liq_pct
liq_violations = (alloc > liq_cap).sum()
if liq_violations > 0:
logger.debug(f" 约束3: {liq_violations} 只超过流动性上限,已截断")
alloc = alloc.clip(upper=liq_cap.reindex(alloc.index).fillna(float('inf')))
# === 重新归一化,确保总和 = capital ===
total = alloc.sum()
if total > 0 and abs(total - capital) / capital > 0.01:
alloc = alloc / total * capital
logger.debug(f" 重新归一化: {total:,.0f} → {capital:,.0f}")
elif total == 0:
# 全被过滤了 → 等权兜底
logger.warning(" 所有标的被约束过滤,退化为等权")
alloc = pd.Series(capital / len(raw_allocations), index=raw_allocations.index)
return alloc
def apply_batch(
self,
capital: float,
allocations_df: pd.DataFrame,
amounts_df: Optional[pd.DataFrame] = None
) -> pd.DataFrame:
"""批量施加约束"""
results = []
for date, group in allocations_df.groupby('date'):
raw = pd.Series(
group['allocation'].values,
index=group['code'].values
)
daily_amts = None
if amounts_df is not None:
day_data = amounts_df[amounts_df['date'] == date]
if len(day_data) > 0:
daily_amts = pd.Series(
day_data['amount'].values,
index=day_data['code'].values
)
constrained = self.apply(capital, raw, daily_amts)
for code, amount in constrained.items():
results.append({
'date': date,
'code': code,
'raw_allocation': raw.get(code, 0),
'final_allocation': amount,
'final_pct': amount / capital * 100,
'cap_pct': min(amount / capital * 100, self.max_pct * 100)
})
return pd.DataFrame(results)
"src/incremental_allocator.py"
"""
incremental_allocator.py
增量分配:新资金到来时,在现有持仓基础上增量调整
"""
import pandas as pd
import numpy as np
from src.allocator import PositionAllocator
from src.constraint_engine import ConstraintEngine
from typing import Dict
import logging
logger = logging.getLogger(__name__)
class IncrementalAllocator:
"""
★ 增量仓位分配器
场景:已有持仓 P1, P2, ... Pn,新资金 ΔM 到来
策略:
1. 不对原有持仓做任何变动
2. 只对 ΔM 执行分配算法
3. 叠加到现有持仓上
4. 若叠加后超上限 → 溢出重分配
"""
def __init__(
self,
allocator: PositionAllocator,
constraint_engine: ConstraintEngine
):
self.allocator = allocator
self.constraint = constraint_engine
logger.info("增量分配器初始化")
def allocate_incremental(
self,
existing_positions: Dict[str, float],
new_capital: float,
scores: pd.Series,
daily_amounts: pd.Series = None
) -> Dict[str, float]:
"""
★ 核心方法:增量分配
参数:
existing_positions: {code: 当前持仓金额}
new_capital: 新增资金(元)
scores: 候选股评分
daily_amounts: 日均成交额(可选)
返回:
{code: 新增分配金额}
"""
if new_capital <= 0:
return {}
# 只对新增资金做分配
new_alloc = self.allocator.allocate(new_capital, scores, None)
# 叠加到现有持仓
total_positions = {}
for code in set(list(existing_positions.keys()) + list(new_alloc.index)):
existing = existing_positions.get(code, 0)
new = new_alloc.get(code, 0)
total_positions[code] = existing + new
# 检查约束
total_capital = sum(existing_positions.values()) + new_capital
total_series = pd.Series(total_positions)
constrained = self.constraint.apply(
total_capital, total_series, daily_amounts
)
# 计算最终增量(可能被约束削减)
final_incremental = {}
for code in new_alloc.index:
existing = existing_positions.get(code, 0)
final_total = constrained.get(code, 0)
incremental = max(0, final_total - existing)
if incremental > 0:
final_incremental[code] = incremental
logger.info(
f"增量分配: ¥{new_capital:,.0f} → "
f"{len(final_incremental)} 只标的,"
f"实际分配 ¥{sum(final_incremental.values()):,.0f}"
)
return final_incremental
"src/position_backtester.py"
"""
position_backtester.py
仓位管理回测:对比不同分配算法的资金曲线
"""
import pandas as pd
import numpy as np
from src.allocator import PositionAllocator
from src.constraint_engine import ConstraintEngine
from typing import Dict, List
def run_position_backtest(
scores_df: pd.DataFrame,
price_df: pd.DataFrame,
total_capital: float = 1_000_000,
methods: List[str] = None,
commission_rate: float = 0.0003,
stamp_tax_rate: float = 0.001
) -> Dict:
"""
对比不同仓位分配算法的回测
参数:
scores_df: 含 date, code, score 列
price_df: 含 date, code, close 列(用于模拟持仓变化)
total_capital: 总资金
methods: 要对比的算法列表
返回:
{method: {'nav': 净值 Series, 'allocations': 分配明细}}
"""
if methods is None:
methods = ['equal', 'linear', 'exponential']
results = {}
for method in methods:
alloc = PositionAllocator(method=method)
constraint = ConstraintEngine()
nav = [total_capital]
dates = []
all_allocs = []
for date, group in scores_df.groupby('date'):
scores = pd.Series(
group['score'].values,
index=group['code'].values
)
# 分配
weights = alloc.allocate(total_capital, scores)
weights = constraint.apply(total_capital, weights)
# 模拟:按收盘价计算持仓价值
day_prices = price_df[price_df['date'] == date]
portfolio_val = 0
for code, amount in weights.items():
px = day_prices[day_prices['code'] == code]['close']
if len(px) > 0 and px.values[0] > 0:
shares = int(amount / px.values[0] / 100) * 100
portfolio_val += shares * px.values[0]
nav.append(portfolio_val)
dates.append(date)
for code, amount in weights.items():
all_allocs.append({
'date': date,
'code': code,
'allocation': amount,
'allocation_pct': amount / total_capital * 100,
'method': method
})
results[method] = {
'nav': pd.Series(nav, index=[scores_df['date'].iloc[0]] + dates),
'allocations': pd.DataFrame(all_allocs)
}
return results
def calc_position_metrics(nav_dict: Dict) -> pd.DataFrame:
"""计算各算法的评价指标"""
records = []
for method, data in nav_dict.items():
nav = data['nav'].dropna()
if len(nav) < 2:
continue
ret = nav.pct_change().dropna()
days = (nav.index[-1] - nav.index[0]).days
yrs = days / 365.25
tot = nav.iloc[-1] / nav.iloc[0] - 1
ann = (1 + tot) ** (1 / yrs) - 1 if yrs > 0 else 0
vol = ret.std() * 252 ** 0.5
sharpe = (ann - 0.025) / vol if vol > 0 else 0
dd = (nav - nav.cummax()) / nav.cummax()
records.append({
'method': method,
'total_ret_pct': round(tot * 100, 2),
本文代码仅供学习与技术交流,不构成任何投资建议,股市有风险,入市需谨慎!
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!