news 2026/5/3 4:07:14

小实验一:数据清洗+ai研判

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
小实验一:数据清洗+ai研判

在日常事件研判工作中,海量数据的预处理与清洗占据了大量时间。为了提升效率,我尝试借助AI“偷个懒”——通过编写自动化脚本,配合DeepSeek与ChatGPT构建半自动化的处理流程。具体思路:利用DeepSeek进行语言润色与Prompt优化,利用ChatGPT生成核心代码(包括正则匹配、数据清洗、批量处理逻辑等)。最初只想实现一个基础的清洗脚本,没想到逐步扩展到日志解析、特征提取与结果校验等环节,越做越上瘾。本文将分享这套“AI辅助编程”的实践思路与踩坑经验,希望对同样面临重复性研判工作的同学有所启发。

这是一开始的第一步。

这是一开始初版(给deepseek)的一段话,deepseek生成了一段:

然后我就将生成的语句都交给了chatgpt。chatgpt就生成了清洗的代码。

import os import re import numpy as np import pandas as pd # ========================= # 1. 用户可修改的输入/输出路径 # ========================= input_file = "data.xlsx" output_file = "cleaned_data.xlsx" # ========================= # 2. 用户可扩展的自定义清洗规则 # 说明: # - 事件名称:默认精确匹配 # - 关键特征 / 主机信息 / 资产分组:子串匹配 # - 空字符串表示该条件忽略 # - 若需要事件名称使用正则,可将 use_regex 设置为 True # ========================= custom_filters = [ { "事件名称": "APT事件(文件情报)", "关键特征": "攻击组织: ['沙虫']", "主机信息": "", "资产分组": "", "use_regex": False }, { "事件名称": "检测到木马", "关键特征": "", "主机信息": "**********", "资产分组": "*********************", "use_regex": False } ] # ========================= # 3. 固定规则配置 # ========================= DROP_COLUMNS = ["首次时间", "最近时间", "事件ID", "来源设备", "检测模块"] DROP_EVENT_NAME = "nmap端口扫描" DROP_ASSET_GROUPS = [ "**************************", "***********************" ] def print_shape(df, step_name): """打印当前数据形状""" print(f"[{step_name}] 当前数据形状: {df.shape[0]} 行 x {df.shape[1]} 列") def validate_file_exists(file_path): """检查输入文件是否存在""" if not os.path.exists(file_path): raise FileNotFoundError(f"输入文件不存在: {file_path}") def read_excel_file(file_path): """读取 Excel 文件""" try: df = pd.read_excel(file_path) return df except Exception as e: raise RuntimeError(f"读取 Excel 文件失败: {e}") def check_required_columns(df, required_columns, stage_desc=""): """检查所需列是否存在""" missing = [col for col in required_columns if col not in df.columns] if missing: raise ValueError(f"{stage_desc}缺少必要列: {missing}") def safe_str_series(series): """ 将列安全转换为字符串,避免 NaN 报错 用于 contains / match 之类字符串处理 """ return series.fillna("").astype(str) def drop_fixed_columns(df): """删除固定列""" existing_cols = [col for col in DROP_COLUMNS if col in df.columns] missing_cols = [col for col in DROP_COLUMNS if col not in df.columns] if missing_cols: print(f"[提示] 以下待删除列不存在,将自动跳过: {missing_cols}") df = df.drop(columns=existing_cols, errors="ignore") print_shape(df, "删除固定列后") return df def drop_nmap_rows(df): """删除事件名称为 nmap端口扫描 的行""" check_required_columns(df, ["事件名称"], "删除 nmap 规则时") before = len(df) df = df[df["事件名称"] != DROP_EVENT_NAME].copy() removed = before - len(df) print(f"[删除事件名称规则] 删除 '事件名称' = '{DROP_EVENT_NAME}' 的行数: {removed}") print_shape(df, "删除 nmap端口扫描 后") return df def drop_asset_group_rows(df): """删除特定资产分组的行""" check_required_columns(df, ["资产分组"], "删除资产分组规则时") before = len(df) df = df[~df["资产分组"].isin(DROP_ASSET_GROUPS)].copy() removed = before - len(df) print(f"[删除资产分组规则] 删除指定资产分组行数: {removed}") print_shape(df, "删除指定资产分组 后") return df def build_custom_filter_mask(df, rule): """ 根据单条自定义规则构建掩码: - 事件名称:精确匹配,或正则匹配 - 关键特征 / 主机信息 / 资产分组:子串匹配 - 空条件忽略 - 同时满足所有非空条件时,判定为需删除 """ needed_cols = ["事件名称", "关键特征", "主机信息", "资产分组"] check_required_columns(df, needed_cols, "执行自定义清洗规则时") event_name = str(rule.get("事件名称", "")).strip() key_feature = str(rule.get("关键特征", "")).strip() host_info = str(rule.get("主机信息", "")).strip() asset_group = str(rule.get("资产分组", "")).strip() use_regex = bool(rule.get("use_regex", False)) # 初始 mask 全 True,后续逐步与运算 mask = pd.Series(True, index=df.index) # 事件名称匹配 if event_name: if use_regex: mask = mask & safe_str_series(df["事件名称"]).str.contains( event_name, regex=True, na=False ) else: mask = mask & (safe_str_series(df["事件名称"]) == event_name) # 关键特征子串匹配 if key_feature: mask = mask & safe_str_series(df["关键特征"]).str.contains( re.escape(key_feature), regex=True, na=False ) # 主机信息子串匹配 if host_info: mask = mask & safe_str_series(df["主机信息"]).str.contains( re.escape(host_info), regex=True, na=False ) # 资产分组子串匹配 if asset_group: mask = mask & safe_str_series(df["资产分组"]).str.contains( re.escape(asset_group), regex=True, na=False ) return mask def apply_custom_filters(df, custom_filters): """应用所有自定义清洗规则""" if not custom_filters: print("[自定义清洗] 未配置自定义规则,跳过。") return df total_removed = 0 for i, rule in enumerate(custom_filters, start=1): try: mask = build_custom_filter_mask(df, rule) matched = int(mask.sum()) print(f"\n[自定义规则 {i}] 规则内容: {rule}") print(f"[自定义规则 {i}] 命中行数: {matched}") if matched > 0: df = df[~mask].copy() total_removed += matched print_shape(df, f"应用自定义规则 {i} 后") else: print(f"[自定义规则 {i}] 无匹配,跳过删除。") except Exception as e: print(f"[警告] 自定义规则 {i} 执行失败,已跳过。原因: {e}") print(f"\n[自定义清洗完成] 总删除行数: {total_removed}") print_shape(df, "全部自定义规则执行后") return df def save_to_excel(df, file_path): """保存到 Excel""" try: df.to_excel(file_path, index=False) print(f"\n[完成] 清洗后的数据已保存到: {file_path}") except Exception as e: raise RuntimeError(f"保存 Excel 文件失败: {e}") def main(): try: # 1. 检查文件 validate_file_exists(input_file) # 2. 读取文件 df = read_excel_file(input_file) print_shape(df, "原始数据") # 3. 固定清洗规则 df = drop_fixed_columns(df) df = drop_nmap_rows(df) df = drop_asset_group_rows(df) # 4. 自定义清洗规则 df = apply_custom_filters(df, custom_filters) # 5. 保存结果 save_to_excel(df, output_file) except FileNotFoundError as e: print(f"[错误] {e}") except ValueError as e: print(f"[错误] {e}") except RuntimeError as e: print(f"[错误] {e}") except Exception as e: print(f"[未知错误] 程序执行失败: {e}") if __name__ == "__main__": main()

以上就是清洗的代码,不得不说chatgpt生成代码的能力真的很强。

然后这时候就想为何不利用大模型给我进行分析,原理就是利用云端的网站来配合进行分析。因此我主要的任务就是获得ai的api值以及所用到的情报中心平台的api值。

因此下面就是我利用deepseek+chatgpt来回修改的步骤:

第一回合

第二回合:

最后生成的最终脚本演示的效果还可以,方便的很多了。

尤其是最后生成的critical_events.xlsx副本。这个更加有助于我去分析了。

以上是展示的最终的效果,ai+事件研判效率快多了。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 5:36:01

Windows 10/11 上保姆级安装MRtrix3教程:用MSYS2搞定神经影像分析工具

Windows 10/11 神经影像分析利器:MRtrix3 全流程安装指南 神经影像分析领域的研究者们,是否曾因Windows平台缺乏专业工具而苦恼?今天我们将彻底解决这个痛点。MRtrix3作为当前最先进的扩散磁共振成像分析套件,其强大的纤维追踪和…

作者头像 李华
网站建设 2026/4/16 5:35:37

PointTransformer:如何让Transformer看懂无序的3D世界

1. 从Transformer到3D世界的跨越 想象一下,你面前有一堆散落的乐高积木块,它们没有固定的排列顺序,但最终能拼出一座城堡。这就是3D点云数据的特点——无序但蕴含结构。Transformer在自然语言处理和计算机视觉领域大放异彩,但面对…

作者头像 李华
网站建设 2026/4/16 5:33:12

SQL报销异常票据批量筛查语句,颠覆逐单查不合规票据低效模式,一键检索无票,超标异常账目批量出整改清单,机器批量审核完胜人工逐票翻看核验。

我见过太多企业的报销审核流程:财务坐在那里,像“大家来找茬”一样盯着一张张电子发票,寻找“连号发票”、“节假日消费”、“超标准住宿”等违规痕迹。今天,我们将利用智能会计中的“内部控制”与“实质性测试”理念,…

作者头像 李华
网站建设 2026/4/16 5:29:39

**发散创新:Python实现AI伦理合规性检测框架——从代码到责任的落地

发散创新:Python实现AI伦理合规性检测框架——从代码到责任的落地实践 在人工智能飞速发展的今天,模型偏见、数据滥用、决策黑箱等问题日益凸显。如何让AI系统不仅“聪明”,更“有道德”?本文将通过一个可落地的Python工具链&…

作者头像 李华
网站建设 2026/4/16 5:29:34

Golang怎么做贪心算法_Golang贪心算法教程【指南】

贪心算法在 Go 里不是靠“写个模板”就能套用的,它本质是“排序 单次遍历”的组合技,核心在于选对排序维度和判断条件。怎么选排序规则?看局部最优定义贪心能否成立,取决于你定义的“当前最好”是否真能导向全局最优。比如&#…

作者头像 李华