Implementing Celery for Async Work Order Batching: Resolving Duplicate Ingestion on Broker Failover

When routing parsed maintenance tickets into a CMMS, the transition from synchronous validation to asynchronous execution introduces a critical failure point: broker connection loss during task acknowledgment. Facilities managers and maintenance engineers frequently report duplicate preventive maintenance routes following network interruptions or Redis restarts. This occurs when a Celery worker processes a batch of work orders but its acknowledgment never reaches the broker before the connection drops. The broker then redelivers the same payload, and because the ingestion task is not idempotent, the pipeline creates duplicate records in the CMMS database. The following debugging scenario isolates the configuration flaw and provides a deterministic resolution for work order ingestion and parsing pipelines.

Log Trace Evidence

During a broker failover event, the Celery worker logs exhibit a predictable sequence:

[2024-05-12 08:14:22,103: WARNING/MainProcess] Connection to Redis lost: Retry (0/20) now.
[2024-05-12 08:14:22,105: INFO/MainProcess] Task cmms_pipeline.batch_work_orders[8a3f-4c1d] received
[2024-05-12 08:14:22,410: INFO/MainProcess] Task cmms_pipeline.batch_work_orders[8a3f-4c1d] succeeded in 0.305s
[2024-05-12 08:14:23,001: WARNING/MainProcess] Connection to Redis lost: Retry (1/20) now.
[2024-05-12 08:14:25,112: INFO/MainProcess] Task cmms_pipeline.batch_work_orders[8a3f-4c1d] received
[2024-05-12 08:14:25,415: ERROR/MainProcess] IntegrityError: duplicate key value violates unique constraint "work_orders_pkey"

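The trace is consistent with a lost acknowledgment: the worker logged the task as succeeded, but the ACK never reached the broker because of the connection drop. Upon reconnection, the broker redelivered the unacknowledged message under the same task ID, triggering a second execution that collided with the rows the first execution had already committed.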
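To confirm redelivery in larger logs, a small scan for task IDs that were received more than once is usually enough. This is a sketch that assumes the default worker log format shown above; find_redelivered is a hypothetical helper name. A task retried with self.retry() keeps its task ID, so a repeated "received" line can also be a retry; cross-check against the retry entries before concluding that the broker redelivered the message.

```python
import re
from collections import Counter

# Matches worker lines such as:
# "[...: INFO/MainProcess] Task cmms_pipeline.batch_work_orders[8a3f-4c1d] received"
RECEIVED_RE = re.compile(r"Task (?P<name>[\w.]+)\[(?P<task_id>[^\]]+)\] received")


def find_redelivered(log_lines):
    """Return {task_id: receive_count} for task IDs received more than once."""
    counts = Counter(
        m.group("task_id")
        for line in log_lines
        if (m := RECEIVED_RE.search(line))
    )
    return {task_id: n for task_id, n in counts.items() if n > 1}


sample = [
    "[2024-05-12 08:14:22,105: INFO/MainProcess] Task cmms_pipeline.batch_work_orders[8a3f-4c1d] received",
    "[2024-05-12 08:14:22,410: INFO/MainProcess] Task cmms_pipeline.batch_work_orders[8a3f-4c1d] succeeded in 0.305s",
    "[2024-05-12 08:14:25,112: INFO/MainProcess] Task cmms_pipeline.batch_work_orders[8a3f-4c1d] received",
]
print(find_redelivered(sample))  # {'8a3f-4c1d': 2}
```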

Root Cause Analysis

Celery defaults to early acknowledgment (task_acks_late=False): the worker acknowledges each message just before it starts executing the task. If the worker crashes or loses connectivity mid-execution, the message has already been acknowledged and is not redelivered, so the batch is silently lost. If instead the ACK never reaches the broker (the connection drops or the broker restarts first), the message stays in the broker's unacknowledged set and is redelivered after reconnection or, on Redis, once the visibility timeout expires.

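In CMMS routing, redelivery turns into duplicate records because the batch task is not idempotent: every execution inserts its rows again. Celery cannot guarantee exactly-once execution. Early acknowledgment risks losing work, and late acknowledgment guarantees at-least-once delivery, so duplicates remain possible in both modes. The Async Batch Processing layer must therefore achieve effectively-once results by accepting at-least-once delivery and making ingestion idempotent. Relying solely on broker-level deduplication is insufficient because the CMMS database lacks native message-ID tracking. The pipeline requires explicit late acknowledgment combined with database-level idempotency checks.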
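A toy, in-memory model makes the trade-off between early and late acknowledgment concrete. It is not Celery or Kombu code: ToyBroker and run are hypothetical names, a Python list stands in for the work_orders table, and a single injected failure stands in for the crash or connection drop described above.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBroker:
    """Hypothetical in-memory broker: unacknowledged messages are requeued on recovery."""
    queue: list = field(default_factory=list)
    unacked: dict = field(default_factory=dict)

    def deliver(self):
        msg_id, payload = self.queue.pop(0)
        self.unacked[msg_id] = payload
        return msg_id, payload

    def ack(self, msg_id):
        self.unacked.pop(msg_id, None)

    def recover(self):
        # Reconnection (or an expired visibility timeout) requeues unacked messages.
        self.queue.extend(self.unacked.items())
        self.unacked.clear()


def run(acks_late: bool, idempotent: bool, failure: str) -> list:
    """Deliver one message, inject one failure on the first attempt, return rows written.

    failure: "crash_mid_task" (worker dies before inserting) or
             "ack_lost" (insert commits, but the ACK is dropped with the connection).
    """
    broker = ToyBroker(queue=[("m1", "WO-1001")])
    rows = []  # stand-in for the work_orders table
    attempt = 0
    while broker.queue:
        attempt += 1
        msg_id, payload = broker.deliver()
        ack_ok = not (attempt == 1 and failure == "ack_lost")
        if not acks_late and ack_ok:
            broker.ack(msg_id)  # early ack: sent just before the task body runs
        if attempt == 1 and failure == "crash_mid_task":
            broker.recover()  # worker died mid-task; the broker recovers its state
            continue
        if not (idempotent and payload in rows):
            rows.append(payload)  # idempotent mode mimics ON CONFLICT DO NOTHING
        if acks_late and ack_ok:
            broker.ack(msg_id)  # late ack: sent after the task body returns
        if not ack_ok:
            broker.recover()  # connection drop: the unacked message is requeued
    return rows


for acks_late in (False, True):
    for failure in ("crash_mid_task", "ack_lost"):
        for idempotent in (False, True):
            print(acks_late, failure, idempotent, run(acks_late, idempotent, failure))
```

In this model only late acknowledgment combined with an idempotent insert writes the work order exactly once in both failure modes: early acknowledgment loses the batch on a crash, and a lost ACK duplicates it whenever the insert is not idempotent.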

Step-by-Step Resolution

1. Enforce Late Acknowledgment & Broker Visibility

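Late acknowledgment defers the ACK until the task function returns, so a batch interrupted by a worker crash or network partition is redelivered instead of lost. This deliberately accepts at-least-once delivery, which is why steps 2 and 3 make ingestion idempotent. By default, Celery also acknowledges late tasks that raise an exception (task_acks_on_failure_or_timeout=True); task_reject_on_worker_lost=True additionally requeues the message if the worker process itself dies. On Redis, pair this with a visibility timeout longer than the slowest batch plus any retry countdown; otherwise the broker redelivers tasks that are still running. The 3600-second value below equals the Redis transport default, so raise it if batches can run longer.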

# celery_config.py
broker_transport_options = {
    "visibility_timeout": 3600,  # 1 hour; adjust based on max batch processing time
    "retry_on_timeout": True
}
task_acks_late = True
task_reject_on_worker_lost = True
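A small offline check can catch the most common misconfiguration: a visibility timeout that does not exceed the worst-case batch runtime. The function check_ack_settings is hypothetical (not part of Celery), max_batch_seconds and max_retry_countdown are your own estimates, and the 3600-second fallback mirrors the Redis transport's default visibility timeout.

```python
import types


def check_ack_settings(conf, max_batch_seconds, max_retry_countdown=0):
    """Return a list of problems with the late-ack settings on `conf`.

    conf: any object exposing the celery_config.py names as attributes
    (for example the imported module). The runtime figures are estimates.
    """
    problems = []
    if not getattr(conf, "task_acks_late", False):
        problems.append("task_acks_late is not enabled")
    if not getattr(conf, "task_reject_on_worker_lost", False):
        problems.append("task_reject_on_worker_lost is not enabled")
    opts = getattr(conf, "broker_transport_options", None) or {}
    timeout = opts.get("visibility_timeout", 3600)  # Redis transport default
    worst_case = max_batch_seconds + max_retry_countdown
    if timeout <= worst_case:
        problems.append(
            f"visibility_timeout={timeout}s does not exceed the {worst_case}s worst case"
        )
    return problems


# Mirrors celery_config.py above; 900 s batches and an 8 s retry countdown are assumptions.
conf = types.SimpleNamespace(
    broker_transport_options={"visibility_timeout": 3600, "retry_on_timeout": True},
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)
print(check_ack_settings(conf, max_batch_seconds=900, max_retry_countdown=8))  # []
```

A configured app exposes the same names on app.conf, so a check like this could also run at worker startup; treat it as a sketch rather than a complete validation.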

2. Implement Payload-Level Idempotency Keys

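Broker redelivery cannot be ruled out in a distributed system, so the ingestion layer must treat duplicate payloads as idempotent operations. Generate a deterministic hash from stable source ticket metadata (here source_id, timestamp, and asset_tag) and persist it as an explicit idempotency key. Because the key is derived from the payload rather than from the Celery task ID, a redelivered batch produces the same keys. The timestamp must come from the source ticket, not from ingestion time; otherwise the key changes on every redelivery.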

import hashlib
from celery import Celery

app = Celery("cmms_pipeline")
app.config_from_object("celery_config")

def generate_idempotency_key(ticket: dict) -> str:
    raw = f"{ticket['source_id']}|{ticket['timestamp']}|{ticket['asset_tag']}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]
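The pipe-joined key above can collide when a field value itself contains "|": source_id "A|B" with timestamp "C" produces the same raw string as source_id "A" with timestamp "B|C". A hypothetical variant, generate_idempotency_key_v2, hashes a canonical JSON encoding instead and keeps the full 64-character digest. Treat it as an option rather than a drop-in change: switching key formats changes the key of every ticket, so a batch redelivered across the deployment would no longer be deduplicated, and the idempotency_key column must hold 64 characters.

```python
import hashlib
import json


def generate_idempotency_key_v2(ticket: dict, fields=("source_id", "timestamp", "asset_tag")) -> str:
    """Hash a canonical JSON encoding of selected ticket fields (hypothetical helper).

    JSON quoting removes the separator ambiguity of "|".join(...), sort_keys makes
    the result independent of dict order, and default=str serializes datetimes.
    """
    subset = {name: ticket[name] for name in fields}
    canonical = json.dumps(subset, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


t1 = {"source_id": "A|B", "timestamp": "C", "asset_tag": "P-101"}
t2 = {"source_id": "A", "timestamp": "B|C", "asset_tag": "P-101"}
print(generate_idempotency_key_v2(t1) == generate_idempotency_key_v2(t2))  # False
```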

3. Database Conflict Handling (Minimal Reproducible Example)

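Combine the idempotency key with an upsert. PostgreSQL's INSERT … ON CONFLICT clause lets a redelivered task skip (DO NOTHING) or update (DO UPDATE) rows that already exist instead of raising an IntegrityError. ON CONFLICT (idempotency_key) requires a unique index or constraint on idempotency_key; without one, PostgreSQL rejects the statement. The example below uses DO NOTHING.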

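# Continues the module from step 2 (app and generate_idempotency_key are defined there).
from datetime import datetime, timezone

from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError

# Placeholder credentials: load the real DSN from configuration or a secret store.
engine = create_engine("postgresql+psycopg2://cmms_user:pass@db-host:5432/cmms_prod")

# Requires a UNIQUE constraint or index on work_orders.idempotency_key.
INSERT_STMT = text("""
    INSERT INTO work_orders (idempotency_key, asset_tag, description, priority, created_at)
    VALUES (:key, :asset, :desc, :priority, :created)
    ON CONFLICT (idempotency_key) DO NOTHING
""")

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def batch_work_orders(self, tickets: list[dict]):
    if not tickets:
        return  # nothing to ingest; skip the database round-trip
    created = datetime.now(timezone.utc)  # assumes created_at is TIMESTAMPTZ
    rows = [
        {
            "key": generate_idempotency_key(t),
            "asset": t["asset_tag"],
            "desc": t["description"],
            "priority": t.get("priority", "LOW"),
            "created": created,
        }
        for t in tickets
    ]
    try:
        # engine.begin() commits on success and rolls back on error. With
        # task_acks_late=True, the ACK is sent only after this function returns.
        with engine.begin() as conn:
            conn.execute(INSERT_STMT, rows)
    except OperationalError as exc:
        # Exponential backoff for transient DB failures (lock timeouts, deadlocks,
        # dropped connections). Duplicate keys never get here because ON CONFLICT
        # absorbs them, and malformed tickets (KeyError above) fail fast.
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)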
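The effect of the conflict clause can be reproduced without a PostgreSQL server. This sketch uses an in-memory SQLite database as a stand-in (SQLite 3.24+ accepts the same ON CONFLICT (...) DO NOTHING syntax); the table is trimmed to a few columns and the keys are illustrative, so it demonstrates the pattern, not the production schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE work_orders (
        id INTEGER PRIMARY KEY,
        idempotency_key TEXT NOT NULL UNIQUE,  -- the constraint ON CONFLICT relies on
        asset_tag TEXT NOT NULL,
        description TEXT
    )
    """
)

INSERT_SQL = """
    INSERT INTO work_orders (idempotency_key, asset_tag, description)
    VALUES (:key, :asset, :desc)
    ON CONFLICT (idempotency_key) DO NOTHING
"""

batch = [
    {"key": "k-1", "asset": "AHU-01", "desc": "Filter change"},
    {"key": "k-2", "asset": "CH-02", "desc": "Vibration check"},
]


def deliver(rows):
    # One simulated task execution: the whole batch in a single transaction.
    with conn:
        conn.executemany(INSERT_SQL, rows)


deliver(batch)  # first delivery
deliver(batch)  # broker redelivers the same payload
print(conn.execute("SELECT COUNT(*) FROM work_orders").fetchone()[0])  # 2
```

The second delivery commits without error and adds no rows, which is exactly the behavior the Celery task relies on when the broker replays an unacknowledged batch.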

4. Broker Connection Resilience

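These Celery settings control how Kombu reconnects to Redis or AMQP after a restart and how many messages each worker holds unacknowledged, which bounds the burst of redelivered work that reaches the CMMS database after a failover.

broker_connection_retry_on_startup = True  # Celery 5.3+: keep retrying if the broker is down at worker startup
broker_connection_max_retries = 10  # stop reconnecting after 10 attempts (default is 100)
broker_pool_limit = 10
worker_prefetch_multiplier = 1  # Critical: each process reserves one message at a time, preventing over-fetching during failover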
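Celery sizes each worker's prefetch limit as its concurrency multiplied by worker_prefetch_multiplier (default 4). Reserved messages that have not started are unacknowledged, and with task_acks_late=True so are the running ones, so this product bounds how many batches one node can have redelivered at once. The helper names below are hypothetical and the fleet numbers are illustrative.

```python
def max_unacked_per_node(concurrency: int, prefetch_multiplier: int) -> int:
    """Prefetch limit of one worker node: the most messages it can hold unacked."""
    return concurrency * prefetch_multiplier


def redelivery_burst(nodes: int, concurrency: int, prefetch_multiplier: int, batch_size: int) -> int:
    """Worst-case number of work orders replayed against the database after a failover."""
    return nodes * max_unacked_per_node(concurrency, prefetch_multiplier) * batch_size


# Illustrative fleet: 2 nodes, concurrency 8, 50 tickets per batch.
print(redelivery_burst(2, 8, 4, 50))  # 3200 with the default multiplier
print(redelivery_burst(2, 8, 1, 50))  # 800 with worker_prefetch_multiplier = 1
```

With idempotent inserts these replays create no duplicates, but each one still costs a database round-trip, which is why the lower multiplier matters during failover.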

Validation & Post-Fix Monitoring

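  1. Simulate Broker Drop: While a batch task is executing, restart the Redis container or run redis-cli DEBUG SLEEP 5 to make Redis unresponsive (on Redis 7+, the DEBUG command is disabled unless enable-debug-command is set).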
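  2. Verify ACK Timing: Confirm that task_acks_late=True is active on the running workers (for example with celery -A cmms_pipeline inspect conf). With late acknowledgment, the message stays in the broker's unacknowledged set until the task returns, i.e. after the database transaction commits.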
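  3. Check CMMS State: Run SELECT COUNT(*), COUNT(DISTINCT idempotency_key) FROM work_orders; and confirm that both counts match. Because the unique constraint already rejects duplicate keys, a mismatch points to rows written without a key (COUNT(DISTINCT) ignores NULLs), for example by a legacy code path. Also confirm that the test batch created exactly one row per submitted ticket.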
  4. Monitor Retry Metrics: Track task retry counts in Prometheus (the metric name, such as celery_task_retries_total, depends on the exporter you use). A stable ingestion pipeline shows no IntegrityError spikes during broker failover windows.
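The count comparison in step 3 is only informative because NULL keys slip past a unique constraint: PostgreSQL, like SQLite, treats NULLs as distinct in a UNIQUE constraint by default. This sketch uses an in-memory SQLite table as a stand-in, with illustrative rows, to show how keyless rows make the two counts diverge.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE work_orders (idempotency_key TEXT UNIQUE, asset_tag TEXT)")
conn.executemany(
    "INSERT INTO work_orders (idempotency_key, asset_tag) VALUES (?, ?)",
    [
        ("k-1", "AHU-01"),
        ("k-2", "CH-02"),
        (None, "AHU-01"),  # row written without a key, e.g. by a legacy path
        (None, "AHU-01"),  # UNIQUE does not reject a second NULL
    ],
)

total, distinct_keys = conn.execute(
    "SELECT COUNT(*), COUNT(DISTINCT idempotency_key) FROM work_orders"
).fetchone()
print(total, distinct_keys)  # 4 2
```

Declaring idempotency_key NOT NULL closes this gap, after which the two counts can only differ if the constraint itself is missing.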

Implementing late acknowledgment with deterministic idempotency keys eliminates duplicate work order creation at the ingestion boundary. This configuration aligns with CMMS routing requirements where asset history and maintenance compliance demand strict data integrity, even under transient infrastructure failures. For authoritative reference on Celery acknowledgment semantics, consult the Celery Configuration Reference. For PostgreSQL conflict resolution patterns, see the official INSERT … ON CONFLICT documentation.