Task Processing with Celery
The Financial Data Extractor uses Celery with Redis as a message broker to handle long-running asynchronous tasks for web scraping, PDF processing, and financial data extraction.
Overview
Celery tasks enable the system to process time-consuming operations asynchronously:
- Web Scraping: Discover PDFs from investor relations websites (seconds to minutes)
- PDF Download: Download and store PDF documents (seconds to minutes)
- Document Classification: Classify documents by type (seconds)
- Financial Extraction: Extract statements using LLM (2-5 minutes per PDF)
- Data Compilation: Normalize and compile multi-year statements (minutes)
Architecture
graph LR
A[FastAPI API] -->|Trigger Task| B[Redis Broker]
B -->|Queue Task| C[Celery Worker]
C -->|Execute| D[Task Logic]
D -->|Store Results| E[Redis Backend]
D -->|Update DB| F[PostgreSQL]
C -->|Status Updates| G[Flower Dashboard]
style A fill:#e1f5ff
style B fill:#fff3e0
style C fill:#f3e5f5
style D fill:#e8f5e9
style E fill:#fff3e0
style F fill:#e8f5e9
style G fill:#fce4ec
Task Queues
Tasks are organized into dedicated queues for better resource management:
| Queue | Purpose | Typical Duration | Concurrency |
|---|---|---|---|
| scraping | Web scraping and URL discovery | Seconds to 1 minute | High (5-10) |
| extraction | LLM-powered financial extraction | 2-5 minutes | Low (1-2) |
| compilation | Normalization and compilation | 30 seconds - 2 minutes | Medium (2-3) |
| orchestration | End-to-end workflows | 10 minutes - 2 hours | Low (1) |
| default | General tasks | Varies | Medium (2-3) |
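One way to wire tasks to these queues is Celery's task_routes setting. The sketch below is illustrative rather than the project's actual configuration: only app.tasks.scraping_tasks and app.tasks.celery_app appear elsewhere on this page, so the other module names and the Redis URLs are assumptions.

from celery import Celery

# Hypothetical broker/backend URLs; substitute your own Redis instance.
celery_app = Celery(
    "app",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

# Route each task module to its dedicated queue (module names assumed).
celery_app.conf.task_routes = {
    "app.tasks.scraping_tasks.*": {"queue": "scraping"},
    "app.tasks.extraction_tasks.*": {"queue": "extraction"},
    "app.tasks.compilation_tasks.*": {"queue": "compilation"},
    "app.tasks.orchestration_tasks.*": {"queue": "orchestration"},
}
celery_app.conf.task_default_queue = "default"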
Task Processing Workflow
The following diagram illustrates how tasks flow through the system, from user action through worker execution to final data storage:
flowchart LR
subgraph "User Action"
User[User/API Request]
end
subgraph "API Layer"
API[FastAPI Endpoint]
end
subgraph "Message Broker"
Queue[Redis Queue<br/>scraping/extraction/compilation]
end
subgraph "Worker Pool"
Worker1[Worker 1: Scraping]
Worker2[Worker 2: Extraction]
Worker3[Worker 3: Compilation]
end
subgraph "External Services"
IR[Investor Relations<br/>Website]
LLM[OpenRouter<br/>LLM API]
end
subgraph "Storage"
MinIO[MinIO<br/>Object Storage]
DB[(PostgreSQL<br/>Database)]
end
User -->|HTTP Request| API
API -->|Queue Task| Queue
Queue -->|Process| Worker1
Queue -->|Process| Worker2
Queue -->|Process| Worker3
Worker1 -->|Scrape| IR
Worker1 -->|Store PDFs| MinIO
Worker1 -->|Save Metadata| DB
Worker1 -.->|Trigger| Worker2
Worker2 -->|Read PDFs| MinIO
Worker2 -->|Extract Data| LLM
Worker2 -->|Store Extractions| DB
Worker2 -.->|Trigger| Worker3
Worker3 -->|Read Extractions| DB
Worker3 -->|Compile Statements| DB
Worker3 -->|Save Results| DB
style User fill:#e1f5ff,stroke:#01579b,stroke-width:2px
style API fill:#fff3e0,stroke:#e65100,stroke-width:2px
style Queue fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
style Worker1 fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
style Worker2 fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
style Worker3 fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
style IR fill:#fce4ec,stroke:#880e4f,stroke-width:2px
style LLM fill:#fce4ec,stroke:#880e4f,stroke-width:2px
style MinIO fill:#fff9c4,stroke:#f57f17,stroke-width:2px
style DB fill:#fff9c4,stroke:#f57f17,stroke-width:2px
Key Workflow Points:
- User Action: User or API client triggers an extraction task
- Task Queuing: FastAPI endpoint queues the task to Redis
- Worker Processing: Specialized workers process tasks from their respective queues
- Task Chaining: Workers can trigger subsequent tasks (dashed arrows); see the chaining sketch after this list
- Data Storage: Results are stored in MinIO (PDFs) and PostgreSQL (metadata, extractions, compiled statements)
- External Calls: Workers interact with external services (websites, LLM APIs)
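Task chaining can be expressed with Celery's chain primitive. The following is a minimal sketch: the task names come from this page, but the import paths and argument signatures are assumptions.

from celery import chain

# Import paths are assumed; only the task names are documented here.
from app.tasks.scraping_tasks import scrape_investor_relations
from app.tasks.extraction_tasks import extract_financial_statements
from app.tasks.compilation_tasks import recompile_company_statements

def run_pipeline(company_id: int, document_id: int):
    # Each immutable signature (.si) is queued only after the previous task succeeds.
    workflow = chain(
        scrape_investor_relations.si(company_id),
        extract_financial_statements.si(document_id),
        recompile_company_statements.si(company_id),
    )
    return workflow.apply_async()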
Available Tasks
Company-Level Tasks
extract_company_financial_data
Complete end-to-end extraction workflow for a company.
Triggers the full pipeline:
- Scrapes investor relations website
- Discovers and classifies documents
- Downloads PDFs
- Extracts financial statements
- Normalizes and compiles statements
API Endpoint:
POST /api/v1/tasks/companies/{company_id}/extract
Example:
curl -X POST http://localhost:3030/api/v1/tasks/companies/1/extract
Response:
{
"task_id": "a00d8c65-c7fd-4360-8f4c-836b0df25f59",
"status": "PENDING",
"message": "Financial data extraction started for company 1"
}
Estimated Duration: 10 minutes - 2 hours (depends on number of documents)
scrape_investor_relations
Scrape investor relations website to discover PDF documents.
API Endpoint:
POST /api/v1/tasks/companies/{company_id}/scrape
Returns: List of discovered document URLs with metadata
Estimated Duration: 30 seconds - 5 minutes
recompile_company_statements
Recompile all financial statements after new extractions.
Useful when new documents are processed and statements need updating.
API Endpoint:
POST /api/v1/tasks/companies/{company_id}/recompile
Estimated Duration: 1-5 minutes
Document-Level Tasks
process_document
Process a document end-to-end: classify, download, and extract.
Convenience task that orchestrates:
- Document classification
- PDF download (if needed)
- Financial statement extraction (for annual reports)
API Endpoint:
POST /api/v1/tasks/documents/{document_id}/process
Estimated Duration: 2-10 minutes
download_pdf
Download PDF document from URL and store locally.
API Endpoint:
POST /api/v1/tasks/documents/{document_id}/download
Returns: File path and SHA256 hash
Estimated Duration: 5-30 seconds
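A minimal sketch of the download-and-hash step, assuming httpx for HTTP and a local destination directory; the function signature and storage path are illustrative (the workflow diagram shows PDFs ultimately landing in MinIO).

import hashlib
from pathlib import Path

import httpx

def download_pdf(url: str, dest_dir: str = "./pdfs") -> dict:
    # Fetch the PDF and compute its SHA256 content hash.
    response = httpx.get(url, follow_redirects=True, timeout=30.0)
    response.raise_for_status()
    sha256 = hashlib.sha256(response.content).hexdigest()

    # Store the file under its hash so duplicates are easy to detect.
    path = Path(dest_dir) / f"{sha256}.pdf"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(response.content)
    return {"file_path": str(path), "sha256": sha256}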
classify_document
Classify document by type (annual_report, quarterly_report, etc.).
Uses filename patterns, URL patterns, and content analysis.
API Endpoint:
POST /api/v1/tasks/documents/{document_id}/classify
Estimated Duration: 1-5 seconds
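The classification heuristics are not spelled out here, so the sketch below only illustrates the filename/URL-pattern part of the approach; the regexes and the fallback label are assumptions.

import re

# Illustrative patterns only; the real classifier also uses content analysis.
PATTERNS = {
    "annual_report": re.compile(r"annual[\s_-]?report|10-k", re.IGNORECASE),
    "quarterly_report": re.compile(r"quarterly|q[1-4][\s_-]?20\d{2}|10-q", re.IGNORECASE),
}

def classify_document(filename: str, url: str) -> str:
    haystack = f"{filename} {url}"
    for doc_type, pattern in PATTERNS.items():
        if pattern.search(haystack):
            return doc_type
    return "other"  # assumed fallback label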
extract_financial_statements
Extract financial statements from PDF using LLM.
Extracts Income Statement, Balance Sheet, and Cash Flow Statement.
API Endpoint:
POST /api/v1/tasks/documents/{document_id}/extract
Returns: Extracted statement data in JSON format
Estimated Duration: 2-5 minutes per document
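As a rough sketch of the extraction call, the snippet below posts report text to OpenRouter's OpenAI-compatible chat-completions endpoint and expects JSON back. The prompt, model name, and response handling are assumptions, not the project's actual implementation.

import json
import os

import httpx

OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"

def extract_statements(report_text: str) -> dict:
    # Assumed model and prompt; the production prompt is more elaborate.
    response = httpx.post(
        OPENROUTER_URL,
        headers={"Authorization": f"Bearer {os.environ['OPENROUTER_API_KEY']}"},
        json={
            "model": "openai/gpt-4o-mini",
            "messages": [{
                "role": "user",
                "content": "Extract the income statement, balance sheet, and cash flow "
                           "statement from this report as JSON:\n\n" + report_text,
            }],
        },
        timeout=300.0,
    )
    response.raise_for_status()
    return json.loads(response.json()["choices"][0]["message"]["content"])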
Task Status
get_task_status
Check the current status and result of a Celery task.
API Endpoint:
GET /api/v1/tasks/{task_id}
Response:
{
"task_id": "a00d8c65-c7fd-4360-8f4c-836b0df25f59",
"status": "SUCCESS",
"result": {
"task_id": "...",
"company_id": 1,
"status": "success",
"discovered_count": 12,
"created_count": 12
},
"error": null
}
Status Values:
- PENDING - Task is waiting to be processed
- STARTED - Task has started execution
- PROGRESS - Task is in progress (check meta for details)
- SUCCESS - Task completed successfully
- FAILURE - Task failed (check error field)
- RETRY - Task is being retried
- REVOKED - Task was cancelled
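A small client-side helper can poll this endpoint until the task reaches a terminal state. The base URL matches the examples on this page; the poll interval and timeout are arbitrary choices.

import time

import httpx

def wait_for_task(task_id: str, base_url: str = "http://localhost:3030", timeout: float = 600.0) -> dict:
    # Poll GET /api/v1/tasks/{task_id} until a terminal status is reported.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = httpx.get(f"{base_url}/api/v1/tasks/{task_id}").json()
        if status["status"] in ("SUCCESS", "FAILURE", "REVOKED"):
            return status
        time.sleep(5)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")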
Running Celery Workers
Starting Workers
# Start worker listening to all queues
make celery-worker
# Or manually with specific queues
celery -A app.tasks.celery_app worker \
--loglevel=info \
-Q scraping,extraction,compilation,orchestration,default
Worker Configuration
Workers are configured with:
- Prefetch multiplier: 1 (better load balancing)
- Max tasks per child: 1000 (prevents memory leaks)
- Late acknowledgment: Tasks acknowledged after completion
- Task time limits: Soft and hard limits per task type
- Auto-retry: Configured per task with exponential backoff
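In Celery settings, these bullets roughly translate to the configuration below. Treat it as a sketch: the numeric time limits echo the Task Limits table later on this page, and the import path assumes the app module shown in the worker command above.

from app.tasks.celery_app import celery_app

celery_app.conf.update(
    worker_prefetch_multiplier=1,     # one task prefetched per process, for better load balancing
    worker_max_tasks_per_child=1000,  # recycle worker processes to contain memory leaks
    task_acks_late=True,              # acknowledge messages only after the task finishes
    task_soft_time_limit=300,         # global defaults; per-task limits are set on the task decorator
    task_time_limit=600,
)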
Worker Logs
Worker logs include:
- Task start/completion
- Progress updates (via update_state())
- Errors and retry attempts
- Performance metrics
Log Format:
2025-01-15 10:30:45 [INFO] app.tasks.scraping_tasks: Starting scrape_investor_relations task
2025-01-15 10:30:45 [INFO] app.tasks.scraping_tasks: Scraping IR website: https://...
Flower Monitoring Dashboard
Flower provides a real-time web-based dashboard for monitoring Celery clusters.
Starting Flower
make celery-flower
Access: http://localhost:5555
Flower Features
Workers Tab
- View all active workers
- Monitor worker status (online/offline)
- Track worker metrics:
- Active tasks
- Processed/Failed/Succeeded counts
- System load average
- Worker uptime
Tasks Tab
- View all tasks across states:
- Active - Currently executing
- Scheduled - Waiting to start
- Reserved - Reserved by workers
- Succeeded - Completed successfully
- Failed - Failed tasks
- Revoked - Cancelled tasks
- Search tasks by name or ID
- Filter by worker, state, or time range
- View task details:
- Arguments and keyword arguments
- Result data
- Traceback (for failed tasks)
- Execution timeline
- Worker assignment
Task Actions
- Retry - Retry failed tasks
- Revoke - Cancel pending/running tasks
- Terminate - Force terminate running tasks
- View Traceback - See full error details
Broker Tab
- Monitor Redis broker status
- View queue depths
- Check message rates
Persistent Task History
Flower runs with --persistent=True to maintain task history in a local SQLite database. This allows you to:
- View completed tasks even after they’re removed from Redis
- Track task history over time
- Analyze task performance
Note: Flower database files (flower, flower-shm, flower-wal) are git-ignored.
Task Execution Workflow
Step-by-Step: Extracting Financial Data for a Company
1. Trigger Extraction

   curl -X POST http://localhost:3030/api/v1/tasks/companies/1/extract

   Returns: task_id

2. Monitor in Flower
   - Open http://localhost:5555
   - Navigate to “Tasks” tab
   - Search for your task_id
   - Watch task progress through states: PENDING → STARTED → PROGRESS → SUCCESS

3. Check Status via API

   curl http://localhost:3030/api/v1/tasks/{task_id}

4. View Results
   - Check Flower dashboard for detailed execution timeline
   - Use API status endpoint for JSON response
   - Review worker logs for detailed execution logs
Task State Updates
Tasks report progress using update_state():
self.update_state(
state="PROGRESS",
meta={"step": "scraping_website", "progress": 30}
)
Progress States:
- fetching_company_info - Retrieving company data
- scraping_website - Discovering PDFs
- creating_document_records - Creating database entries
- downloading_file - Downloading PDF
- classifying - Classifying document type
- processing_pdf - Extracting text/tables
- calling_llm - Calling OpenAI API
- storing_extractions - Saving extraction results
- normalizing_line_items - Normalizing financial data
- compiling_statement - Compiling multi-year statement
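For context, here is a sketch of how a bound task might emit those states with update_state(); the task body, signature, and progress percentages are placeholders.

from app.tasks.celery_app import celery_app

@celery_app.task(bind=True)
def scrape_investor_relations(self, company_id: int) -> dict:
    self.update_state(state="PROGRESS", meta={"step": "fetching_company_info", "progress": 10})
    # ... load the company record from PostgreSQL ...

    self.update_state(state="PROGRESS", meta={"step": "scraping_website", "progress": 40})
    # ... discover PDF links on the investor relations site ...

    self.update_state(state="PROGRESS", meta={"step": "creating_document_records", "progress": 80})
    # ... persist discovered documents ...

    return {"company_id": company_id, "status": "success"}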
Error Handling & Retries
Automatic Retries
Tasks are configured with automatic retries for transient failures:
- Max Retries: 2-3 attempts (configurable per task)
- Retry Delay: Exponential backoff (60-300 seconds)
- Retry Conditions:
- Network errors (ConnectionError, TimeoutException)
- HTTP errors (5xx server errors)
- Rate limiting (429 errors)
Retry Logic
@celery_app.task(
max_retries=3,
default_retry_delay=60,
autoretry_for=(httpx.HTTPError, httpx.TimeoutException),
)
Tasks automatically retry on specified exceptions without manual intervention.
Handling Persistent Failures
403 Forbidden (Website Blocking):
- Tasks handle 403 gracefully
- Return “partial” status with empty results
- Do not retry (retrying won’t help)
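A sketch of that behavior, assuming httpx: a 403 is caught and converted into a “partial” result instead of being re-raised into the retry machinery. The function name and return shape are illustrative.

import httpx

def scrape_page(url: str) -> dict:
    try:
        response = httpx.get(url, timeout=30.0)
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 403:
            # Blocking is persistent; return a partial result rather than retrying.
            return {"status": "partial", "documents": []}
        raise  # other HTTP errors propagate and hit the autoretry policy
    # ... parse PDF links from response.text here ...
    return {"status": "success", "documents": []}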
Other Failures:
- Tasks retry up to max_retries times
- After final failure, task marked as FAILURE
- Full traceback available in Flower and logs
Testing Tasks
Using the Test Script
A convenient bash script is provided for testing tasks:
# Show help
./scripts/test_tasks.sh help
# Trigger scraping task
./scripts/test_tasks.sh company-scrape 1
# Trigger with auto-polling
POLL=true ./scripts/test_tasks.sh company-scrape 1
# Check task status
./scripts/test_tasks.sh status {task_id}
# Poll until completion
./scripts/test_tasks.sh poll {task_id} 600
Using Makefile
# Trigger a task
make test-task COMMAND="company-scrape 1"
# Trigger with auto-polling
make test-task-poll COMMAND="company-scrape 1"
See Testing Tasks section for full details.
Task Configuration
Task Limits
| Task Type | Soft Limit | Hard Limit | Max Retries |
|---|---|---|---|
| Scraping | 5 minutes | 10 minutes | 3 |
| Extraction | 27.5 minutes | 30 minutes | 3 |
| Compilation | 27.5 minutes | 30 minutes | 2 |
| Orchestration | 1h 55min | 2 hours | 2 |
Result Expiration
- Task results expire after 1 hour in Redis
- Use Flower’s persistent mode to view older tasks
- Results can be retrieved via API within expiration window
Task Serialization
- All tasks use JSON serialization
- Results stored in Redis backend
- Supports complex nested data structures
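In configuration terms, the two lists above correspond roughly to the settings below; the Redis URL is an assumption, while the one-hour expiry and JSON serialization come from the text.

from app.tasks.celery_app import celery_app

celery_app.conf.update(
    result_backend="redis://localhost:6379/1",  # assumed URL
    result_expires=3600,                        # results kept in Redis for 1 hour
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)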
Best Practices
Monitoring
- Start Flower before triggering tasks
- Watch worker logs for real-time execution details
- Monitor queue depths in Flower Broker tab
- Set up alerts for task failure rates > 5%
Performance
- Use appropriate queues - Route tasks to specialized workers
- Monitor worker load - Add workers if queues backlog
- Optimize retry delays - Balance retry speed vs. resource usage
- Cache LLM responses - Avoid reprocessing same documents
Error Handling
- Check task status after triggering
- Review Flower tracebacks for failed tasks
- Implement circuit breakers for external services
- Log all critical operations with structured context
Development
- Test tasks individually before running full workflows
- Use test script for consistent testing
- Monitor Flower during development
- Review worker logs for debugging
Troubleshooting
Tasks Not Appearing in Flower
Solution:
- Ensure worker is listening to correct queues
- Restart Flower with --persistent=True
- Check Redis connection
Tasks Stuck in PENDING
Possible Causes:
- No worker listening to task’s queue
- Worker crashed or offline
- Redis connection issues
Solution:
- Verify worker is running: make celery-worker
- Check worker logs for errors
- Verify Redis is accessible
High Task Failure Rate
Check:
- Worker logs for error patterns
- External service status (OpenAI API, websites)
- Database connection pool health
- Resource constraints (memory, CPU)
Slow Task Execution
Optimize:
- Increase worker concurrency
- Add more workers
- Optimize database queries
- Cache expensive operations