大量のデータをDBにインポートしたい際に、jobを利用することで非同期処理でデータを投入することができます。
1.job(ジョブ)とqueue(キュー)とworker(ワーカー)について
- job(ジョブ)・・・1つ1つの処理
- queue(キュー)・・・待ち行列
- worker(ワーカー)・・・処理をする人
job(ジョブ)をqueue(キュー)に送ることをdispatch(ディスパッチ)といい、コントローラーやcommand(コマンド)に適宜設定します。今回はcommandを使います。
job(ジョブ)を保存するqueue(キュー)ドライバーにはデータベース、Redis、Amazon SQS等あり、今回はデータベース(database)を利用します。
2.queue(jobsテーブル)の作成
ジョブを保存するテーブルを生成
php artisan queue:table
これでjobsというテーブルが生成されます。
jobsテーブルをdatabaseに反映
php artisan migrate
キューを使うとこのテーブルに未実行のjobが溜まっていきます。
envファイルの修正
# QUEUE_CONNECTION=sync
QUEUE_CONNECTION=database
デフォルトではsyncとなっているが、databaseに変更します。
3.job(ジョブ)の作成
jobファイルの作成
php artisan make:job ImportProducts
以下にファイルが作成される。
App\jobs\ImportProducts.php
handle()に処理を記載
use App\Models\Product; use App\Models\ProductInfo; use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Storage; public function handle(){ $Products = Storage::get('public/Products.json'); $json = mb_convert_encoding($Products, 'UTF8', 'ASCII,JIS,UTF-8,EUC-JP,SJIS-WIN'); $array = json_decode($json, true); $Products_arrays = $array['Products']; DB::beginTransaction(); try{ foreach($Products_arrays AS $Products_array){ $ProductInfo_array = $Products_array['ProductInfo']; $ProductInfo = ProductInfo::create([ 'category' => $ProductInfo_array['category'], 'term_from' => $ProductInfo_array['termFrom'], 'selling_flag' => $ProductInfo_array['sellingFlag'] ]); Product::create([ 'product_code' => $Products_array['ProductCode'], 'product_name' => $Products_array['ProductName'], 'description' => $Products_array['Description'], 'price' => $Products_array['Price'], 'product_info_id' => $ProductInfo->id ]); } DB::commit(); } catch(Exception $exception){ DB::rollback(); throw $exception; } }
上記は下のようなデータ(JSON)を読み取り、連想配列として扱えるようにした後、ProductsとProductInfoモデルを使ってそれぞれテーブルにデータを挿入していく処理になります。
Products.json
{ "Year": 2021, "Products": [ { "ProductCode": "A01", "ProductName": "チョコレート", "Description": "甘さ控えめのチョコレート", "Price": "120", "ProductInfo": { "category": "スナック", "termFrom": "20210110", "sellingFlag": "1" } }, { "ProductCode": "A02", "ProductName": "ポテトチップス", "Description": "うすしお味のポテトチップス", "Price": "140", "ProductInfo": { "category": "スナック", "termFrom": "20210215", "sellingFlag": "1" } }, { "ProductCode": "B01", "ProductName": "コーラ", "Description": "ダイエットコカ・コーラ", "Price": "130", "ProductInfo": { "category": "飲料水", "termFrom": "20210325", "sellingFlag": "1" } } ], "ShopId": "1" }
Products部分はデータの例として3件までしか載せていませんが、大量のデータ(商品)がある場合はここが繰り返されるイメージです。
4.command(コマンド)の作成
command(コマンド)の作成
php artisan make:command FetchImportProducts
以下にファイルが作成される。
App\Console\Commands\FetchImportProducts
$signatureのところを変更する。
protected $signature = 'command:fetch_import_products';
handle()内でjobsをdispatchする
use App\Jobs\ImportProducts; use Illuminate\Support\Facades\Log; public function handle(){ Log::info("start import product data"); ImportProducts::dispatch(); Log::info("end import product data"); }
5.worker(ワーカー)の起動とコマンドの実行
worker(ワーカー)の起動
キューに入ったジョブを裏側で処理
php artisan queue:work
コマンドの実行(別のpromptを立ち上げて実行)
php artisan command:fetch_import_products
コマンドを実行するとqueueに入ったjobが実行されます。
Processingで処理され、Processedになると処理が完了します。
またProcessedになると、jobsテーブルは空になります。
jobが失敗した場合は、failed_jobsテーブルにデータが作成されます。
exceptionにエラーの内容が格納されています。