Project

Pauseable data cleaning and LLM augmentation pipeline

A long-running training-data pipeline core using relational corpus storage, DB cursor state, Redis pause flags, and idempotent upsert writeback.

  • Python
  • Celery
  • Redis
  • MySQL
  • SQLAlchemy async
  • MinIO
  • OpenAI SDK
  • Jinja2
  • SimHash
  • Persisted current_file_id, current_offset, and processed_rows in DB state to support resumable long-running tasks.
  • Used batch_size=10000 for cleaning and batch_size=10 for LLM augmentation to separate throughput from pause responsiveness.
  • Diagnosed and fixed a same-table infinite loop caused by a missing primary key id in an upsert payload.

This pauseable data cleaning and LLM augmentation pipeline is a long-running processing core for training-data preparation. It stores corpus state in relational tables, runs cleaning and augmentation through Celery, persists progress with DB cursors, checks Redis pause flags, and exports JSONL for training.

The project served a training and inference platform for an anonymized A-share listed environmental services group. Client name, internal system names, and dataset names are not disclosed.

Overview

The module sits between dataset upload and model training. It does not train the model or claim model-quality improvements. Its job is to turn raw training samples into processed data that the training pipeline can consume.

Core capabilities:

  • data cleaning
  • SimHash deduplication
  • LLM data augmentation
  • relational storage for multi-turn samples
  • pause and resume for long-running tasks
  • JSONL export to MinIO

Problem

Training-data processing in an enterprise platform cannot behave like a one-off script.

Cleaning may finish in minutes, but LLM augmentation can run for tens of hours depending on dataset size. Once a job lasts that long, the platform must handle pause, failure, retry, progress query, and data contamination risk.

The pipeline needed to:

  • save progress when users pause a task
  • resume from the last cursor after failure
  • keep retry writeback idempotent
  • allow row-level query, preview, and analysis
  • export final JSONL for training

Stack

  • Python / Celery for cleaning, augmentation, and export tasks
  • Redis for pause flags
  • MySQL / SQLAlchemy async for corpus storage and task state
  • MinIO for exported JSONL files
  • OpenAI SDK for LLM augmentation calls
  • Jinja2 for augmentation templates
  • SimHash for part of the deduplication logic

Architecture

Dataset Upload
    |
    v
Relational Corpus Table <-------------------+
    |                                       |
    +--> Data Cleaning Task -------+        |
    |                              +--> Upsert Writeback
    +--> LLM Augmentation Task ----+
    |
    +--> Export JSONL Task -----------> MinIO

Cross-cutting:
DB cursor state: current_file_id / current_offset / processed_rows
Redis pause flag: checked at batch boundaries
Celery queues: clean / augmentation / export
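The three queues could be wired with Celery's `task_routes` setting so that long augmentation runs do not occupy the workers serving cleaning and export. This is a minimal sketch: only the queue names come from this write-up; the module path `pipeline.tasks` and the task names are hypothetical, since the real code is not public.

```python
# Hypothetical task names; only the queue names (clean / augmentation / export)
# come from the write-up.
TASK_ROUTES = {
    "pipeline.tasks.clean_dataset": {"queue": "clean"},
    "pipeline.tasks.augment_dataset": {"queue": "augmentation"},
    "pipeline.tasks.export_jsonl": {"queue": "export"},
}

# With a Celery app object this mapping would be applied as:
#   app.conf.task_routes = TASK_ROUTES
# and each queue can then get its own worker pool, for example:
#   celery -A pipeline worker -Q augmentation --concurrency=4
```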

Core Modules

Relational Corpus Storage

Multi-turn samples are split into table rows with file id, conversation row, turn number, and a set of data_* fields. This makes pagination, query, analysis, soft deletion, and reconstruction possible.
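A minimal sketch of the split-and-reconstruct idea, assuming one row per turn and two hypothetical data_* columns (`data_prompt`, `data_response`); the real column set is not disclosed. Reconstruction sorts by file, conversation, and turn number, so row order in storage does not matter.

```python
from dataclasses import dataclass
from itertools import groupby


@dataclass(frozen=True)
class CorpusRow:
    # Hypothetical column names: the write-up only says rows carry a file id,
    # a conversation row, a turn number, and a set of data_* fields.
    file_id: int
    conversation_no: int
    turn_no: int
    data_prompt: str
    data_response: str


def split_conversation(file_id: int, conversation_no: int, turns: list[dict]) -> list[CorpusRow]:
    """Flatten one multi-turn sample into one row per turn."""
    return [
        CorpusRow(file_id, conversation_no, i, t["prompt"], t["response"])
        for i, t in enumerate(turns)
    ]


def reconstruct(rows: list[CorpusRow]) -> dict[tuple[int, int], list[dict]]:
    """Rebuild multi-turn samples from rows, ordering turns by turn number."""
    ordered = sorted(rows, key=lambda r: (r.file_id, r.conversation_no, r.turn_no))
    samples = {}
    for key, group in groupby(ordered, key=lambda r: (r.file_id, r.conversation_no)):
        samples[key] = [{"prompt": r.data_prompt, "response": r.data_response} for r in group]
    return samples
```

Because each turn is its own row, pagination, filtering, and soft deletion operate on rows, while training export goes through `reconstruct`.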

This is why the pipeline does not simply stream files: files work well for import and export, but poorly as interactive platform state.

Cursor State Machine

Task state stores current_file_id, current_offset, processed_rows, and status. Resume reads the cursor, skips completed files, and continues from the stored offset.

After the resume file is reached, resume_offset is reset to zero so later files do not inherit the same offset.
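The resume rule can be sketched as a generator over files. This assumes files are processed in ascending file id order and that rows within a file are addressed by offset; both are illustrative assumptions, not confirmed details of the real implementation.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class Cursor:
    # Field names mirror the persisted state described in the write-up.
    current_file_id: Optional[int] = None
    current_offset: int = 0
    processed_rows: int = 0


def iter_resume(files: dict[int, list[str]], cursor: Cursor) -> Iterator[tuple[int, int, str]]:
    """Yield (file_id, offset, row), continuing from the saved cursor."""
    resume_file = cursor.current_file_id
    resume_offset = cursor.current_offset
    for file_id in sorted(files):  # assumption: ascending id order
        if resume_file is not None and file_id < resume_file:
            continue  # completed before the pause or failure
        start = resume_offset if file_id == resume_file else 0
        resume_offset = 0  # later files must not inherit the resume offset
        rows = files[file_id]
        for offset in range(start, len(rows)):
            yield file_id, offset, rows[offset]
```

Without the reset, file 3 in a resume from (file 2, offset 1) would also start at offset 1 and silently skip its first row.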

Pause Mechanism

Pause uses Redis flags. Tasks check the flag at batch boundaries, save the cursor, and mark the task as paused.

The point is not to kill the task. The point is to stop it at a recoverable position.
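A minimal sketch of a batch loop that checks the pause flag only at batch boundaries. The flag lookup and cursor persistence are passed in as callables so the sketch runs without Redis or a database; with redis-py the check could be something like `lambda: bool(r.exists(f"pause:{task_id}"))`, where the key format is hypothetical.

```python
from typing import Callable


def run_batches(
    rows: list[str],
    batch_size: int,
    start_offset: int,
    is_paused: Callable[[], bool],
    process_batch: Callable[[list[str]], None],
    save_cursor: Callable[[int], None],
) -> str:
    """Process rows in batches; a pause takes effect only between batches."""
    offset = start_offset
    while offset < len(rows):
        if is_paused():
            save_cursor(offset)  # recoverable position: next unprocessed row
            return "paused"
        batch = rows[offset:offset + batch_size]
        process_batch(batch)
        offset += len(batch)
        save_cursor(offset)  # persist progress after every completed batch
    return "completed"
```

The batch size bounds how long a pause request can wait: a pause set mid-batch is honored only after the current batch finishes.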

Idempotent Writeback

Processed records are written with INSERT ... ON DUPLICATE KEY UPDATE. This matters for retry and resume, but it depends on a primary key match.

The Response augmentation path omitted the original record id from its write payload, so the upsert became a plain insert. Because the task read from and wrote to the same table, every batch appended new rows, and the offset kept chasing a moving table tail. The fix was to carry the original id in the write payload.
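The failure mode can be reproduced with SQLite, whose `INSERT ... ON CONFLICT(id) DO UPDATE` (SQLite 3.24+) stands in here for MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`; both update only when the key matches an existing row. The table, column, batch size, and iteration cap are hypothetical demo values.

```python
import sqlite3

# SQLite stand-in for MySQL's ON DUPLICATE KEY UPDATE.
UPSERT = """
INSERT INTO corpus (id, data_response) VALUES (:id, :data_response)
ON CONFLICT(id) DO UPDATE SET data_response = excluded.data_response
"""


def make_table() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE corpus (id INTEGER PRIMARY KEY, data_response TEXT)")
    conn.executemany("INSERT INTO corpus VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])
    return conn


def augment_pass(conn: sqlite3.Connection, keep_id: bool, batch_size: int = 2, max_batches: int = 10) -> int:
    """Read the table by offset and write results back to the same table.

    Returns the number of batches run; max_batches caps the buggy case.
    """
    offset, batches = 0, 0
    while batches < max_batches:
        rows = conn.execute(
            "SELECT id, data_response FROM corpus ORDER BY id LIMIT ? OFFSET ?",
            (batch_size, offset),
        ).fetchall()
        if not rows:
            break  # reached the table tail
        for row_id, text in rows:
            # A NULL id never conflicts, so SQLite assigns a new rowid: insert, not update.
            payload = {"id": row_id if keep_id else None, "data_response": text + "+aug"}
            conn.execute(UPSERT, payload)
        offset += len(rows)
        batches += 1
    return batches
```

With the id kept, the pass finishes in two batches and the table stays at three rows. Without it, each batch appends as many rows as it reads, so the offset never reaches the tail and only the cap stops the loop.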

My Role

I worked on:

  • the cleaning and augmentation task chain
  • relational corpus reads and multi-turn reconstruction
  • DB cursor state and resume logic
  • Redis pause flag checks
  • idempotent upsert writeback
  • JSONL export
  • the Response augmentation infinite-loop fix

External business-system state sync is intentionally kept high-level in this public write-up.

Engineering Notes

The hardest part was not one algorithm. It was the boundary of long-running jobs.

Cleaning and augmentation used different batch sizes: 10000 for cleaning, 10 for augmentation. Cleaning prioritized throughput. Augmentation prioritized pause responsiveness. These numbers were practical engineering choices, not benchmark-proven optima.

Idempotency was another boundary. Upsert makes resume easier, but a missing primary key silently turns an update into an insert. A stronger design would reject update payloads without an id at the DAO or schema layer.
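A minimal sketch of that stronger design: a DAO-level guard that fails fast when an update-style payload lacks its primary key. The function and exception names are hypothetical; the project's actual DAO layer is not public.

```python
from typing import Any, Iterable, Mapping


class MissingPrimaryKeyError(ValueError):
    """Raised when an update-style payload has no primary key."""


def validate_update_payloads(
    payloads: Iterable[Mapping[str, Any]], key: str = "id"
) -> list[Mapping[str, Any]]:
    """Refuse to upsert rows that would silently degrade into inserts."""
    checked = []
    for i, payload in enumerate(payloads):
        if payload.get(key) is None:
            raise MissingPrimaryKeyError(f"payload #{i} has no {key!r}; refusing to upsert")
        checked.append(payload)
    return checked
```

Raising here turns the infinite-loop bug into an immediate, visible task failure on the first bad batch.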

LLM failure handling also needs care. The batch generation path isolated single-sample failures, used empty strings as placeholders, and recorded success/failure counts. The project material says no production incident came from this, but a failed-sample queue would be safer than relying on logs alone.
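A minimal asyncio sketch of failure isolation with a concurrency cap, plus the failed-sample list suggested above. `call_llm` is an injected coroutine; with the OpenAI SDK it could wrap `AsyncOpenAI().chat.completions.create(...)`, but that wiring and the return shape are assumptions, not the project's code.

```python
import asyncio
from typing import Awaitable, Callable


async def augment_batch(
    samples: list[str],
    call_llm: Callable[[str], Awaitable[str]],
    max_concurrent: int = 50,
) -> tuple[list[str], dict[str, int], list[tuple[int, str]]]:
    """Augment samples concurrently; one failure does not fail the batch.

    Returns (outputs, counts, failed); failed holds (index, error) pairs that
    could feed a retry queue instead of living only in logs.
    """
    semaphore = asyncio.Semaphore(max_concurrent)  # caps in-flight LLM calls

    async def one(text: str) -> str:
        async with semaphore:
            return await call_llm(text)

    results = await asyncio.gather(*(one(s) for s in samples), return_exceptions=True)
    outputs, failed = [], []
    for i, result in enumerate(results):
        if isinstance(result, BaseException):
            outputs.append("")  # placeholder keeps outputs aligned with inputs
            failed.append((i, repr(result)))
        else:
            outputs.append(result)
    counts = {"success": len(samples) - len(failed), "failure": len(failed)}
    return outputs, counts, failed
```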

Run Flow

The real code is not public. The processing flow is:

1. Upload dataset and parse records into relational storage
2. Submit Celery cleaning task
3. Process batches and persist cursor state
4. Submit LLM augmentation task if needed
5. Upsert augmented results back to storage
6. Export JSONL to MinIO
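Step 6 can be sketched as serializing reconstructed samples to JSONL bytes and handing them to the MinIO client. The record schema (`file_id`, `conversation_no`, `turns`) and the bucket and object names in the comment are hypothetical; the real export format is not public.

```python
import io
import json


def to_jsonl(samples: dict[tuple[int, int], list[dict]]) -> bytes:
    """Serialize multi-turn samples, one JSON object per line (hypothetical schema)."""
    buf = io.StringIO()
    for (file_id, conversation_no), turns in sorted(samples.items()):
        record = {"file_id": file_id, "conversation_no": conversation_no, "turns": turns}
        # ensure_ascii=False keeps non-ASCII text readable in the exported file
        buf.write(json.dumps(record, ensure_ascii=False) + "\n")
    return buf.getvalue().encode("utf-8")


# Upload sketch with the MinIO Python client (names hypothetical):
#   data = to_jsonl(samples)
#   client.put_object("training-data", "exports/dataset.jsonl",
#                     io.BytesIO(data), length=len(data),
#                     content_type="application/x-ndjson")
```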

Public implementation constraints:

  • cleaning batch_size=10000
  • augmentation batch_size=10
  • default LLM concurrency max_concurrent=50
  • Celery max_retries=3
  • exponential backoff capped at 600s
  • 6 augmentation strategies
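The retry constraints correspond to a capped exponential backoff. This sketch computes `factor * 2**retries` seconds, capped at 600; the factor is not stated in the write-up, so the value used below is purely illustrative. Celery can express similar behavior through its `autoretry_for`, `max_retries`, `retry_backoff`, and `retry_backoff_max` task options, which also apply jitter by default.

```python
import random


def backoff_delay(retries: int, factor: float = 1.0, cap: float = 600.0, jitter: bool = False) -> float:
    """Exponential backoff in seconds: factor * 2**retries, never above cap."""
    delay = min(cap, factor * (2 ** retries))
    # Full jitter spreads simultaneous retries across [0, delay].
    return random.uniform(0, delay) if jitter else delay


# Roughly the same policy as Celery task options (factor hypothetical):
#   @app.task(bind=True, autoretry_for=(Exception,), max_retries=3,
#             retry_backoff=60, retry_backoff_max=600)
```

With a hypothetical factor of 60 and max_retries=3, the three retries wait 60, 120, and 240 seconds; the 600s cap only starts to bind from the fifth doubling onward, so it mainly guards against larger factors or higher retry counts.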

If you are evaluating training-data processing, enterprise RAG, document parsing, or LLM augmentation work, contact me by email at contact@aildnc.com. For China-based inquiries, use the WeChat QR code below the article.

Contact

Book a 30-minute technical diagnosis

Share the business context first. I will help assess whether the AI application is worth building, how to approach it, and where the main risks are.

Telegram: @NieErAI