Python实战:用scipy.optimize.linprog构建超效率SBM模型评估环境效率
当我们需要评估不同地区或企业的环境效率时,传统的数据包络分析(DEA)方法往往难以处理非期望产出(如污染物排放)和效率值超过1的情况。这正是超效率SBM模型的用武之地——它不仅能处理期望产出和非期望产出,还能对高效决策单元进行更精确的排序。本文将带你用Python的scipy.optimize.linprog函数,从零开始实现这一复杂模型。
1. 理解超效率SBM模型的核心概念
超效率SBM模型是数据包络分析(DEA)框架下的一个重要扩展,它解决了传统DEA模型的两个关键局限:
- 非期望产出处理:在环境效率评估中,我们通常有期望产出(如GDP)和非期望产出(如CO2排放)。传统径向DEA模型无法正确处理这种混合产出。
- 效率值上限:传统DEA模型的效率值被限制在0-1之间,无法区分多个效率值为1的决策单元。
模型的核心思想是通过引入松弛变量(slack variables)来捕捉投入过剩和产出不足的情况,同时允许效率值超过1。其数学形式可以表示为:
min ρ = (1 - (1/m)Σ(s_i^- / x_io)) / (1 + (1/(s1+s2))Σ(s_r^+ / y_ro) + Σ(s_t^b / b_to))其中:
- m是投入指标数量
- s1和s2分别是期望和非期望产出数量
- s_i^-是投入松弛变量
- s_r^+和s_t^b是产出松弛变量
提示:在实际编程实现时,我们需要将这个非线性分式规划问题转化为线性规划问题,这正是scipy.optimize.linprog的用武之地。
2. 数据准备与预处理
在开始建模前,我们需要准备三类数据:
- 投入指标:如劳动力、资本、能源消耗等
- 期望产出:如GDP、工业产值等
- 非期望产出:如CO2排放、废水排放等
import numpy as np # 示例数据:3个决策单元,2个投入,1个期望产出,1个非期望产出 inputs = np.array([ [5, 10], # DMU1的投入 [8, 12], # DMU2的投入 [6, 15] # DMU3的投入 ]) good_outputs = np.array([ [20], # DMU1的期望产出 [25], # DMU2的期望产出 [18] # DMU3的期望产出 ]) bad_outputs = np.array([ [8], # DMU1的非期望产出 [10], # DMU2的非期望产出 [12] # DMU3的非期望产出 ])数据标准化是重要的一步,特别是当不同指标的数值尺度差异很大时:
def normalize_data(data): means = np.mean(data, axis=0) stds = np.std(data, axis=0) return (data - means) / stds norm_inputs = normalize_data(inputs) norm_good = normalize_data(good_outputs) norm_bad = normalize_data(bad_outputs)3. 构建超效率SBM模型的线性规划问题
超效率SBM模型的核心是将原始分式规划问题转化为线性规划问题。我们需要为每个决策单元构建以下要素:
- 目标函数系数向量c
- 不等式约束矩阵A_ub和向量b_ub
- 等式约束矩阵A_eq和向量b_eq
from scipy.optimize import linprog def super_sbm(inputs, good_outputs, bad_outputs, dmu_index): num_dmu = inputs.shape[0] num_input = inputs.shape[1] num_good = good_outputs.shape[1] num_bad = bad_outputs.shape[1] # 目标函数系数 c = np.zeros(1 + num_dmu + num_input + num_good + num_bad) c[0] = 1 # 对应目标函数中的ρ # 构建约束矩阵 ## 投入约束 input_constraints = np.hstack([ -inputs[dmu_index].reshape(-1, 1), # -x_0 inputs.T, # X -np.eye(num_input), # s_i^- np.zeros((num_input, num_good + num_bad)) # 产出松弛变量 ]) ## 期望产出约束 good_output_constraints = np.hstack([ good_outputs[dmu_index].reshape(-1, 1), # y_0 -good_outputs.T, # -Y np.zeros((num_good, num_input)), # 投入松弛变量 np.eye(num_good), # s_r^+ np.zeros((num_good, num_bad)) # 非期望产出松弛变量 ]) ## 非期望产出约束 bad_output_constraints = np.hstack([ -bad_outputs[dmu_index].reshape(-1, 1), # -b_0 bad_outputs.T, # B np.zeros((num_bad, num_input + num_good)), # 其他松弛变量 np.eye(num_bad) # s_t^b ]) ## 组合所有不等式约束 A_ub = np.vstack([input_constraints, good_output_constraints, bad_output_constraints]) b_ub = np.zeros(A_ub.shape[0]) ## 等式约束:λ之和=1 (VRS假设) A_eq = np.zeros(1 + num_dmu + num_input + num_good + num_bad) A_eq[1:1+num_dmu] = 1 A_eq = A_eq.reshape(1, -1) b_eq = np.array([1]) ## 变量边界 bounds = [(0, None)] * (1 + num_dmu) + [(0, None)] * (num_input + num_good + num_bad) # 求解线性规划 res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds) return res.x[0] # 返回效率值ρ4. 批量计算与结果分析
有了单个决策单元的计算函数后,我们可以批量计算所有决策单元的效率值:
def calculate_all_super_sbm(inputs, good_outputs, bad_outputs): num_dmu = inputs.shape[0] efficiencies = [] for i in range(num_dmu): eff = super_sbm(inputs, good_outputs, bad_outputs, i) efficiencies.append(eff) return np.array(efficiencies) # 计算所有DMU的效率值 efficiencies = calculate_all_super_sbm(norm_inputs, norm_good, norm_bad) # 打印结果 for i, eff in enumerate(efficiencies): print(f"DMU{i+1}的超效率SBM效率值: {eff:.4f}")结果解读要点:
- 效率值>1表示该决策单元比生产前沿面更高效
- 效率值=1表示位于生产前沿面上
- 效率值<1表示存在效率改进空间
我们可以进一步将结果可视化:
import matplotlib.pyplot as plt plt.figure(figsize=(10, 6)) plt.bar(range(1, len(efficiencies)+1), efficiencies, color='skyblue') plt.axhline(y=1, color='r', linestyle='--') plt.xlabel('决策单元') plt.ylabel('效率值') plt.title('超效率SBM模型评估结果') plt.grid(axis='y', alpha=0.3) plt.show()5. 模型优化与实用技巧
在实际应用中,我们可能会遇到各种问题。以下是几个常见问题的解决方案:
问题1:大规模数据计算速度慢
解决方案:
- 使用稀疏矩阵存储约束条件
- 并行计算各决策单元的效率值
from multiprocessing import Pool def parallel_super_sbm(args): inputs, good_outputs, bad_outputs, i = args return super_sbm(inputs, good_outputs, bad_outputs, i) def calculate_parallel(inputs, good_outputs, bad_outputs): num_dmu = inputs.shape[0] args = [(inputs, good_outputs, bad_outputs, i) for i in range(num_dmu)] with Pool() as p: efficiencies = p.map(parallel_super_sbm, args) return np.array(efficiencies)问题2:模型无可行解
可能原因和解决方法:
- 数据存在异常值 → 检查并清洗数据
- 变量尺度差异过大 → 标准化数据
- 约束条件矛盾 → 检查模型设定
问题3:解释效率驱动因素
我们可以通过计算松弛变量来分析效率低下的原因:
def analyze_slacks(res, num_input, num_good, num_bad): slacks = res.x[1+num_input:1+num_input+num_good+num_bad] input_slacks = slacks[:num_input] good_slacks = slacks[num_input:num_input+num_good] bad_slacks = slacks[num_input+num_good:] print("投入过剩情况:", input_slacks) print("期望产出不足情况:", good_slacks) print("非期望产出过剩情况:", bad_slacks)6. 实际应用案例:中国各省工业碳排放效率评估
让我们用一个更实际的案例来演示模型的应用。假设我们有中国30个省份的工业部门数据:
- 投入指标:工业从业人员(万人)、工业资本存量(亿元)、能源消费量(万吨标煤)
- 期望产出:工业增加值(亿元)
- 非期望产出:工业CO2排放量(万吨)
# 模拟数据 - 实际应用中应从CSV或数据库读取 province_names = [f"省份{i}" for i in range(1, 31)] industrial_labor = np.random.uniform(50, 500, 30) industrial_capital = np.random.uniform(1000, 10000, 30) energy_consumption = np.random.uniform(1000, 8000, 30) industrial_output = np.random.uniform(500, 5000, 30) co2_emission = industrial_output * np.random.uniform(0.5, 2.0, 30) # 构建输入输出矩阵 inputs = np.column_stack([industrial_labor, industrial_capital, energy_consumption]) good_outputs = industrial_output.reshape(-1, 1) bad_outputs = co2_emission.reshape(-1, 1) # 计算效率 province_efficiencies = calculate_all_super_sbm( normalize_data(inputs), normalize_data(good_outputs), normalize_data(bad_outputs) ) # 按效率值排序 ranking = np.argsort(-province_efficiencies) print("省份效率排名:") for i, idx in enumerate(ranking): print(f"{i+1}. {province_names[idx]}: {province_efficiencies[idx]:.3f}")结果分析表:
| 排名 | 省份 | 效率值 | 主要改进方向 |
|---|---|---|---|
| 1 | 省份X | 1.215 | - |
| 2 | 省份Y | 1.103 | 能源利用效率 |
| 3 | 省份Z | 0.987 | 资本产出效率 |
| ... | ... | ... | ... |
| 28 | 省份A | 0.652 | 能源和劳动力效率 |
| 29 | 省份B | 0.589 | 全面改进 |
| 30 | 省份C | 0.521 | 碳排放强度过高 |
这个排名可以帮助政策制定者识别哪些省份的工业部门效率较高,哪些需要改进,以及改进的具体方向(如减少能源消耗或降低碳排放)。