1.PandasAI简介
- 定义:结合Pandas和AI的开源Python库
- 核心功能:使用自然语言进行数据查询和分析
- 支持数据源:CSV、XLSX、PostgreSQL、MySQL、BigQuery、Databricks、Snowflake等
2.主要特点
- 自然语言查询:用日常语言提问数据问题
- 数据可视化:自动生成图表和图形
- 数据清理:处理缺失值问题
- 特征生成:提升数据质量
- 多数据源连接:支持多种数据库和文件格式
3.技术架构
工作原理: 自然语言查询 → AI模型理解 → 转换为Python/SQL代码 → 与数据交互 → 返回结果
4.环境安装步骤
步骤1:源码包下载与依赖管理
- 使用Python 3.10.x版本
- 从GitLab仓库下载源码(对应01分支)
- 明确依赖版本以避免兼容性问题
步骤2:安装Python依赖
- 安装PandasAI及相关依赖包
- 需要修复官方代码中的bug(文章中提到的)
- 配置生成式AI模型(如OpenAI GPT)
步骤3:运行Demo
- 使用官方示例代码进行测试
- 验证自然语言查询功能
- 测试数据可视化和分析能力
5.应用价值
- 提升效率:减少编写复杂查询和分析代码的时间
- 降低门槛:非技术人员也能进行数据分析
- 全面功能:涵盖数据探索、清洗、可视化、特征工程全流程
6.技术资源
- 官方文档:https://docs.pandas-ai.com
- GitHub仓库:https://github.com/Sinaptik-AI/pandas-ai
- 源码仓库:GitLab(作者提供的实例代码)
实际应用场景
- 业务分析师:快速生成报表和洞察
- 数据科学家:加速数据探索和预处理
- 开发人员:简化数据查询和分析流程
- 产品经理:直接与数据对话获取指标
注意事项
- 需要适当修复官方代码中的bug
- 注意依赖版本兼容性
- 需要配置有效的AI模型API(如OpenAI)
这篇文章为初学者提供了完整的PandasAI入门指南,从环境搭建到实际应用,展示了如何利用AI技术简化传统的数据分析工作流程。
PandasAI实战:环境搭建与基本使用
完整步骤详解
步骤1:环境准备与安装
1.1 创建虚拟环境(推荐)
# 使用conda
conda create -n pandasai_env python=3.10
conda activate pandasai_env

# 或使用venv
python -m venv pandasai_env
# Windows
pandasai_env\Scripts\activate
# Linux/Mac
source pandasai_env/bin/activate

1.2 安装核心依赖
# 基础包
pip install pandas numpy matplotlib seaborn
# PandasAI
pip install pandas-ai
# 如果需要使用OpenAI等大模型
pip install openai
# 或使用本地模型
pip install langchain

1.3 验证安装
import pandas as pd
import pandasai as pai

print(f"Pandas版本:{pd.__version__}")
print(f"PandasAI版本:{pai.__version__}")

步骤2:准备测试DataFrame
import pandas as pd
from datetime import datetime

# 创建示例DataFrame
def create_sample_dataframe():
    data = {
        'date': pd.date_range(start='2024-01-01', periods=30, freq='D'),
        'city': ['北京']*10 + ['上海']*10 + ['广州']*10,
        'temperature': [2,3,1,4,2,3,5,6,4,3] + [8,9,10,8,7,9,11,10,8,9] + [18,19,20,21,22,20,19,21,22,23],
        'humidity': [45,47,50,48,46,49,51,52,50,48] + [65,66,68,67,65,69,70,68,67,66] + [75,76,78,77,79,76,75,78,77,76],
        'sales': [1000,1200,800,1500,900,1300,1400,1600,1100,1250] + [2000,2200,1800,2500,1900,2300,2400,2600,2100,2250] + [3000,3200,2800,3500,2900,3300,3400,3600,3100,3250],
        'category': ['A','B','A','C','B','A','C','B','A','C']*3
    }
    df = pd.DataFrame(data)
    # 添加一些缺失值
    df.loc[5, 'sales'] = None
    df.loc[15, 'humidity'] = None
    df.loc[25, 'temperature'] = None
    return df

# 创建并查看数据
df = create_sample_dataframe()
print("数据形状:", df.shape)
print("\n前5行数据:")
print(df.head())
print("\n数据基本信息:")
print(df.info())
print("\n描述性统计:")
print(df.describe())

步骤3:配置PandasAI并创建MockLLM
frompandasaiimportSmartDataframefrompandasai.llmimportOpenAIfrompandasai.llm.local_llmimportLocalLLMimportwarnings warnings.filterwarnings('ignore')# 方案1:使用MockLLM(用于测试)classMockLLM:"""模拟LLM类,返回预设的代码"""def__init__(self):self.history=[]defcall(self,instruction:str,value:str,suffix:str=""):"""模拟LLM调用"""# 记录历史self.history.append({'instruction':instruction,'value':value,'suffix':suffix})# 根据问题返回预设的pandas代码if"前5行"ininstructionor"前5条"ininstruction:return"df.head(5)"elif"统计信息"ininstructionor"describe"ininstruction:return"df.describe()"elif"平均值"ininstructionor"平均温度"ininstruction:if"温度"ininstruction:return"df['temperature'].mean()"elif"湿度"ininstruction:return"df['humidity'].mean()"elif"销售额"ininstruction:return"df['sales'].mean()"elif"各城市"ininstructionand"平均温度"ininstruction:return"df.groupby('city')['temperature'].mean()"elif"缺失值"ininstruction:return"df.isnull().sum()"elif"北京"ininstructionand"销售额"ininstruction:return"df[df['city'] == '北京']['sales'].sum()"elif"折线图"ininstructionor"趋势"ininstruction:return""" import matplotlib.pyplot as plt plt.figure(figsize=(10, 6)) for city in df['city'].unique(): city_data = df[df['city'] == city] plt.plot(city_data['date'], city_data['temperature'], label=city, marker='o') plt.title('各城市温度趋势') plt.xlabel('日期') plt.ylabel('温度(°C)') plt.legend() plt.grid(True) plt.xticks(rotation=45) plt.tight_layout() plt.show() return plt """else:# 默认返回前3行return"df.head(3)"defchat(self,prompt:str):"""聊天接口"""returnself.call(prompt,"","")# 方案2:使用真实的OpenAI API(需要API密钥)# llm = OpenAI(api_token="your-api-key-here")# 创建MockLLM实例mock_llm=MockLLM()# 创建SmartDataframesdf=SmartDataframe(df,config={"llm":mock_llm,"verbose":True})步骤4:运行自然语言查询
defrun_queries(smart_df):"""运行一系列自然语言查询"""queries=["显示数据的前5行","给我数据的统计信息","计算平均温度是多少?","计算各城市的平均温度","查看数据中的缺失值情况","计算北京的销售额总和","绘制各城市温度变化趋势的折线图"]results={}fori,queryinenumerate(queries,1):print(f"\n{'='*50}")print(f"查询{i}:{query}")print('-'*30)try:# 执行查询result=smart_df.chat(query)results[query]=result# 显示结果ifisinstance(result,pd.DataFrame):print(result.to_string())elifisinstance(result,pd.Series):print(result.to_string())elifhasattr(result,'show'):# 如果是matplotlib对象print("已生成图表")# 在实际环境中,可以保存图表# result.savefig(f"chart_{i}.png")else:print(f"结果:{result}")exceptExceptionase:print(f"查询失败:{e}")results[query]=f"错误:{e}"returnresults# 运行查询results=run_queries(sdf)步骤5:高级功能演示
defadvanced_features_demo():"""演示PandasAI的高级功能"""print("\n"+"="*60)print("高级功能演示")print("="*60)# 1. 数据清理示例print("\n1. 数据清理 - 处理缺失值")print("原始数据缺失情况:")print(df.isnull().sum())# 使用PandasAI进行数据清理(模拟)clean_query="清理数据中的缺失值,用平均值填充"print(f"\n执行查询:{clean_query}")# 在实际PandasAI中,这会生成相应的清理代码# 2. 特征工程示例print("\n2. 特征工程 - 创建新特征")feature_query="创建一个新特征'temp_category',根据温度分类:低温(<10)、中温(10-20)、高温(>20)"print(f"执行查询:{feature_query}")# 手动实现以演示defcategorize_temp(temp):ifpd.isna(temp):return'未知'eliftemp<10:return'低温'eliftemp<=20:return'中温'else:return'高温'df['temp_category']=df['temperature'].apply(categorize_temp)print("新增特征后的数据前5行:")print(df[['date','city','temperature','temp_category']].head())# 3. 数据聚合分析print("\n3. 数据聚合分析")agg_query="按城市和温度类别统计平均销售额"print(f"执行查询:{agg_query}")agg_result=df.groupby(['city','temp_category'])['sales'].mean()print(agg_result)# 4. 时间序列分析print("\n4. 时间序列分析 - 计算7天移动平均")ts_query="计算每个城市销售额的7天移动平均值"print(f"执行查询:{ts_query}")# 演示代码df.set_index('date',inplace=True)forcityindf['city'].unique():city_sales=df[df['city']==city]['sales']ma_7=city_sales.rolling(window=7).mean()print(f"{city}的7天移动平均销售额:{ma_7.dropna().iloc[-1]iflen(ma_7.dropna())>0else'数据不足'}")df.reset_index(inplace=True)returndf# 运行高级功能演示enhanced_df=advanced_features_demo()步骤6:实际应用场景示例
defreal_world_scenarios():"""实际应用场景演示"""print("\n"+"="*60)print("实际应用场景示例")print("="*60)# 场景1:销售数据分析print("\n场景1: 销售数据分析")sales_scenarios=["哪个月份的销售额最高?","哪个城市的平均销售额最高?","按类别分析销售额分布","找出销售额最高的3天"]forscenarioinsales_scenarios:print(f"\n问题:{scenario}")# 在实际PandasAI中,可以直接用自然语言查询# result = sdf.chat(scenario)# print(f"答案: {result}")# 场景2:气象数据分析print("\n场景2: 气象数据分析")weather_scenarios=["哪个城市的温度波动最大?","温度和湿度之间有什么关系?","预测未来3天的温度趋势","找出异常的温度值"]forscenarioinweather_scenarios:print(f"\n问题:{scenario}")# 场景3:业务报告生成print("\n场景3: 自动生成业务报告")report_query=""" 生成一份数据分析报告,包括: 1. 总体销售情况概览 2. 各城市表现对比 3. 温度对销售的影响分析 4. 主要发现和建议 """print(f"\n报告生成请求:{report_query}")print("\n模拟报告内容:")print("-"*40)print("数据分析报告")print("-"*40)print("1. 总体销售情况:")print(f" 总销售额:{df['sales'].sum():,.0f}元")print(f" 平均日销售额:{df['sales'].mean():,.0f}元")print(f" 销售天数:{df['date'].nunique()}天")print("\n2. 各城市表现对比:")city_sales=df.groupby('city')['sales'].sum()forcity,salesincity_sales.items():print(f"{city}:{sales:,.0f}元")print("\n3. 温度对销售的影响:")temp_sales_corr=df['temperature'].corr(df['sales'])print(f" 温度与销售额的相关系数:{temp_sales_corr:.3f}")print("\n4. 主要发现和建议:")print(" - 上海和广州的销售额明显高于北京")print(" - 温度与销售额呈正相关关系")print(" - 建议在温度较高的季节加大营销力度")# 运行应用场景演示real_world_scenarios()步骤7:完整示例代码整合
# 完整的示例代码defcomplete_demo():""" PandasAI完整演示 包含环境检查、数据准备、查询执行和结果展示 """print("PandasAI 完整演示")print("="*60)try:# 1. 环境检查print("1. 检查环境...")importpandasaspdimportnumpyasnpfrompandasaiimportSmartDataframeprint(" ✓ 环境检查通过")# 2. 创建数据print("\n2. 创建示例数据...")df=create_sample_dataframe()print(f" ✓ 创建了包含{len(df)}行数据的DataFrame")# 3. 初始化PandasAIprint("\n3. 初始化PandasAI...")mock_llm=MockLLM()sdf=SmartDataframe(df,config={"llm":mock_llm,"verbose":False})print(" ✓ PandasAI初始化完成")# 4. 执行示例查询print("\n4. 执行自然语言查询...")print("\n示例查询1: '显示前3行数据'")result1=sdf.chat("显示前3行数据")print(result1)print("\n示例查询2: '计算平均销售额'")result2=sdf.chat("计算平均销售额")print(f"平均销售额:{result2}")print("\n示例查询3: '按城市分组统计平均温度'")result3=sdf.chat("按城市分组统计平均温度")print(result3)print("\n✓ 演示完成!")return{'dataframe':df,'smart_dataframe':sdf,'results':{'前3行数据':result1,'平均销售额':result2,'各城市平均温度':result3}}exceptImportErrorase:print(f"✗ 导入错误:{e}")print("请确保已安装必要的包:")print("pip install pandas pandas-ai numpy")returnNoneexceptExceptionase:print(f"✗ 发生错误:{e}")returnNone# 运行完整演示demo_results=complete_demo()ifdemo_results:print("\n"+"="*60)print("演示总结")print("="*60)print(f"1. 数据规模:{len(demo_results['dataframe'])}行 ×{len(demo_results['dataframe'].columns)}列")print(f"2. 成功执行查询数:{len(demo_results['results'])}")print(f"3. 使用的列:{list(demo_results['dataframe'].columns)}")print("\n您可以使用以下方式继续探索:")print(" sdf.chat('您的自然语言问题')")print("\n例如:")print(" sdf.chat('哪天的销售额最高?')")print(" sdf.chat('绘制温度分布直方图')")print(" sdf.chat('按星期分析销售趋势')")关键要点总结
1.核心优势
- 自然语言接口:无需编写复杂的Pandas代码
- 智能分析:自动生成分析代码和可视化
- 降低门槛:业务人员可直接与数据对话
2.使用建议
# 最佳实践
# 1. 明确问题
question = "分析2024年1月各城市的销售趋势"

# 2. 逐步细化
sub_questions = [
    "计算各城市1月总销售额",
    "比较各城市日均销售额",
    "绘制销售额趋势图"
]

# 3. 验证结果
for q in sub_questions:
    result = sdf.chat(q)
    print(f"问题:{q}")
    print(f"结果:{result}\n")

3.注意事项
- MockLLM仅用于测试,生产环境需要真实的LLM
- 复杂查询可能需要多次交互
- 结果需要人工验证准确性
- 注意数据隐私和安全
4.下一步学习
- 接入真实LLM(OpenAI、本地模型等)
- 学习高级数据连接功能
- 探索自定义函数和插件
- 了解性能优化技巧
这个完整的示例展示了PandasAI从环境搭建到实际应用的全过程。通过MockLLM模拟,您可以在本地环境中体验PandasAI的自然语言查询能力,为后续接入真实AI模型打下基础。
PandasAI进阶实战:深入学习路径详解
4. 下一步学习详细指南
4.1 接入真实LLM(OpenAI、本地模型等)
4.1.1 接入OpenAI API
# 安装必要的包# pip install openai pandasai python-dotenvimportosfromdotenvimportload_dotenvfrompandasaiimportSmartDataframefrompandasai.llmimportOpenAIimportpandasaspd# 1. 配置API密钥load_dotenv()# 从.env文件加载环境变量# 方法1:使用环境变量os.environ["OPENAI_API_KEY"]="your-api-key-here"# 方法2:直接配置llm=OpenAI(api_token="sk-your-openai-api-key",model="gpt-4",# 或 "gpt-3.5-turbo"temperature=0.7,max_tokens=1000,timeout=120,# 请求超时时间)# 2. 创建数据集data={"产品":["手机","平板","电脑","手表","耳机"]*4,"季度":["Q1"]*5+["Q2"]*5+["Q3"]*5+["Q4"]*5,"销售额":[10000,8000,15000,5000,3000]*4,"成本":[6000,5000,10000,3000,1800]*4,"地区":["华东","华南","华北","华西","华中"]*4}df=pd.DataFrame(data)# 3. 创建SmartDataframesdf=SmartDataframe(df,config={"llm":llm,"verbose":True,# 显示详细日志"save_logs":True,# 保存日志"enable_cache":True,# 启用缓存"max_retries":3# 最大重试次数})# 4. 使用真实LLM进行查询queries=["计算每个产品的平均销售额","哪个季度的总销售额最高?","绘制各产品销售额的柱状图","计算每个产品的利润率((销售额-成本)/销售额)","分析各地区的销售表现并给出建议"]fori,queryinenumerate(queries,1):print(f"\n查询{i}:{query}")print("-"*50)try:result=sdf.chat(query)ifhasattr(result,'__repr__'):print(result)else:print("查询完成!")exceptExceptionase:print(f"查询失败:{str(e)}")# 5. 高级配置示例advanced_config={"llm":OpenAI(api_token="your-api-key",model="gpt-4",temperature=0.3,# 更确定的输出max_tokens=2000,top_p=0.9,frequency_penalty=0.1,presence_penalty=0.1,),"conversational":True,# 启用对话模式"memory":True,# 启用记忆功能"custom_prompts":{"data_visualization":"请为以下数据创建可视化图表:{prompt}"},"custom_whitelisted_dependencies":["seaborn","plotly"]}4.1.2 接入本地开源模型(使用Ollama)
# 安装必要包# pip install ollama langchain pandasaifrompandasaiimportSmartDataframefrompandasai.llmimportOllamaimportpandasaspd# 1. 确保Ollama服务正在运行# 在终端运行:ollama serve# 下载模型:ollama pull llama2 或 ollama pull mistral# 2. 配置本地LLMlocal_llm=Ollama(model="llama2",# 或 "mistral", "codellama"base_url="http://localhost:11434",# Ollama默认地址temperature=0.7,max_tokens=2000,# 可选:设置自定义提示模板custom_prompt_template=""" 你是一个数据分析助手。用户会给你一个DataFrame和一些查询。 请用Python代码回答问题。 数据信息: {df_head} 用户查询:{query} 请生成合适的代码: """)# 3. 准备数据df=pd.read_csv("your_data.csv")# 或从其他源加载# 4. 创建SmartDataframesdf_local=SmartDataframe(df,config={"llm":local_llm,"verbose":True,"enforce_privacy":True,# 隐私模式,不发送数据到外部"use_error_correction_framework":True# 使用错误纠正框架})# 5. 测试查询try:result=sdf_local.chat("数据的基本统计信息是什么?")print(result)exceptExceptionase:print(f"错误:{e}")4.1.3 使用多模型切换
frompandasaiimportSmartDataframefrompandasai.llmimportOpenAI,Ollama,HuggingFaceLLMimportpandasaspdclassMultiModelManager:"""多模型管理器"""def__init__(self):self.models={}self.current_model=Nonedefregister_model(self,name,llm_instance):"""注册模型"""self.models[name]=llm_instancedefswitch_model(self,name):"""切换模型"""ifnameinself.models:self.current_model=self.models[name]returnTruereturnFalsedefget_model(self,name=None):"""获取模型"""ifname:returnself.models.get(name)returnself.current_model# 初始化管理器manager=MultiModelManager()# 注册多个模型manager.register_model("openai_gpt4",OpenAI(api_token="your-key",model="gpt-4"))manager.register_model("openai_gpt3",OpenAI(api_token="your-key",model="gpt-3.5-turbo"))manager.register_model("local_llama",Ollama(model="llama2",base_url="http://localhost:11434"))# 根据需求切换模型df=pd.DataFrame({"A":[1,2,3],"B":[4,5,6]})# 使用GPT-4处理复杂查询manager.switch_model("openai_gpt4")sdf_gpt4=SmartDataframe(df,config={"llm":manager.get_model()})complex_result=sdf_gpt4.chat("进行时间序列预测分析")# 使用本地模型处理简单查询manager.switch_model("local_llama")sdf_local=SmartDataframe(df,config={"llm":manager.get_model()})simple_result=sdf_local.chat("计算平均值")4.2 学习高级数据连接功能
4.2.1 连接多种数据库
# 安装必要包# pip install pandasai[sql] sqlalchemy psycopg2-binary pymysqlfrompandasaiimportSmartDataframefrompandasai.connectorsimport(SQLConnector,PostgreSQLConnector,MySQLConnector,SnowflakeConnector,BigQueryConnector)importpandasaspd# 1. PostgreSQL连接postgres_connector=PostgreSQLConnector(config={"host":"localhost","port":5432,"database":"your_database","username":"your_username","password":"your_password","table":"sales_data",# 或使用SQL查询# "query": "SELECT * FROM sales WHERE date > '2024-01-01'"})# 2. MySQL连接mysql_connector=MySQLConnector(config={"host":"localhost","port":3306,"database":"your_db","username":"root","password":"password","table":"customer_data"})# 3. Snowflake连接snowflake_connector=SnowflakeConnector(config={"account":"your_account","username":"your_username","password":"your_password","database":"your_database","schema":"your_schema","warehouse":"your_warehouse","role":"your_role","table":"large_dataset"})# 4. 通用SQL连接器generic_connector=SQLConnector(config={"connection_string":"postgresql://user:password@localhost/dbname","table":"your_table"})# 5. 创建SmartDataframe并查询connector=postgres_connector# 选择要使用的连接器sdf_db=SmartDataframe(connector,config={"llm":OpenAI(api_token="your-key"),"verbose":True})# 自然语言查询数据库queries=["显示最近30天的销售记录","计算每个地区的总销售额","找出销售额最高的10个产品","分析销售趋势并预测下个月销售额"]forqueryinqueries:print(f"\n查询:{query}")try:result=sdf_db.chat(query)ifisinstance(result,pd.DataFrame):print(f"返回{len(result)}行数据")print(result.head())else:print(result)exceptExceptionase:print(f"错误:{e}")4.2.2 多数据源联合查询
frompandasaiimportSmartDatalakefrompandasai.connectorsimport(PostgreSQLConnector,CSVConnector,ExcelConnector)# 1. 创建多个数据源连接器sales_connector=PostgreSQLConnector({"host":"localhost","database":"sales_db","table":"transactions"})customers_connector=CSVConnector({"path":"/path/to/customers.csv"})products_connector=ExcelConnector({"path":"/path/to/products.xlsx","sheet_name":"ProductInfo"})# 2. 创建数据湖(支持多数据源)datalake=SmartDatalake([sales_connector,customers_connector,products_connector],config={"llm":OpenAI(api_token="your-key"),"verbose":True})# 3. 跨数据源查询cross_source_queries=[# 关联查询"将销售数据与客户数据关联,分析VIP客户的购买行为",# 复杂分析"计算每个产品类别的销售额,并按地区分组",# 数据整合"创建完整的销售报告,包含产品信息、客户信息和交易详情",# 业务洞察"找出最受欢迎的产品组合,并建议捆绑销售策略"]forqueryincross_source_queries:print(f"\n跨源查询:{query}")try:result=datalake.chat(query)print(f"查询完成!")ifisinstance(result,pd.DataFrame):print(f"返回数据形状:{result.shape}")exceptExceptionase:print(f"错误:{e}")4.2.3 实时API数据连接
# 安装必要包# pip install requests pandasaiimportrequestsfrompandasaiimportSmartDataframefrompandasai.connectorsimportBaseConnectorimportpandasaspdclassAPIConnector(BaseConnector):"""自定义API连接器"""def__init__(self,config):self.api_url=config["api_url"]self.api_key=config.get("api_key")self.headers=config.get("headers",{})self.params=config.get("params",{})defhead(self,n=5):"""获取数据头部"""returnself._fetch_data().head(n)def_fetch_data(self):"""从API获取数据"""headers=self.headers.copy()ifself.api_key:headers["Authorization"]=f"Bearer{self.api_key}"response=requests.get(self.api_url,headers=headers,params=self.params,timeout=30)response.raise_for_status()data=response.json()# 假设API返回JSON列表returnpd.DataFrame(data)@propertydef_df(self):"""获取完整DataFrame"""returnself._fetch_data()# 使用自定义API连接器api_connector=APIConnector({"api_url":"https://api.example.com/data","api_key":"your_api_key_here","headers":{"Content-Type":"application/json"},"params":{"limit":1000}})sdf_api=SmartDataframe(api_connector,config={"llm":OpenAI(api_token="your-key"),"verbose":True})# 查询实时数据result=sdf_api.chat("分析最新的数据趋势")print(result)4.3 探索自定义函数和插件
4.3.1 创建自定义分析函数
frompandasaiimportSmartDataframefrompandasai.helpersimportcode_managerimportpandasaspdimportnumpyasnp# 1. 定义自定义函数库classCustomAnalytics:"""自定义分析函数库"""@staticmethoddefcalculate_cagr(start_value,end_value,periods):"""计算复合年增长率"""ifstart_value<=0:return0return(end_value/start_value)**(1/periods)-1@staticmethoddefdetect_anomalies_zscore(series,threshold=3):"""使用Z-score检测异常值"""mean=np.mean(series)std=np.std(series)z_scores=(series-mean)/stdreturnnp.abs(z_scores)>threshold@staticmethoddefcalculate_roi(investment,returns):"""计算投资回报率"""ifinvestment==0:return0return(returns-investment)/investment@staticmethoddefcreate_segments(data,column,bins,labels):"""创建数据分段"""returnpd.cut(data[column],bins=bins,labels=labels)# 2. 注册自定义函数df=pd.DataFrame({"month":pd.date_range("2024-01-01",periods=12,freq='M'),"revenue":[100,120,130,115,140,160,180,200,190,210,220,230],"cost":[70,80,85,75,90,100,120,130,125,140,150,155]})sdf_custom=SmartDataframe(df,config={"llm":OpenAI(api_token="your-key"),"custom_whitelisted_dependencies":["CustomAnalytics","calculate_cagr","detect_anomalies_zscore","calculate_roi","create_segments"],# 添加自定义导入"custom_imports":""" from custom_analytics import CustomAnalytics import numpy as np """})# 3. 使用自定义函数的查询custom_queries=["使用calculate_cagr函数计算收入的复合年增长率","使用detect_anomalies_zscore检测收入中的异常值","使用calculate_roi计算每个月的投资回报率","使用create_segments将收入分为低、中、高三段"]forqueryincustom_queries:print(f"\n自定义查询:{query}")try:result=sdf_custom.chat(query)print(result)exceptExceptionase:print(f"错误:{e}")4.3.2 创建自定义可视化插件
frompandasaiimportSmartDataframefrompandasai.middlewaresimportBaseMiddlewareimportplotly.graph_objectsasgoimportplotly.expressaspxclassPlotlyVisualizer(BaseMiddleware):"""Plotly可视化中间件"""defrun(self,code):"""修改生成的代码以使用Plotly"""# 检测matplotlib代码并替换为plotlyif"plt.show()"incodeor"matplotlib"incode:code=self._convert_to_plotly(code)returncodedef_convert_to_plotly(self,code):"""将matplotlib代码转换为plotly"""conversions={"import matplotlib.pyplot as plt":"import plotly.express as px\nimport plotly.graph_objects as go","plt.bar(":"go.Bar(","plt.plot(":"go.Scatter(","plt.scatter(":"go.Scatter(mode='markers', ","plt.hist(":"go.Histogram(","plt.show()":"fig.show()","plt.figure(":"fig = go.Figure(","plt.title(":"fig.update_layout(title=","plt.xlabel(":"fig.update_layout(xaxis_title=","plt.ylabel(":"fig.update_layout(yaxis_title=","plt.legend()":"fig.update_layout(showlegend=True)","plt.grid(":"# Grid removed for plotly",}forold,newinconversions.items():code=code.replace(old,new)returncodeclassCustomVisualizations:"""自定义可视化函数"""@staticmethoddefcreate_waterfall(df,values,labels,title="Waterfall Chart"):"""创建瀑布图"""fig=go.Figure(go.Waterfall(name="业绩",orientation="v",measure=["relative"]*len(df),x=df[labels],y=df[values],connector={"line":{"color":"rgb(63, 63, 63)"}},))fig.update_layout(title=title,showlegend=True,waterfallgap=0.3,)returnfig@staticmethoddefcreate_sunburst(df,path,values,title="Sunburst Chart"):"""创建旭日图"""fig=px.sunburst(df,path=path,values=values,title=title)returnfig# 使用自定义可视化df_viz=pd.DataFrame({"category":["A","B","C","A","B","C"],"subcategory":["A1","B1","C1","A2","B2","C2"],"value":[100,150,200,120,180,220],"month":["Jan","Jan","Jan","Feb","Feb","Feb"]})sdf_viz=SmartDataframe(df_viz,config={"llm":OpenAI(api_token="your-key"),"middlewares":[PlotlyVisualizer()],"custom_whitelisted_dependencies":["CustomVisualizations","create_waterfall","create_sunburst"],"save_charts":True,"save_charts_path":"./charts"})# 
生成自定义可视化viz_queries=["使用create_waterfall创建价值的瀑布图","使用create_sunburst创建分类的旭日图","创建一个交互式的散点图矩阵"]forqueryinviz_queries:print(f"\n可视化查询:{query}")try:result=sdf_viz.chat(query)# 在Jupyter中会自动显示图表# 在脚本中,可以保存图表ifhasattr(result,'write_html'):result.write_html(f"chart_{query[:10]}.html")print("图表已保存为HTML文件")exceptExceptionase:print(f"错误:{e}")4.3.3 创建数据质量检查插件
importpandasaspdfrompandasaiimportSmartDataframefrompandasai.middlewaresimportBaseMiddlewareclassDataQualityChecker(BaseMiddleware):"""数据质量检查中间件"""def__init__(self,thresholds=None):self.thresholds=thresholdsor{"missing_threshold":0.3,"outlier_threshold":3,"duplicate_threshold":0.1}defrun(self,df):"""执行数据质量检查"""quality_report={"summary":{},"issues":[],"suggestions":[]}# 检查缺失值missing_percentage=df.isnull().sum()/len(df)high_missing=missing_percentage[missing_percentage>self.thresholds["missing_threshold"]]iflen(high_missing)>0:quality_report["issues"].append({"type":"high_missing_values","columns":high_missing.index.tolist(),"values":high_missing.values.tolist()})quality_report["suggestions"].append("考虑删除缺失值超过30%的列或使用插值方法")# 检查重复值duplicate_rows=df.duplicated().sum()duplicate_percentage=duplicate_rows/len(df)ifduplicate_percentage>self.thresholds["duplicate_threshold"]:quality_report["issues"].append({"type":"high_duplicates","count":duplicate_rows,"percentage":duplicate_percentage})quality_report["suggestions"].append("考虑删除重复行或调查数据收集过程")# 生成摘要quality_report["summary"]={"total_rows":len(df),"total_columns":len(df.columns),"missing_values":df.isnull().sum().sum(),"duplicate_rows":duplicate_rows,"data_types":df.dtypes.to_dict()}returnquality_report# 使用数据质量检查df_quality=pd.DataFrame({"A":[1,2,None,4,5],"B":[1,1,3,4,5],# 有重复"C":[100,200,300,400,500],"D":[None,None,3,4,5]# 高缺失})quality_checker=DataQualityChecker()# 创建SmartDataframe并添加质量检查sdf_quality=SmartDataframe(df_quality,config={"llm":OpenAI(api_token="your-key"),"custom_middlewares":[quality_checker]})# 自动质量检查print("数据质量报告:")quality_report=quality_checker.run(df_quality)forkey,valueinquality_report.items():print(f"\n{key}:")ifisinstance(value,dict):fork,vinvalue.items():print(f"{k}:{v}")elifisinstance(value,list):foriteminvalue:print(f"{item}")# 使用自然语言查询数据质量问题result=sdf_quality.chat("识别数据质量问题并给出修复建议")print(f"\nAI分析结果:\n{result}")4.4 了解性能优化技巧
4.4.1 查询优化与缓存策略
frompandasaiimportSmartDataframefrompandasai.llmimportOpenAIimportpandasaspdimporttimefromfunctoolsimportlru_cache# 1. 性能监控装饰器defperformance_monitor(func):"""性能监控装饰器"""defwrapper(*args,**kwargs):start_time=time.time()result=func(*args,**kwargs)end_time=time.time()execution_time=end_time-start_timeprint(f"函数{func.__name__}执行时间:{execution_time:.2f}秒")returnresultreturnwrapper# 2. 智能缓存系统classSmartCache:"""智能缓存系统"""def__init__(self,max_size=1000,ttl=3600):self.cache={}self.max_size=max_size self.ttl=ttl# 缓存生存时间(秒)self.access_times={}defget(self,key):"""获取缓存"""ifkeyinself.cache:# 检查是否过期iftime.time()-self.access_times[key]<self.ttl:self.access_times[key]=time.time()print(f"缓存命中:{key[:50]}...")returnself.cache[key]else:# 缓存过期delself.cache[key]delself.access_times[key]returnNonedefset(self,key,value):"""设置缓存"""iflen(self.cache)>=self.max_size:# 移除最久未使用的oldest_key=min(self.access_times,key=self.access_times.get)delself.cache[oldest_key]delself.access_times[oldest_key]self.cache[key]=value self.access_times[key]=time.time()# 3. 优化配置optimized_config={"llm":OpenAI(api_token="your-key",model="gpt-3.5-turbo",# 对于性能考虑,使用更快的模型temperature=0.1,# 降低随机性max_tokens=500,# 限制输出长度),# 性能优化选项"enable_cache":True,"cache_max_size":1000,"cache_lifetime":300,# 5分钟# 代码执行限制"max_execution_time":30,# 最大执行时间"max_retries":2,# 减少重试次数# 数据采样(对大数据集)"sample_size":10000,# 采样大小"sample_strategy":"head",# 采样策略# 并行处理"use_parallel":True,"max_workers":4,# 内存优化"optimize_memory":True,"chunk_size":10000,}# 4. 
大数据处理优化classBigDataHandler:"""大数据处理器"""def__init__(self,df,chunk_size=10000):self.df=df self.chunk_size=chunk_size@performance_monitordefprocess_in_chunks(self,operation):"""分块处理数据"""results=[]total_chunks=(len(self.df)//self.chunk_size)+1foriinrange(total_chunks):start_idx=i*self.chunk_size end_idx=min((i+1)*self.chunk_size,len(self.df))chunk=self.df.iloc[start_idx:end_idx]print(f"处理块{i+1}/{total_chunks}({len(chunk)}行)")result=operation(chunk)results.append(result)# 清理内存delchunkreturnpd.concat(results,ignore_index=True)ifresultselsepd.DataFrame()# 5. 查询优化策略defoptimize_query(query,context=None):"""优化自然语言查询"""# 查询重写规则rewrite_rules={"显示所有数据":"显示前1000行数据","计算全部":"抽样计算","详细分析":"概要分析",}optimized_query=queryforpattern,replacementinrewrite_rules.items():ifpatterninquery:optimized_query=optimized_query.replace(pattern,replacement)print(f"查询已优化: '{pattern}' -> '{replacement}'")returnoptimized_query# 6. 性能测试defperformance_test():"""性能测试函数"""# 创建测试数据test_data=pd.DataFrame({"id":range(100000),"value":np.random.randn(100000),"category":np.random.choice(["A","B","C","D"],100000)})# 创建SmartDataframesdf_perf=SmartDataframe(test_data,config=optimized_config)# 测试查询test_queries=["计算value的平均值","按category分组统计","找出value最大的100条记录","创建value的直方图"]cache=SmartCache()forqueryintest_queries:print(f"\n测试查询:{query}")# 检查缓存cached_result=cache.get(query)ifcached_resultisnotNone:print("从缓存获取结果")result=cached_resultelse:# 优化查询optimized=optimize_query(query)# 执行查询start_time=time.time()result=sdf_perf.chat(optimized)end_time=time.time()# 缓存结果cache.set(query,result)print(f"查询执行时间:{end_time-start_time:.2f}秒")print(f"结果类型:{type(result)}")# 运行性能测试performance_test()# 7. 内存使用优化defmemory_optimization_tips():"""内存优化建议"""tips=""" PandasAI内存优化技巧: 1. 数据采样: - 对于探索性分析,使用数据样本 - 配置sample_size参数 2. 数据类型优化: - 将object类型转换为category - 使用适当的数据类型(int8, float32等) 3. 分块处理: - 大数据集分块处理 - 使用chunk_size参数 4. 及时清理: - 删除不需要的中间变量 - 使用del释放内存 5. 使用数据库: - 大数据存储在数据库中 - 让数据库执行聚合操作 6. 
缓存策略: - 启用智能缓存 - 设置合理的TTL """print(tips)memory_optimization_tips()4.4.2 异步处理与并发优化
importasyncioimportconcurrent.futuresfrompandasaiimportSmartDataframeimportpandasaspdimportnumpyasnpclassAsyncPandasAI:"""异步PandasAI处理器"""def__init__(self,df,max_workers=4):self.df=df self.max_workers=max_workers self.executor=concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)asyncdefprocess_queries_async(self,queries):"""异步处理多个查询"""loop=asyncio.get_event_loop()# 准备任务tasks=[]forqueryinqueries:task=loop.run_in_executor(self.executor,self._process_single_query,query)tasks.append(task)# 并发执行results=awaitasyncio.gather(*tasks,return_exceptions=True)# 处理结果processed_results=[]fori,resultinenumerate(results):ifisinstance(result,Exception):print(f"查询 '{queries[i]}' 失败:{result}")processed_results.append(None)else:processed_results.append(result)returnprocessed_resultsdef_process_single_query(self,query):"""处理单个查询"""sdf=SmartDataframe(self.df,config={"llm":OpenAI(api_token="your-key"),"verbose":False})returnsdf.chat(query)defprocess_batch(self,queries,batch_size=10):"""批量处理查询"""all_results=[]foriinrange(0,len(queries),batch_size):batch=queries[i:i+batch_size]print(f"处理批次{i//batch_size+1}:{len(batch)}个查询")# 同步方式处理批次withconcurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)asexecutor:future_to_query={executor.submit(self._process_single_query,query):queryforqueryinbatch}forfutureinconcurrent.futures.as_completed(future_to_query):query=future_to_query[future]try:result=future.result()all_results.append((query,result))print(f"✓ 完成:{query}")exceptExceptionase:print(f"✗ 失败:{query}-{e}")all_results.append((query,None))returnall_results# 异步处理示例asyncdefasync_example():"""异步处理示例"""# 创建测试数据df=pd.DataFrame({"date":pd.date_range("2024-01-01",periods=100,freq='D'),"value":np.random.randn(100)*100+1000,"category":np.random.choice(["A","B","C"],100)})# 创建处理器processor=AsyncPandasAI(df,max_workers=5)# 准备查询queries=["计算value的平均值","按category分组统计","创建时间序列图","检测异常值","预测未来7天的趋势","计算移动平均","分析周末和工作日的差异","创建热力图","计算相关性矩阵","生成统计报告"]print("开始异步处理...")# 
方法1:异步处理results=awaitprocessor.process_queries_async(queries)# 方法2:批量处理(同步)# results = processor.process_batch(queries, batch_size=3)print("\n处理完成!")fori,(query,result)inenumerate(zip(queries,results)):ifresultisnotNone:print(f"{i+1}.{query}: 成功")else:print(f"{i+1}.{query}: 失败")# 运行异步示例(在支持async的环境中)# asyncio.run(async_example())4.4.3 监控与日志系统
importloggingimportjsonfromdatetimeimportdatetimefrompandasaiimportSmartDataframeimportpandasaspdclassPerformanceMonitor:"""性能监控系统"""def__init__(self,log_file="pandasai_performance.log"):self.log_file=log_file self.setup_logging()defsetup_logging(self):"""设置日志系统"""logger=logging.getLogger("PandasAI-Performance")logger.setLevel(logging.INFO)# 文件处理器file_handler=logging.FileHandler(self.log_file)file_handler.setLevel(logging.INFO)# 控制台处理器console_handler=logging.StreamHandler()console_handler.setLevel(logging.WARNING)# 格式化formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)self.logger=loggerdeflog_query(self,query,execution_time,result_size=None,status="success"):"""记录查询日志"""log_entry={"timestamp":datetime.now().isoformat(),"query":query,"execution_time":execution_time,"result_size":result_size,"status":status}self.logger.info(json.dumps(log_entry))# 性能警告ifexecution_time>10:# 超过10秒self.logger.warning(f"慢查询:{query}- 用时{execution_time:.2f}秒")defgenerate_performance_report(self,days=7):"""生成性能报告"""# 读取日志文件withopen(self.log_file,'r')asf:logs=[json.loads(line)forlineinfifline.strip()]# 分析性能ifnotlogs:return"暂无性能数据"# 计算统计信息execution_times=[log["execution_time"]forloginlogsif"execution_time"inlog]report={"total_queries":len(logs),"success_rate":sum(1forloginlogsiflog.get("status")=="success")/len(logs)*100,"avg_execution_time":sum(execution_times)/len(execution_times)ifexecution_timeselse0,"max_execution_time":max(execution_times)ifexecution_timeselse0,"slow_queries":[log["query"]forloginlogsiflog.get("execution_time",0)>10],"common_queries":self._get_common_queries(logs),"performance_trend":self._calculate_trend(logs)}returnreportdef_get_common_queries(self,logs,top_n=5):"""获取常见查询"""fromcollectionsimportCounter 
queries=[log["query"]forloginlogs]returnCounter(queries).most_common(top_n)def_calculate_trend(self,logs):"""计算性能趋势"""# 按日期分组daily_data={}forloginlogs:date=log["timestamp"][:10]# 提取日期ifdatenotindaily_data:daily_data[date]={"count":0,"total_time":0}daily_data[date]["count"]+=1daily_data[date]["total_time"]+=log.get("execution_time",0)# 计算每日平均值trend={}fordate,dataindaily_data.items():trend[date]=data["total_time"]/data["count"]returntrend# 使用监控系统defmonitored_analysis():"""带监控的分析"""# 创建监控器monitor=PerformanceMonitor()# 创建数据df=pd.DataFrame({"sales":np.random.randint(100,1000,1000),"profit":np.random.randint(10,200,1000),"region":np.random.choice(["North","South","East","West"],1000),"month":np.random.choice(["Jan","Feb","Mar","Apr"],1000)})# 创建SmartDataframesdf=SmartDataframe(df,config={"llm":OpenAI(api_token="your-key"),"verbose":False,"enable_cache":True})# 执行监控查询importtime queries=["计算各区域的平均销售额","分析销售额与利润的关系","预测下个月的销售趋势","找出表现最好的区域","创建销售仪表板"]forqueryinqueries:print(f"\n执行:{query}")start_time=time.time()try:result=sdf.chat(query)execution_time=time.time()-start_time# 记录日志result_size=len(result)ifhasattr(result,'__len__')elseNonemonitor.log_query(query=query,execution_time=execution_time,result_size=result_size,status="success")print(f"✓ 成功 - 用时:{execution_time:.2f}秒")exceptExceptionase:execution_time=time.time()-start_time monitor.log_query(query=query,execution_time=execution_time,status=f"error:{str(e)}")print(f"✗ 失败:{e}")# 生成性能报告print("\n"+"="*60)print("性能报告")print("="*60)report=monitor.generate_performance_report()forkey,valueinreport.items():ifisinstance(value,list):print(f"{key}:")foriteminvalue:print(f" -{item}")else:print(f"{key}:{value}")# 运行监控示例monitored_analysis()总结与最佳实践
学习路径总结
| 学习阶段 | 主要内容 | 关键技能 |
|---|---|---|
| 初级阶段 | 基础安装、MockLLM使用、简单查询 | 环境配置、基本语法 |
| 中级阶段 | 真实LLM集成、数据库连接、自定义函数 | API集成、SQL连接、函数扩展 |
| 高级阶段 | 性能优化、异步处理、插件开发 | 性能调优、并发编程、系统设计 |
| 专家阶段 | 架构设计、生产部署、团队协作 | 架构设计、CI/CD、团队管理 |
实用工具推荐
# 开发环境检查清单
def check_development_environment():
    """检查开发环境"""
    required_packages = [
        "pandas",
        "pandasai",
        "openai",      # 如果使用OpenAI
        "sqlalchemy",  # 如果使用数据库
        "plotly",      # 如果使用高级可视化
        "asyncio",     # 如果使用异步
        "logging"      # 如果使用日志
    ]
    print("开发环境检查清单:")
    print("="*60)
    for package in required_packages:
        try:
            __import__(package.replace("-", "_"))
            print(f"✓{package}")
        except ImportError:
            print(f"✗{package}- 需要安装")
    print("\n建议配置:")
    print("1. 使用虚拟环境")
    print("2. 设置环境变量")
    print("3. 配置版本控制")
    print("4. 设置监控和日志")
    print("5. 实施测试策略")

check_development_environment()

生产部署建议
# 生产配置示例
PRODUCTION_CONFIG = {
    "llm": {
        "provider": "openai",
        "model": "gpt-4",
        "api_key_env_var": "OPENAI_API_KEY",
        "timeout": 30,
        "max_retries": 3
    },
    "database": {
        "connection_pool_size": 10,
        "max_overflow": 20,
        "pool_recycle": 3600
    },
    "performance": {
        "enable_cache": True,
        "cache_ttl": 300,
        "max_cache_size": 10000,
        "query_timeout": 60,
        "max_result_size": 100000
    },
    "security": {
        "data_masking": True,
        "log_sanitization": True,
        "api_rate_limit": 100,
        "allowed_data_sources": ["database1", "api1"]
    },
    "monitoring": {
        "enable_logging": True,
        "log_level": "INFO",
        "performance_metrics": True,
        "alert_threshold": {
            "response_time": 10,
            "error_rate": 0.01,
            "cache_hit_rate": 0.8
        }
    }
}

# Docker部署示例
DOCKER_COMPOSE_TEMPLATE = """
version: '3.8'
services:
  pandasai-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
    volumes:
      - ./logs:/app/logs
      - ./cache:/app/cache
    depends_on:
      - redis
      - database
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  database:
    image: postgres:13
    environment:
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
"""

通过这个完整的进阶指南,您可以系统地学习PandasAI的高级功能,从基础使用到生产部署,全面提升数据分析自动化的能力。每个部分都包含实际代码示例和最佳实践,帮助您在实际项目中应用这些技术。