From 9c6015bb39561f11f885b8a553d68e05ab762e69 Mon Sep 17 00:00:00 2001 From: michalsn Date: Wed, 20 May 2026 17:35:33 +0200 Subject: [PATCH] fix: SQLite3 queue pop atomicity --- docs/configuration.md | 2 + docs/running-queues.md | 2 + src/Models/QueueJobModel.php | 77 ++++++++++++++++++++++++++++++ tests/Models/QueueJobModelTest.php | 55 +++++++++++++++++++++ 4 files changed, 136 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 03d0a3d..3c3ede6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -43,6 +43,8 @@ The configuration settings for `database` handler. The [Strict Mode](https://codeigniter.com/user_guide/database/transactions.html#strict-mode) for the given `dbGroup` is automatically disabled - due to the nature of the queue worker. + When using **SQLite3** with multiple worker processes, configure `busyTimeout` in the selected connection group. This lets a worker wait briefly when another process holds SQLite's write lock instead of failing immediately with `SQLITE_BUSY`. + ### $redis The configuration settings for `redis` handler. You need to have a [ext-redis](https://github.com/phpredis/phpredis) installed to use it. diff --git a/docs/running-queues.md b/docs/running-queues.md index 5bae1c0..3ab20bd 100644 --- a/docs/running-queues.md +++ b/docs/running-queues.md @@ -117,6 +117,8 @@ When using the `chain()` method, there are a few important differences compared As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control. +When using **SQLite3** with multiple worker processes, configure `busyTimeout` in the selected connection group. This lets a worker wait briefly when another process holds SQLite's write lock instead of failing immediately with `SQLITE_BUSY`. + The PHPRedis and Predis drivers are also safe to use with multiple instances of the same command. ### Handling long-running process diff --git a/src/Models/QueueJobModel.php b/src/Models/QueueJobModel.php index a61e5ee..4e49263 100644 --- a/src/Models/QueueJobModel.php +++ b/src/Models/QueueJobModel.php @@ -23,6 +23,7 @@ use CodeIgniter\Validation\ValidationInterface; use Config\Database; use ReflectionException; +use Throwable; class QueueJobModel extends Model { @@ -72,6 +73,80 @@ public function getFromQueue(string $name, array $priority): ?QueueJob // Make sure we still have the connection $this->db->reconnect(); } + + return match ($this->db->DBDriver) { + 'SQLite3' => $this->popAtomic($name, $priority), + default => $this->popWithLock($name, $priority), + }; + } + + /** + * Claim the oldest pending job while holding SQLite's write lock. + * + * SQLite has no FOR UPDATE SKIP LOCKED. BEGIN IMMEDIATE takes the write + * lock before the SELECT, so the candidate cannot be claimed by another + * worker between selecting it and marking it RESERVED. + * + * @throws ReflectionException + */ + private function popAtomic(string $name, array $priority): ?QueueJob + { + if ($this->db->simpleQuery('BEGIN IMMEDIATE') === false) { + return null; + } + + try { + $builder = $this->builder() + ->where('queue', $name) + ->where('status', Status::PENDING->value) + ->where('available_at <=', Time::now()->timestamp) + ->limit(1); + + $builder = $this->setPriority($builder, $priority); + $sql = $builder->getCompiledSelect(); + + $query = $this->db->query($sql); + + if ($query === false) { + $this->db->simpleQuery('ROLLBACK'); + + return null; + } + + /** @var QueueJob|null $row */ + $row = $query->getCustomRowObject(0, QueueJob::class); + + if ($row === null) { + $this->db->simpleQuery('COMMIT'); + + return null; + } + + $this->builder() + ->where('id', $row->id) + ->where('status', Status::PENDING->value) + ->update(['status' => Status::RESERVED->value]); + + $claimed = $this->db->affectedRows() === 1; + + $this->db->simpleQuery('COMMIT'); + + return $claimed ? $row : null; + } catch (Throwable $e) { + $this->db->simpleQuery('ROLLBACK'); + + throw $e; + } + } + + /** + * Claim the oldest pending job inside a transaction, using + * FOR UPDATE SKIP LOCKED where supported. + * + * @throws ReflectionException + */ + private function popWithLock(string $name, array $priority): ?QueueJob + { // Start transaction $this->db->transStart(); @@ -87,6 +162,8 @@ public function getFromQueue(string $name, array $priority): ?QueueJob $query = $this->db->query($this->skipLocked($sql)); if ($query === false) { + $this->db->transComplete(); + return null; } /** @var QueueJob|null $row */ diff --git a/tests/Models/QueueJobModelTest.php b/tests/Models/QueueJobModelTest.php index 4d61f7e..21fdda8 100644 --- a/tests/Models/QueueJobModelTest.php +++ b/tests/Models/QueueJobModelTest.php @@ -13,8 +13,11 @@ namespace Tests\Models; +use CodeIgniter\Queue\Entities\QueueJob; +use CodeIgniter\Queue\Enums\Status; use CodeIgniter\Queue\Models\QueueJobModel; use CodeIgniter\Test\ReflectionHelper; +use Tests\Support\Database\Seeds\TestDatabaseQueueSeeder; use Tests\Support\TestCase; /** @@ -24,6 +27,8 @@ final class QueueJobModelTest extends TestCase { use ReflectionHelper; + protected $seed = TestDatabaseQueueSeeder::class; + public function testQueueJobModel(): void { $model = model(QueueJobModel::class); @@ -59,4 +64,54 @@ public function testSkipLockedFalse(): void $this->assertSame($sql, $result); } + + public function testGetFromQueueReservesPendingJob(): void + { + $model = model(QueueJobModel::class); + + $job = $model->getFromQueue('queue1', ['default']); + + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('queue1', $job->queue); + $this->seeInDatabase('queue_jobs', [ + 'id' => $job->id, + 'status' => Status::RESERVED->value, + ]); + } + + public function testGetFromQueueReturnsNullWhenNothingPending(): void + { + $model = model(QueueJobModel::class); + + $this->assertNull($model->getFromQueue('queue123', ['default'])); + } + + public function testGetFromQueueOnSQLite3ClaimsEachPendingJobOnce(): void + { + $model = model(QueueJobModel::class); + + if ($model->db->DBDriver !== 'SQLite3') { + $this->markTestSkipped('BEGIN IMMEDIATE claim is SQLite3-only.'); + } + + $model->insert(new QueueJob([ + 'queue' => 'queue1', + 'payload' => ['job' => 'extra', 'data' => []], + 'priority' => 'default', + 'status' => Status::PENDING->value, + 'attempts' => 0, + 'available_at' => 1_697_269_865, + ])); + + $job1 = $model->getFromQueue('queue1', ['default']); + $job2 = $model->getFromQueue('queue1', ['default']); + + $this->assertInstanceOf(QueueJob::class, $job1); + $this->assertInstanceOf(QueueJob::class, $job2); + $this->assertSame(2, $job1->id); + $this->assertSame(3, $job2->id); + $this->assertNull($model->getFromQueue('queue1', ['default'])); + $this->seeInDatabase('queue_jobs', ['id' => 2, 'status' => Status::RESERVED->value]); + $this->seeInDatabase('queue_jobs', ['id' => 3, 'status' => Status::RESERVED->value]); + } }