AmazonSQSが使えるのは解ったとして、実際の所はどうなんでしょう?という所を、サンプルソースを交えて書いてみたいと思います。内容は自分の経験に基づいて書いてますが、実物と比べるとかなり簡略化していますのでご注意ください。でも、考え方としては大きく外れる事はありません。
想定するストーリー
PDFを作る処理をWeb上でやります。このPDFを作るには、最低でも数分かかる事が解っています。したがって、PDFが出来たら、ユーザーにメールでお知らせする事にします。ここでは、フロントエンドにRuby on Railsを、バックエンドにJavaを用いています。両者は同一のデータベースを参照出来るようになっています。言い換えると、両者で共有している部分はデータベースだけで、粗な結合になっています。
フロントエンドの動き
フロントエンドの役割は、ユーザーと直接対話する事です。ここでは、PDF作成受付画面と、受付完了画面の表示を行います。動きを解説する前に、少しデータベースに関して書いておきます。データベースは、ごく一般的なRDBを使用し、PDF作成ジョブを表す「Jobs」というテーブルを用意してあります。
このJobsテーブルは、このように定義してあります。
CREATE TABLE jobs ( id SERIAL PRIMARY KEY, sqs_message_id VARCHAR(128), sqs_message_handle VARCHAR(256), status VARCHAR(50) );statusとは、PDFの作成状態がどうなっているのか?という識別子です。PDFの作成が始まっていなければwaiting、PDF作成中であればmaking、作成完了であればfinished、となります。
sqs_message_idとは、SQSにメッセージを投げると取得出来るIDで、SQSのQueueに積んであるメッセージを識別するものです。
sqs_receipt_handleとは、SQSからメッセージを受領したときに取得出来る文字列で、メッセージを削除するのに使用します。また、このカラムに値が入っているという事は、SQSメッセージを受けた事という目印にもなります。
フロントエンドはPDF作成のリクエストを受けると、Jobsテーブルに、status = 'waiting' でレコードを挿入します。次に、SQSメッセージにJob情報(ここではプライマリキーであるID)を書き込み、SQSのサーバに対してメッセージを投げます。その後「受け付けました」画面を出して、PDFが出来上がったらメールでお知らせする旨を表示します。
以上でフロントエンドの役割はおしまいです。
下記は、Ruby on Railsのコードの例です。これは、PDF生成リクエストを受け付けるコントローラ(Jobs)内のメソッド(create)を想定しています。SQSにアクセスするために、RightAwsを使っています。
class JobsController < ApplicationController def create @job = Job.new @job.status = 'waiting' if @job.save then # jobインサートが完了したら、SQSにメッセージを投げる sqs = RightAws::SqsGen2.new('AWSのID', 'AWSのSecretKey') queue = sqs.queue('SQSキュー名') message = queue.send_message(@job.id) # メッセージの中身はJobのID @job.sqs_message_id = message.id # JobにSQSのメッセージIDを記録しておく @job.save! flash[:notice] = "PDFが出来たら、メールするので待っててね!" redirect_to :action => :thanks, :id => @job else render :nothing => true, :status => 500 end end end
バックエンドの動き
バックエンドは、ユーザーに見えない所でひたすらSQSを監視しています。バックエンドはメッセージの存在に気づいたら、それを一つ取り出し、中身を読みます。メッセージにはJobのIDが書いてありますから、それをもとにデータベースを見に行って、該当するレコードを引っ張りだします。PDFの作成が終わったら、ユーザーに「できたよ!」メールを送信し、Jobのstatusを「finished」にしておしまいです。一つのジョブをこなしたら、またSQSの監視に戻ります。陰でひたすら作業する、地味でつらい仕事です(笑)。
下記はJavaのコードの例です。PollingCommandというRunnableとして実装していますので、サーバ側ではこのオブジェクトをScheduledThreadPoolなどで定期的に実行する事になります。また、SQSへのアクセスは、typicaを使用しています。
ちなみに、Javaのデータベースの扱いをどう書こうか迷ったのですが、解りやすさを優先させるため、エラーを全く考慮していない素のJDBCとしました。仕事では決してこのようなコードを書かないでくださいね(笑)。実物はS2JDBCを利用してサクサク書きました。
package com.dateofrock.blog.sqssample; import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import com.xerox.amazonws.sqs2.Message; import com.xerox.amazonws.sqs2.MessageQueue; import com.xerox.amazonws.sqs2.SQSUtils; public class PollingCommand implements Runnable { public void run() { // SQSからメッセージを受信 MessageQueue msgQueue = null; Message message = null; try { // キューに接続 msgQueue = SQSUtils.connectToQueue("SQSキュー名", "AWSのID", "AWSのSecretKey"); // メッセージを一つ受け取る message = msgQueue.receiveMessage(); if (message == null) { // メッセージが受け取れなかったらなにもしない return; } // メッセージの中にJobのプライマリキーが書いてある long jobId = Long.parseLong(message.getMessageBody()); // ジョブステータスを作成中にアップデートし、receiptHandleを記録しておく。 Job job = findById(jobId); job.status = "making"; job.sqsReceiptHandle = message.getReceiptHandle(); updateJob(job); // SQSからメッセージをdelete msgQueue.deleteMessage(message); // PDF生成してるつもり Thread.sleep(3000); // メールを送信してるつもり Thread.sleep(1000); // ジョブステータスを完了にアップデートしておく。 job.status = "finished"; updateJob(job); } catch (Exception e) { e.printStackTrace(); } } public class Job { public long id; public String status; public String sqsMessageId; public String sqsReceiptHandle; } private void updateJob(Job job) { final String sql = "UPDATE jobs SET status = ? , sqs_message_id = ?, sqs_receipt_handle = ?, updated_at = ?"; Connection con = null; try { con = DriverManager.getConnection("JDBCのURL", "DBユーザー名", "DBパスワード"); PreparedStatement ps = con.prepareStatement(sql); ps.setString(1, job.status); ps.setString(2, job.sqsMessageId); ps.setString(3, job.sqsReceiptHandle); ps.setDate(4, new Date(System.currentTimeMillis())); ps.executeUpdate(); ps.close(); } catch (SQLException e) { e.printStackTrace(); } finally { try { con.commit(); con.close(); } catch (SQLException e) { e.printStackTrace(); } } } private Job findById(long jobId) { final String sql = "SELECT * FROM jobs WHERE id = ?"; Job job = null; Connection con = null; try { con = DriverManager.getConnection("JDBCのURL", "DBユーザー名", "DBパスワード"); PreparedStatement ps = con.prepareStatement(sql); ps.setLong(1, jobId); ResultSet rs = ps.executeQuery(); rs.next(); job = new Job(); job.status = rs.getString("status"); job.sqsMessageId = rs.getString("sqs_message_id"); job.sqsReceiptHandle = rs.getString("sqs_receipt_handle"); rs.close(); ps.close(); } catch (Exception e) { e.printStackTrace(); } finally { try { con.commit(); con.close(); } catch (SQLException e) { e.printStackTrace(); } } return job; } }
まとめ
キューをまともに扱うとすると、結構大変です。ですが、その面倒な部分をSQSを使う事によって、比較的楽に非同期処理を実装する事が出来ました。今回の例では、わざわざ間に「Jobs」というテーブルを挟んだので、ちょっと面倒な事になっていますが、このような処理が不要なケースもあると思います。SQSの活用例として記事にしておきました。どなたかのお役に立てれば幸いです。