Project
Pauseable data cleaning and LLM augmentation pipeline
A long-running training-data pipeline core using relational corpus storage, DB cursor state, Redis pause flags, and idempotent upsert writeback.
This pauseable data cleaning and LLM augmentation pipeline is a long-running processing core for training-data preparation. It stores corpus state in relational tables, runs cleaning and augmentation through Celery, persists progress with DB cursors, checks Redis pause flags, and exports JSONL for training.
The project served a training and inference platform for an anonymized A-share listed environmental services group. Client name, internal system names, and dataset names are not disclosed.
Overview
The module sits between dataset upload and model training. It does not train the model or claim model-quality improvements. Its job is to turn raw training samples into processed data that the training pipeline can consume.
Core capabilities:
- data cleaning
- SimHash deduplication
- LLM data augmentation
- relational storage for multi-turn samples
- pause and resume for long-running tasks
- JSONL export to MinIO
Problem
Training-data processing in an enterprise platform cannot behave like a one-off script.
Cleaning may finish in minutes, but LLM augmentation can run for tens of hours depending on dataset size. Once a job runs that long, the platform must handle pauses, failures, retries, progress queries, and the risk of data contamination.
The pipeline needed to:
- save progress when users pause a task
- resume from the last cursor after failure
- keep retry writeback idempotent
- allow row-level query, preview, and analysis
- export final JSONL for training
Stack
- Python / Celery for cleaning, augmentation, and export tasks
- Redis for pause flags
- MySQL / SQLAlchemy async for corpus storage and task state
- MinIO for exported JSONL files
- OpenAI SDK for LLM augmentation calls
- Jinja2 for augmentation templates
- SimHash for part of the deduplication logic
Architecture
Dataset Upload
|
v
Relational Corpus Table
|
+--> Data Cleaning Task -----+
|                            |
+--> LLM Augmentation Task --+--> Upsert Writeback
|                            |
+--> Export JSONL Task ------+--> MinIO
Cross-cutting:
DB cursor state: current_file_id / current_offset / processed_rows
Redis pause flag: checked at batch boundaries
Celery queues: clean / augmentation / export
Core Modules
Relational Corpus Storage
Multi-turn samples are split into table rows with file id, conversation row, turn number, and a set of data_* fields. This makes pagination, query, analysis, soft deletion, and reconstruction possible.
This is why the pipeline does not just stream files. Files work well for import and export, but they are a weak fit for interactive platform state such as paging, previews, and soft deletes.
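A minimal sketch of the row layout, assuming hypothetical column names (`conversation_row`, `turn`, `data_query`, `data_response`, `is_deleted`); the write-up only specifies a file id, a conversation row, a turn number, and a set of `data_*` fields. It shows one multi-turn sample flattened into one row per turn, then rebuilt in turn order with soft-deleted turns skipped.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class CorpusRow:
    # Hypothetical columns: the write-up names file id, conversation row,
    # turn number, and data_* fields, but not their exact names.
    file_id: int
    conversation_row: int
    turn: int
    data_query: str
    data_response: str
    is_deleted: bool = False  # assumed soft-delete flag


def split_conversation(file_id: int, conversation_row: int, turns: list[dict]) -> list[CorpusRow]:
    """Flatten one multi-turn sample into one row per turn."""
    return [
        CorpusRow(file_id, conversation_row, i, t["query"], t["response"])
        for i, t in enumerate(turns)
    ]


def reconstruct(rows: list[CorpusRow]) -> dict[tuple[int, int], list[dict]]:
    """Group rows back into conversations ordered by turn, skipping soft-deleted turns."""
    grouped: dict[tuple[int, int], list[CorpusRow]] = defaultdict(list)
    for row in rows:
        if not row.is_deleted:
            grouped[(row.file_id, row.conversation_row)].append(row)
    return {
        key: [
            {"query": r.data_query, "response": r.data_response}
            for r in sorted(group, key=lambda row: row.turn)
        ]
        for key, group in grouped.items()
    }
```

Because each turn is its own row, soft-deleting one turn is a single-row update, and export simply reconstructs whatever is left.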
Cursor State Machine
Task state stores current_file_id, current_offset, processed_rows, and status. Resume reads the cursor, skips completed files, and continues from the stored offset.
Once processing reaches the resume file, the stored offset applies to that file only; resume_offset is then reset to zero so later files do not inherit it.
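A sketch of the resume planning step, assuming files are processed in a stable `file_id` order and that `status` holds strings such as `"paused"` (both assumptions). `TaskCursor` and `plan_resume` are hypothetical names; only the cursor field names come from the write-up.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskCursor:
    # Field names follow the write-up; the status values are assumed.
    current_file_id: Optional[int] = None
    current_offset: int = 0
    processed_rows: int = 0
    status: str = "pending"


def plan_resume(file_ids: list[int], cursor: TaskCursor) -> list[tuple[int, int]]:
    """Return (file_id, start_offset) pairs that still need processing."""
    if cursor.current_file_id is None:
        # Fresh task: every file starts at offset zero.
        return [(fid, 0) for fid in file_ids]
    plan = []
    resume_offset = cursor.current_offset
    reached = False
    for fid in file_ids:
        if not reached:
            if fid != cursor.current_file_id:
                continue  # file was completed before the pause
            reached = True
        plan.append((fid, resume_offset))
        resume_offset = 0  # later files must not inherit the stored offset
    return plan
```

Resetting `resume_offset` inside the loop, right after the resume file is scheduled, is what keeps file 3 in the usage below from skipping its first rows: `plan_resume([1, 2, 3], TaskCursor(2, 30, 130, "paused"))` yields `[(2, 30), (3, 0)]`.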
Pause Mechanism
Pause uses Redis flags. Tasks check the flag at batch boundaries, save the cursor, and mark the task as paused.
The point is not to kill the task. The point is to stop it at a recoverable position.
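A minimal sketch of a batch loop that honours the pause flag only at batch boundaries. The key format `pipeline:pause:{task_id}` and the names `run_batches` and `save_cursor` are hypothetical. `flag_store` could be a `redis.Redis` client, whose `get` returns `bytes` or `None`; a plain `dict` also has a compatible `get` and works as a stand-in in tests.

```python
from typing import Callable, Iterable, Optional, Protocol


class FlagStore(Protocol):
    """Anything with a Redis-style get(); redis.Redis.get returns bytes or None."""

    def get(self, key: str) -> Optional[bytes]: ...


def run_batches(
    task_id: str,
    batches: Iterable[tuple[int, list]],
    process: Callable[[list], None],
    flag_store: FlagStore,
    save_cursor: Callable[[int], None],
) -> str:
    """Process (offset, batch) pairs, stopping cleanly at a batch boundary if paused."""
    pause_key = f"pipeline:pause:{task_id}"  # hypothetical key format
    for offset, batch in batches:
        # Checked only between batches, so a batch is never cut in half.
        if flag_store.get(pause_key):
            save_cursor(offset)  # resume will start from this batch
            return "paused"
        process(batch)
        save_cursor(offset + len(batch))  # persist progress after each batch
    return "completed"
```

Because the check happens only between batches, the batch in flight always finishes and its cursor is saved before the task stops, which is what makes the stopping point recoverable.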
Idempotent Writeback
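Processed records are written back with INSERT ... ON DUPLICATE KEY UPDATE. This makes retry and resume safe to repeat, but only when the write payload carries the primary key of the existing row, so MySQL can detect the duplicate and update instead of insert.
One Response augmentation path omitted the original record id from its write payload, so the upsert became a plain insert. Because the task read from and wrote to the same table, each batch appended new rows and the offset kept chasing a moving table tail, so the task never finished. The fix was to include the original id in the write payload.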
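To illustrate the failure mode, the sketch below uses SQLite's `ON CONFLICT ... DO UPDATE` as a stand-in for MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`; this is a swapped-in dialect, not the project's SQL, and the table and column names are hypothetical. With the original id, each write updates in place and the offset reader reaches the end. Without it, every write appends a row, so the reader never catches the tail. The demo caps the loop at `max_batches` so the buggy path terminates.

```python
import sqlite3

# SQLite's ON CONFLICT ... DO UPDATE stands in for MySQL's
# INSERT ... ON DUPLICATE KEY UPDATE: both update when the key already exists.
UPSERT = (
    "INSERT INTO corpus (id, data_response) VALUES (?, ?) "
    "ON CONFLICT(id) DO UPDATE SET data_response = excluded.data_response"
)


def make_table() -> sqlite3.Connection:
    """Create a tiny corpus table with three rows (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE corpus (id INTEGER PRIMARY KEY AUTOINCREMENT, data_response TEXT)"
    )
    conn.executemany(
        "INSERT INTO corpus (data_response) VALUES (?)", [("a",), ("b",), ("c",)]
    )
    return conn


def augment_pass(conn: sqlite3.Connection, include_id: bool,
                 batch_size: int = 2, max_batches: int = 10) -> int:
    """Read by offset and write augmented rows back into the same table.

    Returns the number of batches run before the reader found no more rows,
    or max_batches if it never caught up with the table tail.
    """
    offset = 0
    for batches_run in range(max_batches):
        rows = conn.execute(
            "SELECT id, data_response FROM corpus ORDER BY id LIMIT ? OFFSET ?",
            (batch_size, offset),
        ).fetchall()
        if not rows:
            return batches_run
        for row_id, text in rows:
            # With id=None there is no key to collide with, so the
            # "upsert" silently becomes an insert of a new row.
            key = row_id if include_id else None
            conn.execute(UPSERT, (key, text + "+aug"))
        offset += len(rows)
    return max_batches
```

With the id, three rows stay three rows and the pass ends after two batches. Without it, each two-row batch adds two rows, so the table always stays ahead of the offset.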
My Role
I worked on:
- the cleaning and augmentation task chain
- relational corpus reads and multi-turn reconstruction
- DB cursor state and resume logic
- Redis pause flag checks
- idempotent upsert writeback
- JSONL export
- the Response augmentation infinite-loop fix
External business-system state sync is intentionally kept high-level in this public write-up.
Engineering Notes
The hardest part was not a single algorithm but the boundaries of long-running jobs: where they can stop, resume, and retry safely.
Cleaning and augmentation used different batch sizes: 10000 for cleaning, 10 for augmentation. Cleaning prioritized throughput. Augmentation prioritized pause responsiveness. These numbers were practical engineering choices, not benchmark-proven optima.
Idempotency was another boundary. Upsert makes resume easier, but a missing primary key changes update into insert. A stronger design would reject update payloads without an id at the DAO or schema layer.
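One way to enforce that at the schema layer, sketched with a hypothetical `UpdatePayload` dataclass and `MissingKeyError`: the payload refuses to exist without an integer id, so a path like the Response augmentation bug would fail loudly instead of silently inserting.

```python
from dataclasses import dataclass
from typing import Any


class MissingKeyError(ValueError):
    """Raised when a write-back payload for an existing row has no id."""


@dataclass(frozen=True)
class UpdatePayload:
    id: int
    data: dict[str, Any]

    def __post_init__(self) -> None:
        # Reject at construction time so a missing id never reaches the upsert.
        if isinstance(self.id, bool) or not isinstance(self.id, int):
            raise MissingKeyError(f"update payload needs an integer id, got {self.id!r}")


def build_update_batch(records: list[dict[str, Any]]) -> list[UpdatePayload]:
    """Validate a whole batch before writing; any record without an id fails the batch."""
    return [
        UpdatePayload(id=r.get("id"), data={k: v for k, v in r.items() if k != "id"})
        for r in records
    ]
```

Failing the whole batch, rather than dropping the bad record, keeps the cursor from advancing past data that was never written.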
LLM failure handling also needs care. The batch generation path isolated single-sample failures, used empty strings as placeholders, and recorded success/failure counts. According to the project material, no production incident came from this, but a failed-sample queue would be safer than relying on logs alone.
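A sketch of per-sample failure isolation with bounded concurrency, using `asyncio.Semaphore` and the default `max_concurrent=50` from the implementation constraints. `call_llm` is a hypothetical coroutine, for example one wrapping the OpenAI SDK's async client. Failed samples get an empty-string placeholder as described, and their indices are also returned so they could feed a failed-sample queue.

```python
import asyncio
from typing import Awaitable, Callable


async def augment_batch(
    samples: list[str],
    call_llm: Callable[[str], Awaitable[str]],
    max_concurrent: int = 50,
) -> tuple[list[str], list[int], dict[str, int]]:
    """Augment each sample with bounded concurrency, isolating per-sample failures."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def one(sample: str) -> str:
        async with semaphore:  # at most max_concurrent calls in flight
            return await call_llm(sample)

    # return_exceptions=True keeps one failure from cancelling the whole batch.
    results = await asyncio.gather(*(one(s) for s in samples), return_exceptions=True)
    outputs: list[str] = []
    failed: list[int] = []
    for i, result in enumerate(results):
        if isinstance(result, BaseException):
            outputs.append("")  # placeholder keeps positions aligned with inputs
            failed.append(i)
        else:
            outputs.append(result)
    counts = {"success": len(samples) - len(failed), "failure": len(failed)}
    return outputs, failed, counts
```

The empty-string placeholder keeps outputs aligned with inputs; the `failed` index list is what a retry queue would consume instead of scraping logs.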
Run Flow
The real code is not public. The processing flow is:
1. Upload dataset and parse records into relational storage
2. Submit Celery cleaning task
3. Process batches and persist cursor state
4. Submit LLM augmentation task if needed
5. Upsert augmented results back to storage
6. Export JSONL to MinIO
Public implementation constraints:
- cleaning batch_size=10000
- augmentation batch_size=10
- default LLM concurrency max_concurrent=50
- Celery max_retries=3
- exponential backoff capped at 600s
- 6 augmentation strategies
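A sketch of the retry delay arithmetic behind "max_retries=3, exponential backoff capped at 600s". The 200 s base delay is a hypothetical value chosen so the cap is visible; the write-up does not state the base. Celery's autoretry options (`autoretry_for`, `retry_backoff`, `retry_backoff_max`, `retry_jitter`) can produce this kind of schedule, but whether the project used them or a manual `self.retry(countdown=...)` is not stated.

```python
def backoff_delay(retries_done: int, base: float, cap: float = 600.0) -> float:
    """Delay before the next retry: base * 2**retries_done, capped at `cap` seconds."""
    if retries_done < 0:
        raise ValueError("retries_done must be non-negative")
    return min(cap, base * (2 ** retries_done))


def retry_schedule(max_retries: int = 3, base: float = 200.0, cap: float = 600.0) -> list[float]:
    """Delays for each allowed retry; base=200.0 is a hypothetical default."""
    return [backoff_delay(n, base, cap) for n in range(max_retries)]
```

With a 200 s base, the three retries wait 200 s, 400 s, and then 600 s instead of 800 s, because the cap takes over on the last attempt.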
Links
- Website: https://www.aildnc.com
- GitHub: https://github.com/JV-X
If you are evaluating training-data processing, enterprise RAG, document parsing, or LLM augmentation work, contact me by email at contact@aildnc.com. For China-based inquiries, use the WeChat QR code below the article.