Building a Celery Pipeline for Large-Scale Web Crawling with Crawl4AI

By Wingston Sharon | February 2025 · 11 min read


The core product loop at Agentosaurus is: find an organization, crawl their website, extract structured data about what they do and how they align to environmental goals, score them, and store the result. We do this for thousands of organizations. Getting this pipeline right took several iterations.

This post describes the current architecture: what we actually run in production, including the parts that were annoying to figure out.

Crawl4AI Basics

Crawl4AI is an async Python crawler built on Playwright. The key advantage over simple requests-based scraping is that it handles JavaScript-rendered content and has built-in LLM extraction: you can give it a Pydantic schema and it'll use an LLM to extract structured data from the page.

A minimal example:

import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from pydantic import BaseModel

class OrganizationData(BaseModel):
    name: str
    mission: str
    founded_year: int | None = None
    headquarters: str | None = None
    sustainability_claims: list[str]

async def crawl_org(url: str) -> OrganizationData | None:
    strategy = LLMExtractionStrategy(
        provider="ollama/llama3.1:8b",
        api_base="http://mac-mini-amsterdam.tail1234.ts.net:11434",
        schema=OrganizationData.model_json_schema(),
        extraction_type="schema",
        instruction="Extract structured information about this organization.",
    )

    async with AsyncWebCrawler(headless=True) as crawler:
        result = await crawler.arun(
            url=url,
            extraction_strategy=strategy,
            word_count_threshold=50,
        )

    if result.success and result.extracted_content:
        import json  # local import to keep the snippet self-contained
        # extracted_content is a JSON string; schema extraction typically
        # returns a list of extracted blocks, so unwrap the first one
        data = json.loads(result.extracted_content)
        if isinstance(data, list):
            if not data:
                return None
            data = data[0]
        return OrganizationData.model_validate(data)
    return None

This is clean. The problem comes when you try to run it inside Celery.

The Async-in-Sync Problem

Celery workers run synchronous Python by default. AsyncWebCrawler uses asyncio internally. You cannot await inside a standard Celery task.

The naive fix, asyncio.run(), works, but there are gotchas:

from celery import shared_task
import asyncio

@shared_task
def crawl_organization_task(org_id: int, url: str):
    """Crawl an organization's website and store extracted data."""
    result = asyncio.run(_crawl_async(url))
    # ... store result
    return result

async def _crawl_async(url: str):
    async with AsyncWebCrawler(headless=True) as crawler:
        return await crawler.arun(url=url, ...)

Gotcha 1: Celery's default concurrency model is prefork (multiple processes). Each process can safely call asyncio.run() because they each have their own event loop. This is fine.

Gotcha 2: If you switch to Celery's gevent or eventlet pool, you get a conflict between gevent's monkey-patched async and asyncio. Stick with prefork for Crawl4AI tasks.

Gotcha 3: Playwright (which Crawl4AI uses) installs browser binaries. In our Docker containers, we pre-install them:

# In our Dockerfile
RUN pip install crawl4ai playwright && \
    playwright install chromium && \
    playwright install-deps chromium

If Playwright isn't installed at task execution time, Crawl4AI will try to install it, which fails in restricted environments.

Task Architecture

We split crawling into four separate Celery tasks, chained together:

crawl_url → extract_organization → score_esg → store_result

This separation matters for retries: if the LLM extraction fails, we don't re-crawl the URL. If scoring fails, we don't re-extract.

# agentosaurus/tasks.py
from celery import shared_task, chain
from celery.utils.log import get_task_logger
import asyncio
import json

logger = get_task_logger(__name__)


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    queue='crawl',
    time_limit=120,
    soft_time_limit=90,
)
def crawl_url(self, org_id: int, url: str) -> dict:
    """
    Task 1: Fetch raw HTML/markdown from URL.
    Returns raw content dict, not yet structured.
    """
    try:
        raw_content = asyncio.run(_fetch_url(url))
        return {
            "org_id": org_id,
            "url": url,
            "markdown": raw_content.markdown,
            "html": raw_content.html,
            "success": raw_content.success,
        }
    except Exception as exc:
        logger.warning(f"Crawl failed for {url}: {exc}")
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))


@shared_task(
    bind=True,
    max_retries=2,
    default_retry_delay=120,
    queue='inference',
    time_limit=180,
)
def extract_organization(self, crawl_result: dict) -> dict:
    """
    Task 2: Use LLM to extract structured org data from raw content.
    """
    if not crawl_result.get("success"):
        logger.info(f"Skipping extraction; crawl failed for {crawl_result['url']}")
        return {"org_id": crawl_result["org_id"], "extracted": None, "skipped": True}

    try:
        extracted = asyncio.run(
            _extract_with_llm(crawl_result["markdown"], crawl_result["url"])
        )
        return {
            "org_id": crawl_result["org_id"],
            "extracted": extracted,
            "skipped": False,
        }
    except Exception as exc:
        raise self.retry(exc=exc)


@shared_task(queue='inference', time_limit=120)
def score_esg(extraction_result: dict) -> dict:
    """Task 3: Score extracted org data against ESG criteria."""
    if extraction_result.get("skipped") or not extraction_result.get("extracted"):
        return {**extraction_result, "scores": None}

    scores = _run_esg_scoring(extraction_result["extracted"])
    return {**extraction_result, "scores": scores}


@shared_task(queue='default')
def store_result(scored_result: dict) -> int:
    """Task 4: Persist everything to the database."""
    from agentosaurus.models import Organization, CrawlResult

    org = Organization.objects.get(id=scored_result["org_id"])
    CrawlResult.objects.update_or_create(
        organization=org,
        defaults={
            "extracted_data": scored_result.get("extracted"),
            "esg_scores": scored_result.get("scores"),
            "crawl_success": not scored_result.get("skipped", False),
        }
    )
    return org.id

Kicking off the chain:

def process_organization(org_id: int, url: str):
    task_chain = chain(
        crawl_url.s(org_id, url),
        extract_organization.s(),
        score_esg.s(),
        store_result.s(),
    )
    task_chain.apply_async()
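One way to enqueue thousands of organizations is in batches, so a bad input list fails fast and dispatch can be staggered. A sketch of the loop (chunk size is arbitrary; in production the dispatch callable would be process_organization above, and batches could be staggered with apply_async countdowns):

```python
from itertools import islice

def chunked(iterable, size):
    """Yield lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch

def dispatch_all(orgs, dispatch, chunk_size=500):
    """Invoke `dispatch(org_id, url)` for every pair; return the count enqueued."""
    count = 0
    for batch in chunked(orgs, chunk_size):
        for org_id, url in batch:
            dispatch(org_id, url)
        count += len(batch)
    return count

# Stand-in dispatcher for illustration; production would pass process_organization.
sent = dispatch_all([(1, "https://a.example"), (2, "https://b.example")],
                    dispatch=lambda org_id, url: None)
```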

Rate Limiting with Task Routing

We crawl third-party websites. Rate limiting is both polite and practical: aggressive crawling gets you IP-banned. We use Celery's built-in per-task rate limits together with task routing (note that rate_limit is enforced per worker instance, not globally, so total throughput scales with worker count):

# celery_config.py (or settings.py CELERY_* settings)

CELERY_TASK_ROUTES = {
    'agentosaurus.tasks.crawl_url': {'queue': 'crawl'},
    'agentosaurus.tasks.extract_organization': {'queue': 'inference'},
    'agentosaurus.tasks.score_esg': {'queue': 'inference'},
    'agentosaurus.tasks.store_result': {'queue': 'default'},
}

# Per-task rate limits (enforced per worker instance, not globally)
CELERY_TASK_ANNOTATIONS = {
    'agentosaurus.tasks.crawl_url': {'rate_limit': '30/m'},
    'agentosaurus.tasks.extract_organization': {'rate_limit': '20/m'},
}

Start workers for each queue with appropriate concurrency:

# Crawl workers: multiple concurrent browsers
celery -A mysite worker -Q crawl --concurrency=4 --loglevel=info &

# Inference workers: concurrency=1 because our Ollama instance serves requests serially
celery -A mysite worker -Q inference --concurrency=1 --loglevel=info &

# Default workers
celery -A mysite worker -Q default --concurrency=4 --loglevel=info &

Retry Logic and Dead Letters

Celery's max_retries handles transient failures. For permanent failures (site gone, persistent extraction failure), we log to a dead letter table:

from celery.signals import task_failure
from agentosaurus.models import CrawlFailure

@task_failure.connect
def handle_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
    if sender.name == 'agentosaurus.tasks.crawl_url':
        org_id = args[0] if args else kwargs.get('org_id')
        url = args[1] if len(args) > 1 else kwargs.get('url')
        CrawlFailure.objects.create(
            org_id=org_id,
            url=url,
            error_type=type(exception).__name__,
            error_message=str(exception),
        )

We have a Django management command that replays dead letters after fixing an underlying issue:

python manage.py retry_crawl_failures --since=2025-02-01 --limit=100
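The interesting part of that command is just selection: failures on or after the cutoff, oldest first, capped at the limit. A sketch with plain dicts standing in for CrawlFailure rows (the real command queries the ORM and re-enqueues each row via process_organization; field names here mirror the model above but are assumptions):

```python
from datetime import datetime

def select_failures(failures: list[dict], since: str, limit: int) -> list[dict]:
    """Pick failures recorded on/after `since` (an ISO date), oldest first,
    at most `limit` of them. Each dict stands in for a CrawlFailure row."""
    cutoff = datetime.fromisoformat(since)
    eligible = [f for f in failures if f["created_at"] >= cutoff]
    eligible.sort(key=lambda f: f["created_at"])
    return eligible[:limit]

rows = [
    {"org_id": 1, "url": "https://a.example", "created_at": datetime(2025, 1, 20)},
    {"org_id": 2, "url": "https://b.example", "created_at": datetime(2025, 2, 3)},
    {"org_id": 3, "url": "https://c.example", "created_at": datetime(2025, 2, 5)},
]
to_retry = select_failures(rows, since="2025-02-01", limit=100)
```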

Honest Failure Modes

JavaScript-heavy single-page apps. Some modern organizational websites render entirely in React/Vue with no server-side HTML. Crawl4AI uses Playwright, so it can render JS, but sites that lazy-load content below the fold, paginate their content, or require user interaction before showing key content are often still problematic. We handle maybe 85% of sites cleanly. The other 15% get partial data or fail.

Sites that ban crawlers. Even with polite rate limits and a descriptive User-Agent, some sites detect and block crawler traffic. We respect robots.txt (Crawl4AI handles this), and when we get 429 or 403 responses, we mark the crawl as failed rather than retrying aggressively.

LLM extraction hallucinations. This is the real problem. When a page has thin content, say a one-paragraph "About" section, the LLM will sometimes invent plausible-sounding mission statements or claims that aren't in the source text. We mitigate this by including "source_text_required": true in our extraction prompt instructions and by post-validating: checking whether key extracted strings actually appear in the raw markdown. It's not a perfect solution.

Our extraction confidence threshold: if the extracted text has fewer than 100 words of source material to work with, we flag the result as low_confidence and don't display it publicly until a human reviews it.
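The post-validation and confidence flag described above can be sketched as follows. The 100-word threshold comes from the text; the function name and returned fields are illustrative, and exact substring matching is deliberately naive (it misses paraphrases):

```python
def validate_extraction(extracted: dict, source_markdown: str,
                        min_source_words: int = 100) -> dict:
    """Drop claims that don't appear verbatim in the source page and
    flag results whose source material is too thin to trust."""
    source = source_markdown.lower()
    claims = extracted.get("sustainability_claims", [])
    # A claim is grounded only if its text literally occurs in the page.
    grounded = [c for c in claims if c.lower() in source]
    hallucinated = [c for c in claims if c.lower() not in source]
    low_confidence = len(source_markdown.split()) < min_source_words
    return {
        **extracted,
        "sustainability_claims": grounded,
        "dropped_claims": hallucinated,
        "low_confidence": low_confidence,
    }

checked = validate_extraction(
    {"name": "Acme", "sustainability_claims": ["carbon neutral by 2030", "plants trees"]},
    "Acme Corp is carbon neutral by 2030. " + "word " * 120,
)
```

Anything flagged low_confidence, or with dropped claims, goes to human review rather than public display.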

The pipeline isn't perfect. But it handles the volume we need, fails gracefully, and the dead letter tracking means nothing silently disappears.


Questions about the crawling pipeline or Celery setup? Reach out at hello@agentosaurus.com.

