Queue System

The Fuwafuwa Framework includes a background job processing system for running time-consuming operations asynchronously. Your web application stays responsive while tasks such as sending emails, generating reports, or processing images run in the background.

💡 Key Benefits
  • Non-blocking web requests - users don't wait for long operations
  • Automatic retry with exponential backoff for failed jobs
  • Shared hosting friendly with auto-scaling workers
  • Web-based admin interface for monitoring and management
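
The retry formula itself is not documented here, so the following is only a minimal sketch of what an exponential backoff schedule can look like. The function name, the 5-second base delay, and the one-hour cap are assumptions made for illustration, not framework settings.

```javascript
// Hypothetical backoff schedule: the framework's real formula is not documented here.
// Assumes the delay doubles after each failed attempt and is capped at maxDelaySeconds.
function backoffDelaySeconds(attempts, baseSeconds = 5, maxDelaySeconds = 3600) {
    if (!Number.isInteger(attempts) || attempts < 1) {
        throw new RangeError('attempts must be a positive integer');
    }
    const delay = baseSeconds * 2 ** (attempts - 1);
    return Math.min(delay, maxDelaySeconds);
}

// Delays after the 1st, 2nd and 3rd failed attempt
const delays = [1, 2, 3].map((n) => backoffDelaySeconds(n));
console.log(delays); // [ 5, 10, 20 ]
```

Doubling the delay gives a failing dependency (a mail server, for example) progressively more time to recover before the next attempt.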

System Components

1. QueueJob Model

The QueueJob model stores job information with these key fields:

Field         Type      Description
id            integer   Unique identifier
job_name      string    Human-readable job name
job_class     string    Fully qualified handler class
job_data      text      Serialized job data
priority      integer   Job priority (1=low, 5=normal, 10=high)
status        string    pending/processing/completed/failed
attempts      integer   Number of attempts made
max_attempts  integer   Maximum retry attempts
delay         integer   Delay in seconds before processing
available_at  datetime  When job becomes available
completed_at  datetime  Completion timestamp
failed_at     datetime  Failure timestamp
last_error    text      Error message from last failure
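
To illustrate how the status, priority, and available_at fields could interact, here is a small sketch of a "which job runs next" rule. The selection logic and the pickNextJob name are assumptions; the worker's actual query is not shown in this document.

```javascript
// Hypothetical selection rule; the real worker query is not shown in this document.
// A job is eligible when it is pending and its available_at time has passed.
function pickNextJob(jobs, now = new Date()) {
    const eligible = jobs.filter(
        (job) => job.status === 'pending' && new Date(job.available_at) <= now
    );
    // Higher priority first (10 = high); ties go to the older job (lower id).
    eligible.sort((a, b) => b.priority - a.priority || a.id - b.id);
    return eligible[0] ?? null;
}

const jobs = [
    { id: 1, status: 'pending', priority: 5, available_at: '2024-01-01T00:00:00Z' },
    { id: 2, status: 'pending', priority: 10, available_at: '2024-01-01T00:00:00Z' },
    { id: 3, status: 'pending', priority: 10, available_at: '2030-01-01T00:00:00Z' },
    { id: 4, status: 'failed', priority: 10, available_at: '2024-01-01T00:00:00Z' },
];
const next = pickNextJob(jobs, new Date('2024-06-01T00:00:00Z'));
console.log(next.id); // 2
```

Job 3 is skipped because it is not yet available, and job 4 because it is not pending.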

2. QueueManager Service

QueueManager is the primary interface for queue operations:

use Fuwafuwa\Service\QueueManager;

$queueManager = c(QueueManager::class);

// Add a job
$jobId = $queueManager->push('Send Welcome Email', SendEmailJob::class, [
    'user_id' => 123,
    'template' => 'welcome'
]);

// Schedule a job for later ($data is the payload array passed to the handler)
$queueManager->schedule('Nightly Report', ReportJob::class, '+1 day', $data);

// Get statistics
$stats = $queueManager->getStats();

Available Methods

Method            Description
push()            Add job with normal priority
pushHigh()        Add high-priority job
pushLow()         Add low-priority job
schedule()        Schedule job for specific time
getPendingJobs()  Retrieve pending jobs
markCompleted()   Mark job as completed
markFailed()      Mark job as failed with error
retry()           Retry a failed job
cancel()          Cancel pending/processing job
getStats()        Get queue statistics
cleanup()         Remove old completed/failed jobs

Creating Job Handlers

Job handlers are classes that implement the logic for processing jobs:

<?php

namespace App\Jobs;

use Fuwafuwa\Queue\JobHandler;
use Fuwafuwa\Model\QueueJob;

class SendWelcomeEmail extends JobHandler
{
    public function handle(array $data, QueueJob $job): void
    {
        $userId = $data['user_id'];
        $template = $data['template'];
        
        // Your processing logic here
        $user = m(\Model\User::class)->retrieve($userId);
        $this->sendEmail($user->email, $template);
        
        // Log success
        error_log("Welcome email sent to user {$userId}");
    }
    
    private function sendEmail($email, $template): void
    {
        // Email sending implementation
    }
}

Queue Workers

Standard Worker

For dedicated servers or environments where you can run long-running processes:

# Run a single worker
php index.php /queue/cli/worker

# Run multiple workers
php index.php /queue/cli/worker --workers=4

Auto-Scaling Worker

Designed for shared hosting with execution time limits:

# Run auto-scaling worker (60s lifetime)
php index.php /queue/cli/worker-auto

# Custom options
php index.php /queue/cli/worker-auto --lifetime=60 --idle-timeout=1800

⚠️ Shared Hosting Note

Auto-scaling workers automatically spawn replacements before the lifetime expires, making them ideal for shared hosting environments with execution time limits (e.g., 60 seconds).
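
The hand-off described above can be pictured as a small decision made on every loop iteration. This is a sketch only: the nextWorkerAction helper, the 5-second margin, and the idea that idle time is tracked across worker generations are assumptions, while lifetime and idleTimeout mirror the --lifetime and --idle-timeout options.

```javascript
// Hypothetical helper: the real worker-auto logic is not shown in this document.
// lifetime and idleTimeout mirror --lifetime and --idle-timeout (in seconds);
// margin is an invented safety window for starting the replacement in time.
function nextWorkerAction({ ageSeconds, idleSeconds, lifetime = 60, idleTimeout = 1800, margin = 5 }) {
    if (idleSeconds >= idleTimeout) {
        return 'exit'; // queue has been empty for too long: stop without a replacement
    }
    if (ageSeconds >= lifetime - margin) {
        return 'handoff'; // spawn a replacement, then exit before the host's limit
    }
    return 'continue'; // keep polling for jobs
}

console.log(nextWorkerAction({ ageSeconds: 10, idleSeconds: 0 }));    // continue
console.log(nextWorkerAction({ ageSeconds: 56, idleSeconds: 0 }));    // handoff
console.log(nextWorkerAction({ ageSeconds: 20, idleSeconds: 1800 })); // exit
```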

Worker Monitor

The worker monitor keeps a minimum number of workers running and replaces dead ones:

# Run the monitor
php index.php /queue/cli/worker-monitor

# Custom options
php index.php /queue/cli/worker-monitor --min-workers=2 --interval=30

On shared hosting, run the monitor from cron every minute:

* * * * * php /path/to/fuwafuwa/index.php /queue/cli/worker-monitor > /dev/null 2>&1

Production Setup

Using systemd (Recommended for Linux)

# /etc/systemd/system/fuwafuwa-queue.service
[Unit]
Description=Fuwafuwa Queue Worker
After=network.target

[Service]
User=www-data
Group=www-data
WorkingDirectory=/path/to/fuwafuwa
ExecStart=/usr/bin/php index.php /queue/cli/worker
Restart=always
RestartSec=5
StandardOutput=append:/var/log/fuwafuwa-queue.log
StandardError=append:/var/log/fuwafuwa-queue.log

[Install]
WantedBy=multi-user.target

# Enable and start
sudo systemctl daemon-reload
sudo systemctl enable fuwafuwa-queue
sudo systemctl start fuwafuwa-queue

Using Supervisor

# /etc/supervisor/conf.d/fuwafuwa-queue.conf
[program:fuwafuwa-queue]
command=/usr/bin/php /path/to/fuwafuwa/index.php /queue/cli/worker
directory=/path/to/fuwafuwa
autostart=true
autorestart=true
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
redirect_stderr=true
stdout_logfile=/var/log/fuwafuwa-queue.log

Configuration

Add the following to app/configs/config.ini:

[queue]
worker_sleep = 5
worker_timeout = 300
worker_max_attempts = 3
completed_job_ttl = 86400
failed_job_ttl = 604800

; Auto-scaling settings
autoscaling_worker_lifetime = 60
autoscaling_worker_idle_timeout = 1800
autoscaling_worker_heartbeat_interval = 10

; Monitor settings
min_workers = 1
worker_monitor_interval = 30
worker_cleanup_heartbeat_timeout = 60
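
As an illustration of how the two TTL settings might drive cleanup, the sketch below decides whether a finished job is old enough to remove. It assumes the TTL values are in seconds (86400 is one day, 604800 is seven days) and that age is measured from completed_at or failed_at; the isExpired helper is hypothetical.

```javascript
// Values copied from the [queue] section above; assumed to be seconds.
const config = { completed_job_ttl: 86400, failed_job_ttl: 604800 };

// Assumed rule: a finished job becomes removable once its TTL has elapsed.
function isExpired(job, now, cfg = config) {
    const finishedAt = job.status === 'completed' ? job.completed_at
        : job.status === 'failed' ? job.failed_at
        : null;
    if (!finishedAt) return false; // pending/processing jobs are never cleaned up here
    const ttl = job.status === 'completed' ? cfg.completed_job_ttl : cfg.failed_job_ttl;
    const ageSeconds = (now.getTime() - new Date(finishedAt).getTime()) / 1000;
    return ageSeconds > ttl;
}

const now = new Date('2024-01-10T00:00:00Z');
const completed = { status: 'completed', completed_at: '2024-01-08T00:00:00Z' };
const failed = { status: 'failed', failed_at: '2024-01-08T00:00:00Z' };
console.log(isExpired(completed, now)); // true  (2 days > 1 day)
console.log(isExpired(failed, now));    // false (2 days < 7 days)
```

Keeping failed jobs longer than completed ones leaves more time to inspect last_error before the record disappears.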

CLI Commands

Command         Description
worker          Run persistent worker daemon
worker-auto     Run auto-scaling worker
worker-monitor  Run worker monitor
stats           Show queue statistics
list            List all jobs
retry --id=N    Retry specific job
cancel --id=N   Cancel specific job
delete --id=N   Delete specific job
cleanup         Clean up old jobs

Admin Interface

Access the web-based queue management at /admin/queue (requires admin authentication).

Features
  • View real-time queue statistics
  • Browse and filter jobs by status
  • Retry failed jobs
  • Cancel pending jobs
  • Delete old jobs
  • View job details and error messages

REST API Reference

The queue system provides a RESTful API for programmatic access. All endpoints require Bearer token authentication.

Authentication

API requests require an authentication token that is automatically generated when users log in. The token is passed via the Authorization header:

GET /api/queue/stats
Authorization: Bearer abc12345-def6-7890-ghij-klmnopqrstuv

How Authentication Works

The authentication flow works as follows:

  1. User Login - When a user logs in, the system retrieves their unique token from the database
  2. Session Storage - The token is stored in the user's session ($f3['SESSION.token'])
  3. JavaScript Access - The template passes the token to the browser via window.sessionToken
  4. API Request - The fetchApi() utility automatically adds the Authorization: Bearer header
  5. Token Validation - The API controller validates the token against the user/member database
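
Step 4 of the flow above amounts to attaching the token as a Bearer credential. The sketch below shows one way a client outside the browser might build those headers; buildAuthHeaders is a hypothetical helper, not part of the framework, and the Accept header is an assumption.

```javascript
// Hypothetical helper: builds the headers a client would send with each API request.
function buildAuthHeaders(token) {
    if (typeof token !== 'string' || token.trim() === '') {
        throw new Error('A session token is required');
    }
    return {
        Authorization: `Bearer ${token}`,
        Accept: 'application/json', // assumption: the API responds with JSON
    };
}

// Token value taken from the example request above
const headers = buildAuthHeaders('abc12345-def6-7890-ghij-klmnopqrstuv');
console.log(headers.Authorization);
// Bearer abc12345-def6-7890-ghij-klmnopqrstuv
```

Failing early on a missing token avoids sending a request that the API would reject with 401 anyway.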

Available Endpoints

Endpoint                    Method  Description
/api/queue/stats            GET     Get queue statistics
/api/queue/jobs             GET     List jobs with pagination and filtering
/api/queue/job?id={id}      GET     Get details of a specific job
/api/queue/retry            POST    Retry a failed or completed job (id in body)
/api/queue/cancel           POST    Cancel a pending or processing job (id in body)
/api/queue/delete           DELETE  Delete a job from the queue (id in body)
/api/queue/cleanup          POST    Clean up old completed and failed jobs
/api/queue/workers          GET     List all workers
/api/queue/monitor/start    POST    Launch worker monitor
/api/queue/monitor/stop     POST    Stop worker monitor
/api/queue/monitor/running  GET     Check if worker monitor is running

Query Parameters

For listing jobs, the following query parameters are available:

Parameter  Type     Description
page       integer  Page number (default: 1)
limit      integer  Items per page (default: 20)
status     string   Filter by status (pending/processing/completed/failed)
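
A request URL using these parameters could be assembled as in the sketch below. The buildJobsUrl helper is hypothetical, and the client-side check of status values is an added convenience rather than documented behavior.

```javascript
// Hypothetical helper: builds a /api/queue/jobs URL from the documented query parameters.
function buildJobsUrl({ page = 1, limit = 20, status } = {}) {
    const allowed = ['pending', 'processing', 'completed', 'failed'];
    const params = new URLSearchParams({ page: String(page), limit: String(limit) });
    if (status !== undefined) {
        if (!allowed.includes(status)) {
            throw new RangeError(`Unknown status: ${status}`);
        }
        params.set('status', status);
    }
    return `/api/queue/jobs?${params.toString()}`;
}

console.log(buildJobsUrl({ page: 2, status: 'failed' }));
// /api/queue/jobs?page=2&limit=20&status=failed
```

Using URLSearchParams keeps the values correctly encoded without manual string concatenation.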

Response Format

All API responses follow a consistent format:

Success Response

{
    "success": true,
    "message": "Queue statistics retrieved",
    "data": {
        "pending": 5,
        "processing": 2,
        "completed": 100,
        "failed": 1,
        "total": 108
    }
}

Error Response

{
    "success": false,
    "error": "Job not found"
}
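
Given the two envelopes above, a caller can separate the payload from failures with a small helper. This is a sketch: unwrapResponse is a hypothetical name, and the fallback message for malformed bodies is an assumption.

```javascript
// Hypothetical helper: returns the payload of a success envelope, throws on an error envelope.
function unwrapResponse(body) {
    if (body && body.success === true) {
        return body.data;
    }
    const message = body && typeof body.error === 'string' ? body.error : 'Unknown API error';
    throw new Error(message);
}

const ok = { success: true, message: 'Queue statistics retrieved', data: { pending: 5, total: 108 } };
console.log(unwrapResponse(ok).pending); // 5

try {
    unwrapResponse({ success: false, error: 'Job not found' });
} catch (err) {
    console.log(err.message); // Job not found
}
```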

HTTP Status Codes

Code  Description
200   Request successful
400   Bad request (invalid parameters)
401   Unauthorized (missing or invalid token)
403   Forbidden (insufficient permissions)
404   Resource not found
429   Too many requests (rate limited)
500   Internal server error
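
Client code often maps these codes to a message and decides whether a retry makes sense. The sketch below copies the descriptions from the table; the choice to treat only 429 and 500 as retryable is an assumption, and both function names are hypothetical.

```javascript
// Descriptions copied from the status code table above.
const STATUS_DESCRIPTIONS = new Map([
    [200, 'Request successful'],
    [400, 'Bad request (invalid parameters)'],
    [401, 'Unauthorized (missing or invalid token)'],
    [403, 'Forbidden (insufficient permissions)'],
    [404, 'Resource not found'],
    [429, 'Too many requests (rate limited)'],
    [500, 'Internal server error'],
]);

function describeStatus(code) {
    return STATUS_DESCRIPTIONS.get(code) ?? `Unexpected status ${code}`;
}

// Assumption: only rate limiting and server errors are worth retrying;
// 4xx errors such as 400 or 403 will fail again with the same request.
function isRetryable(code) {
    return code === 429 || code === 500;
}

console.log(describeStatus(401)); // Unauthorized (missing or invalid token)
console.log(isRetryable(429));    // true
console.log(isRetryable(404));    // false
```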

Frontend Integration

Fuwafuwa Framework provides a JavaScript utility for making authenticated API calls: fetchApi(). This utility handles token management, headers, and response parsing automatically.

The fetchApi() Utility

The fetchApi() function is available globally and simplifies API calls:

// GET request - fetch data
fetchApi('/api/queue/stats')
    .then(stats => {
        console.log('Pending jobs:', stats.pending);
        console.log('Completed jobs:', stats.completed);
    });

// POST request - retry a job
fetchApi('/api/queue/retry', { id: 123, delay: 0 }, 'POST')
    .then(result => {
        console.log('Job queued for retry:', result);
    });

// DELETE request - remove a job
fetchApi('/api/queue/delete', { id: 123 }, 'DELETE')
    .then(result => {
        console.log('Job deleted');
    });

Function Signature

fetchApi(url, data, method, token)

Parameter  Type    Default              Description
url        string  required             API endpoint path
data       object  null                 Request body data (or query params for GET)
method     string  'GET'                HTTP method (GET, POST, PUT, DELETE)
token      string  window.sessionToken  Bearer token (auto-detected if null)

Automatic Features

The fetchApi() utility handles several concerns automatically:

  • Token Detection - Retrieves window.sessionToken if not explicitly provided
  • Authorization Header - Adds Authorization: Bearer <token> header
  • JSON Handling - Automatically serializes request body and parses response
  • Error Handling - Returns standardized error responses for 401, 403, 404, and 500 status codes
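
To make the behaviors above concrete, here is a minimal sketch of a wrapper with the same signature. It is not the framework's fetchApi() implementation: buildRequest, fetchApiSketch, the fetchImpl parameter, and the shape of the normalized error are all assumptions made for illustration.

```javascript
// Not the framework's fetchApi(): a sketch of the behavior described above.
// buildRequest is pure, so the URL, headers, and body can be inspected without a network call.
function buildRequest(url, data = null, method = 'GET', token = null) {
    // In a browser, globalThis.sessionToken is the same as window.sessionToken.
    const bearer = token ?? globalThis.sessionToken ?? null;
    const headers = { Accept: 'application/json' };
    if (bearer) headers.Authorization = `Bearer ${bearer}`;

    const options = { method, headers };
    let target = url;
    if (data !== null) {
        if (method === 'GET') {
            // For GET, data becomes query parameters.
            const query = new URLSearchParams(data).toString();
            if (query) target += (url.includes('?') ? '&' : '?') + query;
        } else {
            // For other methods, data is sent as a JSON body.
            headers['Content-Type'] = 'application/json';
            options.body = JSON.stringify(data);
        }
    }
    return { url: target, options };
}

// fetchImpl defaults to the global fetch and can be replaced in tests.
async function fetchApiSketch(url, data = null, method = 'GET', token = null, fetchImpl = fetch) {
    const request = buildRequest(url, data, method, token);
    const response = await fetchImpl(request.url, request.options);
    const body = await response.json();
    if (!response.ok) {
        // Assumed normalization: always return the documented error envelope.
        return { success: false, error: body.error ?? `HTTP ${response.status}` };
    }
    return body;
}

const retry = buildRequest('/api/queue/retry', { id: 123, delay: 0 }, 'POST', 'example-token');
console.log(retry.options.body); // {"id":123,"delay":0}
```

Separating request construction from the network call keeps the token and serialization rules easy to verify in isolation.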

Prerequisites

For fetchApi() to work, ensure the session token is exposed to the browser. This is typically done in your core JavaScript template:

<script>
    const LOCALE = "{{@APP.lang_loc}}";
    const LANG = "{{@APP.lang}}";
    <check if="{{isset(@SESSION.token)}}">
    window.sessionToken = "{{ @SESSION.token }}";
    </check>
    dayjs.locale(LANG);
</script>

Example: Queue Admin Integration

Here's how the queue admin interface uses fetchApi() to manage jobs:

// Load queue statistics
function loadStats() {
    fetchApi('/api/queue/stats')
        .then(stats => {
            updateStatsDisplay(stats);
        });
}

// Load jobs with optional status filter
function loadJobs(status = null) {
    const url = status ? `/api/queue/jobs?status=${encodeURIComponent(status)}` : '/api/queue/jobs';
    fetchApi(url)
        .then(data => {
            renderJobsTable(data.jobs);
        });
}

// Retry a failed job
function retryJob(jobId) {
    fetchApi('/api/queue/retry', { id: jobId, delay: 0 }, 'POST')
        .then(result => {
            showSuccessToast('Job queued for retry');
            loadJobs(); // Refresh the list
        });
}

// Cancel a pending job
function cancelJob(jobId) {
    fetchApi('/api/queue/cancel', { id: jobId }, 'POST')
        .then(result => {
            showInfoToast('Job cancelled');
            loadJobs(); // Refresh the list
        });
}

// Delete a job
function deleteJob(jobId) {
    if (confirm('Are you sure you want to delete this job?')) {
        fetchApi('/api/queue/delete', { id: jobId }, 'DELETE')
            .then(result => {
                showSuccessToast('Job deleted');
                loadJobs(); // Refresh the list
            });
    }
}

// Clean up old jobs
function cleanupJobs() {
    fetchApi('/api/queue/cleanup', {}, 'POST')
        .then(result => {
            showSuccessToast(`Cleaned up ${result.cleaned_count} old jobs`);
            loadStats();
        });
}

Security Features

The queue API includes multiple layers of security to protect your application:

Authentication

  • Bearer Token Authentication - Each request must include a valid user token
  • Token Validation - Tokens are validated against the database on every request
  • Session Binding - Tokens are bound to user sessions and expire on logout

Authorization

  • RBAC Integration - Role-Based Access Control checks permissions before allowing actions
  • Permission Scopes - Different actions require different permissions

Rate Limiting

  • Request Throttling - Prevents API abuse through rate limiting
  • Configurable Limits - Adjust limits based on user roles or endpoints

Input Validation

  • Parameter Sanitization - All inputs are sanitized before processing
  • Type Validation - Parameter types are validated (integers, strings, etc.)
  • SQL Injection Protection - Uses prepared statements for all database operations

CSRF Protection

  • Session-Bound Tokens - API tokens are tied to user sessions
  • Automatic Expiration - Tokens become invalid when sessions end

Error Handling

  • Generic Error Messages - Sensitive information is not exposed in error responses
  • Logging - Security events are logged for audit trails

⚠️ Security Best Practices
  • Always use HTTPS in production to protect tokens in transit
  • Implement proper token rotation for long-lived sessions
  • Monitor API access logs for suspicious activity
  • Keep worker processes isolated with minimal database permissions
  • Sanitize all user input before passing to job handlers