环保 / 公用事业 / 企业 AI 平台 / 某 A 股上市环保集团,建设训练推理平台中的通用数据处理能力

某上市环保集团的训练语料数据处理流水线

训练语料清洗和 LLM 数据增强属于长耗时任务,清洗通常几分钟,增强可能跑几十到上百小时;平台必须支持用户暂停、失败续跑、查询分析和 JSONL 导出,同时不能破坏原始数据。

  • Python
  • Celery
  • Redis
  • MySQL
  • SQLAlchemy async
  • MinIO
  • OpenAI SDK
  • Jinja2
  • SimHash
方案

我把语料关系化存入 MySQL,用 DB 游标记录任务进度,用 Redis 标志位实现优雅暂停,用 upsert 做幂等回写,并按清洗与增强任务特性设置差异化 batch_size。

架构

数据集上传 -> 关系化语料表 -> Celery 清洗任务 -> LLM 数据增强任务 -> DB 游标状态机 / Redis 暂停标志 -> upsert 幂等回写 -> MinIO JSONL 导出。

结果

流水线支持长耗时训练语料任务的暂停和断点续跑;清洗任务量级为几分钟,数据增强按数据量可能跑几十到上百小时。一次 Response 增强无限循环故障已定位并修复,未对外声称训练效果、成本或业务指标提升。

训练语料数据处理流水线,是一套面向大模型训练数据清洗和增强的长任务系统。它把语料落进关系表,用 DB 游标保存任务进度,用 Redis 标志位处理暂停,用 upsert 保证重跑时尽量落到更新语义上。

这个案例服务于某 A 股上市环保集团的训练推理平台。客户真实名称不公开,内部系统名和数据集名也不公开。能讲清楚的是:它不是单个脚本,而是平台中供业务人员和训练团队使用的数据处理能力。

背景

平台里的语料大多来自环保项目相关资料,但功能不能绑定某个业务域。用户上传数据集后,需要做清洗、去重、LLM 数据增强,再导出 JSONL 给训练链路。

清洗任务通常几分钟能跑完。增强任务不同,数据量一上来就可能持续几十到上百小时。只要任务时间拉长,平台就会面对几个非常现实的问题:用户会点暂停,任务会失败,测试会要求重跑,前端还要展示进度。

如果这个能力只是离线脚本,失败后从头跑也许还能接受。放到企业平台里就不行。用户需要知道任务处理到哪一行,暂停后能不能继续,已经处理过的数据会不会重复写,原始数据还能不能查。

真正麻烦的地方

第一件麻烦事是数据不能只在文件里流过。平台要支持查询、分析和预览,多轮对话样本还要按会话行和轮次拆开。最后虽然要导出 JSONL,但处理中间态必须可查询。

第二件麻烦事是任务状态不能只放在 worker 里。worker 会重启,任务会暂停,增强任务可能跑很久。进度必须落库,否则恢复时只能从头开始。

第三件麻烦事是写入要幂等。暂停恢复和失败重试都可能处理到同一批数据。如果回写策略不稳,很容易重复插入、覆盖错行,或者把脏数据带进导出结果。

第四件麻烦事来自 LLM。清洗任务主要吃 CPU,增强任务要等模型调用,单条慢且不稳定。它们不能用同一套 batch 策略。

我负责的部分

我负责的是数据处理流水线内核,而不是整个训练推理平台的所有模块。这里把边界说清楚。

我做的部分包括:把训练语料拆成关系表记录;实现清洗与数据增强任务链;用 DB 游标保存 current_file_idcurrent_offsetprocessed_rows;用 Redis 标志位做可恢复的暂停;用 upsert 做断点重跑时的幂等回写;把处理结果导出为 JSONL 并写入 MinIO。

我也处理过这个模块里的关键故障。Response 增强阶段曾经因为 upsert 漏传主键 id,导致回写从更新退化为插入,任务读写同一张表时进入无限增长。这个问题后来通过补主键回写修复。

跨系统状态同步涉及外部业务系统。这里不展开对方系统的内部名称和架构,只保留「需要同步任务状态」这个边界。

方案与取舍

把语料放进关系表

训练样本不是直接以 JSONL 文件流过系统,而是先拆入 MySQL。每条记录带 dataset_file_idrow_identifierturn_taking 和一组 data_* 字段。多轮对话读取时,再按 row_identifier + id 排序重组。

这个设计的好处是平台能力更完整:可以分页、查询、分析、软删除,也可以按文件和 offset 恢复任务。代价是数据库承载了中间态,所有读取和回写都要考虑性能与一致性。

用 DB 游标做暂停和续跑

任务状态按 (task_id, stage, task_type) 保存。恢复时,系统读出上一次的文件和 offset。已处理文件跳过,命中恢复文件后从 offset 继续。

这里有一个容易被忽略的细节:恢复文件处理完之后,resume_offset 必须归零。否则这个 offset 会污染后续文件,让后续文件前 N 行被跳过。这个问题不一定报错,但数据会少。

暂停通过 Redis 标志位实现。任务在 batch 边界检查标志,命中后先保存游标,再把状态写成 paused。这样暂停和失败不会混在一起。

清洗和增强不共用 batch 策略

清洗任务 batch_size=10000,增强任务 batch_size=10

这两个数不是压测最优值,而是基于任务性质的取舍。清洗主要是规则和算法处理,单条很快,大 batch 能减少数据库往返。增强任务每条都可能调用 LLM,小 batch 能让暂停更快响应。

换句话说,清洗侧优先吞吐,增强侧优先可控。

upsert 幂等,以及它的风险

流水线用 INSERT ... ON DUPLICATE KEY UPDATE 做幂等回写。这样断点重跑时,同一批数据理论上会更新原行,而不是插入重复数据。

但理论成立有个前提:回写 payload 必须带主键。

Response 增强阶段曾经漏传 id。结果 upsert 没有命中更新,变成 insert。任务一边 offset 分页读,一边往目标表插新行,offset 永远追不上表尾。这次故障卡住四五天,产生千万级脏数据,最后由测试侧发现,脏数据直接删除。

这个问题表面是一行代码,实质是系统没有防住隐式约定。后续如果继续收紧,我会把「更新场景必须带主键」下沉到 DAO 或 schema 校验里。

结果与边界

这条流水线已经能支撑训练语料从上传后的清洗、增强到 JSONL 导出。任务进度不依赖 worker 内存,而是持久化到 DB 游标里;暂停和失败是不同状态;增强任务通过小 batch 提高暂停响应。

可以公开的时间量级是:一次全量清洗大约几分钟;数据增强取决于数据量,可能跑几十到上百小时。Response 增强无限循环故障已经定位并修复。

不能公开或不能声称的东西也要写清楚:没有可公开训练效果数字,没有成本节省数字,没有上线后业务指标变化。LLM 单条生成失败时用空串占位这个机制,生产上没有确认出过一批空样本混入训练集的问题,但它仍然是后续该补的质量门。

经验判断

这类项目的价值不在于把一个清洗脚本包装成平台页面。真正的分水岭是,任务跑到第几十个小时的时候,系统还能不能说清楚自己在哪、做过什么、下一次从哪里继续。

如果让我重新设计,我会更早把几条防线做硬:更新 payload 没主键就拒绝;空增强结果进入失败队列;暂停分支的多状态回写做成统一事务或统一接口。

看起来都是很底层的小事。

但长任务系统最后拼的,往往就是这些小事。

相关链接

如果你正在做企业训练语料处理、RAG 知识库、文档解析或 LLM 数据增强,可以通过页面下方微信二维码或邮件沟通,邮箱:contact@aildnc.com

联系

讨论类似项目

如果你正在评估类似的文档解析、企业 RAG、知识库或 AI 工作流,可以先发问题背景。 微信沟通优先,邮箱也可以:contact@aildnc.com。

微信二维码 扫码加微信沟通