项目
可暂停的数据清洗与 LLM 数据增强流水线
一个面向训练语料加工的长任务内核,用关系化语料存储、DB 游标状态机、Redis 暂停标志和 upsert 回写支撑清洗、增强与 JSONL 导出。
可暂停的数据清洗与 LLM 数据增强流水线,是一个面向训练语料加工的长任务内核。它把数据集内容落进关系表,用 Celery 执行清洗与增强任务,用 DB 游标保存进度,用 Redis 标志位处理暂停,最终导出 JSONL 供训练链路使用。
项目服务于某 A 股上市环保集团的训练推理平台。客户真实名称、内部系统名、数据集名均已脱敏。
项目简介
这个模块负责「数据集上传之后、训练开始之前」的加工环节。它不训练模型,也不评价模型效果,核心职责是把原始训练样本加工成训练链路可以消费的结构化数据。
主要能力包括:
- 数据清洗
- SimHash 去重
- LLM 数据增强
- 多轮对话样本关系化存储
- 任务暂停与断点续跑
- JSONL 导出到 MinIO
问题定义
训练语料加工在企业平台里不能只按离线脚本处理。
清洗任务通常几分钟结束,但 LLM 增强可能跑几十到上百小时。任务时间一长,就会出现暂停、失败、重试、进度查询和数据污染问题。
这套内核要解决的是:
- 用户点暂停时,任务能在 batch 边界保存进度。
- worker 或模型调用失败后,任务能从上次游标继续。
- 重跑同一批数据时,回写尽量幂等。
- 原始数据和加工中间态能被查询、预览和分析。
- 最终产物能导出为 JSONL,交给训练链路。
技术栈
- Python / Celery:执行清洗、增强和导出任务。
- Redis:保存暂停标志位。
- MySQL / SQLAlchemy async:存储关系化语料和任务游标。
- MinIO:保存导出的 JSONL 文件。
- OpenAI SDK:执行 LLM 增强调用。
- Jinja2:管理增强模板。
- SimHash:做文档去重能力的一部分。
架构图
Dataset Upload
|
v
Relational Corpus Table
|
+--> Data Cleaning Task ----+
| |
+--> LLM Augmentation Task --+--> Upsert Writeback
| |
+--> Export JSONL Task ------+--> MinIO
Cross-cutting:
DB cursor state: current_file_id / current_offset / processed_rows
Redis pause flag: checked at batch boundaries
Celery queues: clean / augmentation / export
核心模块
关系化语料存储
多轮对话样本被拆成表记录,每条记录保留文件、会话行、轮次和一组 data_* 字段。这样平台可以做分页、查询、分析、软删除和重组。
这也是为什么没有只走文件流。文件适合导入导出,不适合作为平台中间态。
任务游标状态机
任务执行状态记录 current_file_id、current_offset、processed_rows 和 status。恢复任务时先读游标,跳过已完成文件,命中恢复文件后从 offset 继续。
恢复文件处理完之后,resume_offset 会归零,避免污染后续文件。
暂停机制
暂停通过 Redis 标志位轮询实现。任务在 batch 开头检查标志位,命中后保存游标并写入 paused 状态。
这个做法的重点不是终止任务,而是让任务停在一个能恢复的位置。
幂等回写
加工结果通过 INSERT ... ON DUPLICATE KEY UPDATE 回写。这个策略对断点续跑很重要,但它依赖主键命中。
Response 增强阶段曾经因为漏传 id,导致 upsert 退化为 insert,引发读写同表无限循环。修复点是回写时显式带上原始记录 id。
我负责的部分
- 数据清洗和数据增强任务链。
- 关系化语料表的读写与多轮对话重组。
- DB 游标状态机和恢复逻辑。
- Redis 暂停标志位检查。
- upsert 幂等回写。
- JSONL 导出链路。
- Response 增强无限循环故障修复。
跨系统任务状态同步涉及外部业务系统,本页不展开其内部名称和实现。
难点与处理
最大的问题不是某个算法,而是长任务的工程边界。
清洗和增强采用不同 batch_size:清洗 10000,增强 10。前者偏吞吐,后者偏暂停响应。这个数字不是压测最优值,只代表当时的工程取舍。
另一个难点是幂等。upsert 可以支持断点重跑,但如果 payload 缺主键,就会从更新变成插入。Response 增强故障说明,幂等不能只靠调用方记忆,后续更好的做法是在 DAO 或 schema 层拒绝无主键更新。
还有一个边界是 LLM 失败处理。当前批量生成会隔离单条失败,用空串占位并记录成功 / 失败计数。生产上没有确认出现过空样本批量混入训练集的问题,但从工程角度看,空结果应该进入失败队列或重试队列,而不是只靠日志。
如何运行
真实项目代码不公开。按模块结构,运行链路大致是:
1. 上传数据集并解析入关系表
2. 提交 Celery 清洗任务
3. 任务按 batch 处理并回写进度游标
4. 如需增强,提交 LLM 数据增强任务
5. 增强结果 upsert 回写
6. 导出 JSONL 到 MinIO
可公开的任务参数与约束:
- 清洗
batch_size=10000 - 增强
batch_size=10 - LLM 默认并发
max_concurrent=50 - Celery 重试
max_retries=3 - 指数退避上限 600s
- 增强策略 6 种
链接
如果你正在做企业训练语料处理、RAG 知识库、文档解析或 LLM 数据增强,可以通过页面下方微信二维码或邮件沟通,邮箱:contact@aildnc.com。