Laravel jobをqueue(キュー)につっこんで非同期処理をする

Laravel

大量のデータをDBにインポートしたい際に、jobを利用することで非同期処理でデータを投入することができます。

1.job(ジョブ)とqueue(キュー)とworker(ワーカー)について

  • job(ジョブ)・・・1つ1つの処理
  • queue(キュー)・・・待ち行列
  • worker(ワーカー)・・・処理をする人

job-queue-worker

job(ジョブ)をqueue(キュー)に送ることをdispatch(ディスパッチ)といい、コントローラーやcommand(コマンド)に適宜設定します。今回はcommandを使います。

job(ジョブ)を保存するqueue(キュー)ドライバーにはデータベース、Redis、Amazon SQS等あり、今回はデータベース(database)を利用します。

公式ドキュメント

2.queue(jobsテーブル)の作成

ジョブを保存するテーブルを生成

php artisan queue:table

これでjobsというテーブルが生成されます。

jobsテーブルをdatabaseに反映

php artisan migrate

キューを使うとこのテーブルに未実行のjobが溜まっていきます。

envファイルの修正

# QUEUE_CONNECTION=sync
QUEUE_CONNECTION=database

デフォルトではsyncとなっているが、databaseに変更します。

3.job(ジョブ)の作成

jobファイルの作成

php artisan make:job ImportProducts

以下にファイルが作成される。
App\jobs\ImportProducts.php

本記事ではjsonデータをDBにインポートする例を取り上げています。


handle()に処理を記載

use App\Models\Product;
use App\Models\ProductInfo;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Storage;

public function handle(){
  $Products = Storage::get('public/Products.json');
  $json = mb_convert_encoding($Products, 'UTF8', 'ASCII,JIS,UTF-8,EUC-JP,SJIS-WIN');
  $array = json_decode($json, true);
  $Products_arrays = $array['Products'];
  DB::beginTransaction();
  try{
    foreach($Products_arrays AS $Products_array){
      $ProductInfo_array = $Products_array['ProductInfo'];
      $ProductInfo = ProductInfo::create([
        'category' => $ProductInfo_array['category'],
        'term_from' => $ProductInfo_array['termFrom'],
        'selling_flag' => $ProductInfo_array['sellingFlag']
      ]);

      Product::create([
        'product_code' => $Products_array['ProductCode'],
        'product_name' => $Products_array['ProductName'],
        'description' => $Products_array['Description'],
        'price' => $Products_array['Price'],
        'product_info_id' => $ProductInfo->id
      ]);
    }
    DB::commit();
  } catch(Exception $exception){
    DB::rollback();
    throw $exception;
  }
}

上記は下のようなデータ(JSON)を読み取り、連想配列として扱えるようにした後、ProductsとProductInfoモデルを使ってそれぞれテーブルにデータを挿入していく処理になります。

Products.json

{
  "Year": 2021,
  "Products": [
    {
      "ProductCode": "A01",
      "ProductName": "チョコレート",
      "Description": "甘さ控えめのチョコレート",
      "Price": "120",
      "ProductInfo": {
        "category": "スナック",
        "termFrom": "20210110",
        "sellingFlag": "1"
      }
    },
    {
      "ProductCode": "A02",
      "ProductName": "ポテトチップス",
      "Description": "うすしお味のポテトチップス",
      "Price": "140",
      "ProductInfo": {
        "category": "スナック",
        "termFrom": "20210215",
        "sellingFlag": "1"
      }
    },
    {
      "ProductCode": "B01",
      "ProductName": "コーラ",
      "Description": "ダイエットコカ・コーラ",
      "Price": "130",
      "ProductInfo": {
        "category": "飲料水",
        "termFrom": "20210325",
        "sellingFlag": "1"
      }
    }
  ],
  "ShopId": "1"
}

Products部分はデータの例として3件までしか載せていませんが、大量のデータ(商品)がある場合はここが繰り返されるイメージです。

4.command(コマンド)の作成

command(コマンド)の作成

php artisan make:command FetchImportProducts

以下にファイルが作成される。
App\Console\Commands\FetchImportProducts

$signatureのところを変更する。

protected $signature = 'command:fetch_import_products';

handle()内でjobsをdispatchする

use App\Jobs\ImportProducts;
use Illuminate\Support\Facades\Log;
public function handle(){
  Log::info("start import product data");
  ImportProducts::dispatch();
  Log::info("end import product data");
}

 

5.worker(ワーカー)の起動とコマンドの実行

worker(ワーカー)の起動
キューに入ったジョブを裏側で処理

php artisan queue:work

コマンドの実行(別のpromptを立ち上げて実行)

php artisan command:fetch_import_products

command-run

コマンドを実行するとqueueに入ったjobが実行されます。
Processingで処理され、Processedになると処理が完了します。

queue-work

またProcessedになると、jobsテーブルは空になります。

jobが失敗した場合は、failed_jobsテーブルにデータが作成されます。

failed_jobs

exceptionにエラーの内容が格納されています。