AmazonSQSが使えるのは解ったとして、実際の所はどうなんでしょう?という所を、サンプルソースを交えて書いてみたいと思います。内容は自分の経験に基づいて書いてますが、実物と比べるとかなり簡略化していますのでご注意ください。でも、考え方としては大きく外れる事はありません。
想定するストーリー
PDFを作る処理をWeb上でやります。このPDFを作るには、最低でも数分かかる事が解っています。したがって、PDFが出来たら、ユーザーにメールでお知らせする事にします。ここでは、フロントエンドにRuby on Railsを、バックエンドにJavaを用いています。両者は同一のデータベースを参照出来るようになっています。言い換えると、両者で共有している部分はデータベースだけで、粗な結合になっています。
フロントエンドの動き
フロントエンドの役割は、ユーザーと直接対話する事です。ここでは、PDF作成受付画面と、受付完了画面の表示を行います。動きを解説する前に、少しデータベースに関して書いておきます。データベースは、ごく一般的なRDBを使用し、PDF作成ジョブを表す「Jobs」というテーブルを用意してあります。
このJobsテーブルは、このように定義してあります。
CREATE TABLE jobs ( id SERIAL PRIMARY KEY, sqs_message_id VARCHAR(128), sqs_message_handle VARCHAR(256), status VARCHAR(50) );statusとは、PDFの作成状態がどうなっているのか?という識別子です。PDFの作成が始まっていなければwaiting、PDF作成中であればmaking、作成完了であればfinished、となります。
sqs_message_idとは、SQSにメッセージを投げると取得出来るIDで、SQSのQueueに積んであるメッセージを識別するものです。
sqs_receipt_handleとは、SQSからメッセージを受領したときに取得出来る文字列で、メッセージを削除するのに使用します。また、このカラムに値が入っているという事は、SQSメッセージを受けた事という目印にもなります。
フロントエンドはPDF作成のリクエストを受けると、Jobsテーブルに、status = 'waiting' でレコードを挿入します。次に、SQSメッセージにJob情報(ここではプライマリキーであるID)を書き込み、SQSのサーバに対してメッセージを投げます。その後「受け付けました」画面を出して、PDFが出来上がったらメールでお知らせする旨を表示します。
以上でフロントエンドの役割はおしまいです。
下記は、Ruby on Railsのコードの例です。これは、PDF生成リクエストを受け付けるコントローラ(Jobs)内のメソッド(create)を想定しています。SQSにアクセスするために、RightAwsを使っています。
class JobsController < ApplicationController
def create
@job = Job.new
@job.status = 'waiting'
if @job.save then # jobインサートが完了したら、SQSにメッセージを投げる
sqs = RightAws::SqsGen2.new('AWSのID', 'AWSのSecretKey')
queue = sqs.queue('SQSキュー名')
message = queue.send_message(@job.id) # メッセージの中身はJobのID
@job.sqs_message_id = message.id # JobにSQSのメッセージIDを記録しておく
@job.save!
flash[:notice] = "PDFが出来たら、メールするので待っててね!"
redirect_to :action => :thanks, :id => @job
else
render :nothing => true, :status => 500
end
end
endバックエンドの動き
バックエンドは、ユーザーに見えない所でひたすらSQSを監視しています。バックエンドはメッセージの存在に気づいたら、それを一つ取り出し、中身を読みます。メッセージにはJobのIDが書いてありますから、それをもとにデータベースを見に行って、該当するレコードを引っ張りだします。PDFの作成が終わったら、ユーザーに「できたよ!」メールを送信し、Jobのstatusを「finished」にしておしまいです。一つのジョブをこなしたら、またSQSの監視に戻ります。陰でひたすら作業する、地味でつらい仕事です(笑)。
下記はJavaのコードの例です。PollingCommandというRunnableとして実装していますので、サーバ側ではこのオブジェクトをScheduledThreadPoolなどで定期的に実行する事になります。また、SQSへのアクセスは、typicaを使用しています。
ちなみに、Javaのデータベースの扱いをどう書こうか迷ったのですが、解りやすさを優先させるため、エラーを全く考慮していない素のJDBCとしました。仕事では決してこのようなコードを書かないでくださいね(笑)。実物はS2JDBCを利用してサクサク書きました。
package com.dateofrock.blog.sqssample;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import com.xerox.amazonws.sqs2.Message;
import com.xerox.amazonws.sqs2.MessageQueue;
import com.xerox.amazonws.sqs2.SQSUtils;
public class PollingCommand implements Runnable {
public void run() {
// SQSからメッセージを受信
MessageQueue msgQueue = null;
Message message = null;
try {
// キューに接続
msgQueue = SQSUtils.connectToQueue("SQSキュー名", "AWSのID", "AWSのSecretKey");
// メッセージを一つ受け取る
message = msgQueue.receiveMessage();
if (message == null) {
// メッセージが受け取れなかったらなにもしない
return;
}
// メッセージの中にJobのプライマリキーが書いてある
long jobId = Long.parseLong(message.getMessageBody());
// ジョブステータスを作成中にアップデートし、receiptHandleを記録しておく。
Job job = findById(jobId);
job.status = "making";
job.sqsReceiptHandle = message.getReceiptHandle();
updateJob(job);
// SQSからメッセージをdelete
msgQueue.deleteMessage(message);
// PDF生成してるつもり
Thread.sleep(3000);
// メールを送信してるつもり
Thread.sleep(1000);
// ジョブステータスを完了にアップデートしておく。
job.status = "finished";
updateJob(job);
} catch (Exception e) {
e.printStackTrace();
}
}
public class Job {
public long id;
public String status;
public String sqsMessageId;
public String sqsReceiptHandle;
}
private void updateJob(Job job) {
final String sql = "UPDATE jobs SET status = ? , sqs_message_id = ?, sqs_receipt_handle = ?, updated_at = ?";
Connection con = null;
try {
con = DriverManager.getConnection("JDBCのURL", "DBユーザー名", "DBパスワード");
PreparedStatement ps = con.prepareStatement(sql);
ps.setString(1, job.status);
ps.setString(2, job.sqsMessageId);
ps.setString(3, job.sqsReceiptHandle);
ps.setDate(4, new Date(System.currentTimeMillis()));
ps.executeUpdate();
ps.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
con.commit();
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
private Job findById(long jobId) {
final String sql = "SELECT * FROM jobs WHERE id = ?";
Job job = null;
Connection con = null;
try {
con = DriverManager.getConnection("JDBCのURL", "DBユーザー名", "DBパスワード");
PreparedStatement ps = con.prepareStatement(sql);
ps.setLong(1, jobId);
ResultSet rs = ps.executeQuery();
rs.next();
job = new Job();
job.status = rs.getString("status");
job.sqsMessageId = rs.getString("sqs_message_id");
job.sqsReceiptHandle = rs.getString("sqs_receipt_handle");
rs.close();
ps.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
con.commit();
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
return job;
}
}
まとめ
キューをまともに扱うとすると、結構大変です。ですが、その面倒な部分をSQSを使う事によって、比較的楽に非同期処理を実装する事が出来ました。今回の例では、わざわざ間に「Jobs」というテーブルを挟んだので、ちょっと面倒な事になっていますが、このような処理が不要なケースもあると思います。SQSの活用例として記事にしておきました。どなたかのお役に立てれば幸いです。










