项目

可暂停的数据清洗与 LLM 数据增强流水线

一个面向训练语料加工的长任务内核,用关系化语料存储、DB 游标状态机、Redis 暂停标志和 upsert 回写支撑清洗、增强与 JSONL 导出。

  • Python
  • Celery
  • Redis
  • MySQL
  • SQLAlchemy async
  • MinIO
  • OpenAI SDK
  • Jinja2
  • SimHash
用 DB 游标记录 current_file_id、current_offset 和 processed_rows,支持长任务断点续跑。清洗 batch_size=10000,LLM 增强 batch_size=10,在吞吐与暂停响应之间分开取舍。复盘并修复 Response 增强因 upsert 漏传主键导致的读写同表无限循环。

可暂停的数据清洗与 LLM 数据增强流水线,是一个面向训练语料加工的长任务内核。它把数据集内容落进关系表,用 Celery 执行清洗与增强任务,用 DB 游标保存进度,用 Redis 标志位处理暂停,最终导出 JSONL 供训练链路使用。

项目服务于某 A 股上市环保集团的训练推理平台。客户真实名称、内部系统名、数据集名均已脱敏。

项目简介

这个模块负责「数据集上传之后、训练开始之前」的加工环节。它不训练模型,也不评价模型效果,核心职责是把原始训练样本加工成训练链路可以消费的结构化数据。

主要能力包括:

  • 数据清洗
  • SimHash 去重
  • LLM 数据增强
  • 多轮对话样本关系化存储
  • 任务暂停与断点续跑
  • JSONL 导出到 MinIO

问题定义

训练语料加工在企业平台里不能只按离线脚本处理。

清洗任务通常几分钟结束,但 LLM 增强可能跑几十到上百小时。任务时间一长,就会出现暂停、失败、重试、进度查询和数据污染问题。

这套内核要解决的是:

  • 用户点暂停时,任务能在 batch 边界保存进度。
  • worker 或模型调用失败后,任务能从上次游标继续。
  • 重跑同一批数据时,回写尽量幂等。
  • 原始数据和加工中间态能被查询、预览和分析。
  • 最终产物能导出为 JSONL,交给训练链路。

技术栈

  • Python / Celery:执行清洗、增强和导出任务。
  • Redis:保存暂停标志位。
  • MySQL / SQLAlchemy async:存储关系化语料和任务游标。
  • MinIO:保存导出的 JSONL 文件。
  • OpenAI SDK:执行 LLM 增强调用。
  • Jinja2:管理增强模板。
  • SimHash:做文档去重能力的一部分。

架构图

Dataset Upload
    |
    v
Relational Corpus Table
    |
    +--> Data Cleaning Task ----+
    |                           |
    +--> LLM Augmentation Task --+--> Upsert Writeback
    |                           |
    +--> Export JSONL Task ------+--> MinIO

Cross-cutting:
DB cursor state: current_file_id / current_offset / processed_rows
Redis pause flag: checked at batch boundaries
Celery queues: clean / augmentation / export

核心模块

关系化语料存储

多轮对话样本被拆成表记录,每条记录保留文件、会话行、轮次和一组 data_* 字段。这样平台可以做分页、查询、分析、软删除和重组。

这也是为什么没有只走文件流。文件适合导入导出,不适合作为平台中间态。

任务游标状态机

任务执行状态记录 current_file_idcurrent_offsetprocessed_rowsstatus。恢复任务时先读游标,跳过已完成文件,命中恢复文件后从 offset 继续。

恢复文件处理完之后,resume_offset 会归零,避免污染后续文件。

暂停机制

暂停通过 Redis 标志位轮询实现。任务在 batch 开头检查标志位,命中后保存游标并写入 paused 状态。

这个做法的重点不是终止任务,而是让任务停在一个能恢复的位置。

幂等回写

加工结果通过 INSERT ... ON DUPLICATE KEY UPDATE 回写。这个策略对断点续跑很重要,但它依赖主键命中。

Response 增强阶段曾经因为漏传 id,导致 upsert 退化为 insert,引发读写同表无限循环。修复点是回写时显式带上原始记录 id。

我负责的部分

  • 数据清洗和数据增强任务链。
  • 关系化语料表的读写与多轮对话重组。
  • DB 游标状态机和恢复逻辑。
  • Redis 暂停标志位检查。
  • upsert 幂等回写。
  • JSONL 导出链路。
  • Response 增强无限循环故障修复。

跨系统任务状态同步涉及外部业务系统,本页不展开其内部名称和实现。

难点与处理

最大的问题不是某个算法,而是长任务的工程边界。

清洗和增强采用不同 batch_size:清洗 10000,增强 10。前者偏吞吐,后者偏暂停响应。这个数字不是压测最优值,只代表当时的工程取舍。

另一个难点是幂等。upsert 可以支持断点重跑,但如果 payload 缺主键,就会从更新变成插入。Response 增强故障说明,幂等不能只靠调用方记忆,后续更好的做法是在 DAO 或 schema 层拒绝无主键更新。

还有一个边界是 LLM 失败处理。当前批量生成会隔离单条失败,用空串占位并记录成功 / 失败计数。生产上没有确认出现过空样本批量混入训练集的问题,但从工程角度看,空结果应该进入失败队列或重试队列,而不是只靠日志。

如何运行

真实项目代码不公开。按模块结构,运行链路大致是:

1. 上传数据集并解析入关系表
2. 提交 Celery 清洗任务
3. 任务按 batch 处理并回写进度游标
4. 如需增强,提交 LLM 数据增强任务
5. 增强结果 upsert 回写
6. 导出 JSONL 到 MinIO

可公开的任务参数与约束:

  • 清洗 batch_size=10000
  • 增强 batch_size=10
  • LLM 默认并发 max_concurrent=50
  • Celery 重试 max_retries=3
  • 指数退避上限 600s
  • 增强策略 6 种

链接

如果你正在做企业训练语料处理、RAG 知识库、文档解析或 LLM 数据增强,可以通过页面下方微信二维码或邮件沟通,邮箱:contact@aildnc.com

联系

预约一次 30 分钟技术诊断

正在评估企业知识库、AI 客服或 Agent 工作流时,可以先发问题背景。我会先帮你判断是否值得做、怎么做,以及主要风险在哪里。

微信二维码 优先加微信沟通