Environmental services / public utilities / enterprise AI platform / An anonymized A-share listed environmental services group building a training and inference platform

Training-data processing pipeline for a listed environmental services group

Training-data cleaning and LLM augmentation were long-running platform tasks. Cleaning usually took minutes, while augmentation could run for tens of hours or close to a hundred hours. The platform needed pause, resume, query, analysis, and JSONL export without damaging the original data.

  • Python
  • Celery
  • Redis
  • MySQL
  • SQLAlchemy async
  • MinIO
  • OpenAI SDK
  • Jinja2
  • SimHash
Solution

I normalized training samples into MySQL, used DB cursors to persist task progress, used Redis flags for graceful pause, used upsert for idempotent writes, and split batch sizes between cleaning and LLM augmentation.

Architecture

Dataset upload -> relational corpus table -> Celery cleaning task -> LLM augmentation task -> DB cursor state machine / Redis pause flag -> idempotent upsert writeback -> MinIO JSONL export.

Result

The pipeline supports pause and resume for long-running training-data jobs. Cleaning runs at minute scale, while augmentation may run for tens of hours depending on data size. A Response augmentation infinite-loop incident was diagnosed and fixed. No training-quality, cost, or business KPI improvement is claimed.

This training-data processing pipeline is a long-running system for cleaning and augmenting LLM training samples. It stores corpus state in relational tables, persists task progress with DB cursors, handles graceful pause through Redis flags, and uses idempotent writeback to survive retries.

The project served a training and inference platform for an anonymized A-share listed environmental services group. I do not disclose the client name, internal system names, or real dataset names. The public version of the case focuses on the engineering boundary.

Background

Most of the real corpus was related to environmental project data, but the feature had to work as a general data processing layer. Users uploaded datasets, ran cleaning and deduplication, used LLM augmentation, then exported JSONL for training.

Cleaning jobs usually finished in minutes. Augmentation jobs were different. Depending on the dataset size, they could run for tens of hours or close to a hundred hours. Once a job lasts that long, the product has to answer practical questions: what happens when a user clicks pause, a worker fails, or QA reruns the same task?

For a script, restarting from row one may be tolerable. For an enterprise platform, it is not. Users need progress, pause, resume, preview, query, and export.

The Real Problem

The first hard part was that data could not only pass through files. The platform needed row-level inspection, search, analysis, and preview. Multi-turn samples also had to be split by conversation row and turn.

The second hard part was state. Worker memory could not be the source of truth for a job that may run for hours. Progress had to survive restarts and failures.

The third hard part was idempotent writeback. Pause/resume and retry both mean the same batch may run more than once. If writeback is not safe, the pipeline can duplicate rows, overwrite the wrong rows, or export dirty samples.

The LLM path added another constraint. Cleaning is mostly CPU-bound. Augmentation waits on model calls, so the same batch strategy does not work for both.

My Role

My scope was the data processing pipeline core, not the whole training and inference platform.

I worked on the relational corpus model, the cleaning and augmentation task chain, the DB cursor state, the Redis pause mechanism, idempotent writeback, and JSONL export to MinIO.

I also handled a key failure in this module. In the Response augmentation stage, one write path missed the primary key id. Upsert fell back to insert. The task read from and wrote to the same table, so the offset never reached the moving end of the table. The immediate fix was adding the id back to the write payload.

Cross-system state sync involved an external business system. I keep its real name and internal architecture out of the public write-up.

Solution And Tradeoffs

Relational Corpus Storage

The pipeline did not process the dataset only as JSONL. Training samples were normalized into MySQL records with fields such as dataset_file_id, row_identifier, turn_taking, and data_* text columns. Multi-turn conversations were reconstructed by grouping on row_identifier and sorting by id.

This enabled pagination, query, analysis, soft delete, and resumable processing. The cost was moving intermediate-state pressure into the database.

DB Cursor State

Execution state was stored by (task_id, stage, task_type). The cursor included current_file_id, current_offset, processed_rows, and status.

On resume, the task skipped completed files and continued from the stored offset in the matching file. After that file was reached, resume_offset had to be reset to zero. Otherwise later files would incorrectly inherit the offset and skip their first rows.

Pause used Redis flags. Tasks checked the flag at batch boundaries, saved the cursor, and marked the job as paused. That kept user-initiated pause separate from failure.

Different Batch Sizes

Cleaning used batch_size=10000. Augmentation used batch_size=10.

Those were not benchmark-derived optimal numbers. They reflected different priorities. Cleaning is fast per row, so large batches reduce database round trips. LLM augmentation is slow per row, so small batches make pause responsive.

Cleaning optimized for throughput. Augmentation optimized for control.

Idempotent Upsert And Its Failure Mode

Writes used INSERT ... ON DUPLICATE KEY UPDATE. A resumed batch should update existing rows instead of creating duplicates.

The hidden contract was that update payloads must include the primary key.

In the Response augmentation failure, that contract was broken. Missing id turned upsert into insert. The job kept paginating forward while the table kept growing. It ran for four or five days and produced tens of millions of dirty rows before QA caught it. The dirty rows were deleted, and the write payload was fixed.

The deeper lesson: an idempotency contract should not live only in developer memory. The lower layer should reject unsafe update payloads.

Results And Boundaries

The pipeline supports long-running data cleaning and LLM augmentation jobs with pause and resume. Progress is persisted outside worker memory. Pause and failure are separate states. LLM augmentation uses a small batch size so pause can take effect sooner.

The public time scale is limited: full cleaning runs at minute scale; augmentation can run for tens of hours depending on the dataset. The Response augmentation infinite-loop bug was diagnosed and fixed.

I do not claim training-quality improvements, cost reduction, or business KPI changes. The material does not provide public numbers for those.

One remaining mechanism deserves caution: single LLM generation failures were isolated and represented as empty-string placeholders. The project material says this did not produce a known production incident, but a stricter failed-sample queue would be safer.

What I Took Away

The difference between a script and a platform is not the UI. It is whether the system can still explain itself after running for dozens of hours.

Where is the cursor? What was already processed? Can the user pause safely? Can retry happen without duplicating rows? Can invalid output be stopped before export?

Those questions look boring in a demo.

They become the system when a long LLM job is still running on day three.

If you are evaluating training-data processing, enterprise RAG, document parsing, or LLM augmentation work, contact me by email at contact@aildnc.com. For China-based inquiries, use the WeChat QR code below the article.

Contact

Discuss Similar Work

If you are evaluating a similar document AI, enterprise RAG, knowledge base, or AI workflow project, share the context first. Email works, and Telegram is available for a faster reply: contact@aildnc.com.

Telegram @NieErAI Message me on Telegram