MyCSS

2010/01/28

Amazon SQSの使いどころ 実際編 はてなブックマークに追加

Amazon SQSに関しての前回までの記事
AmazonSQSが使えるのは解ったとして、実際の所はどうなんでしょう?という所を、サンプルソースを交えて書いてみたいと思います。内容は自分の経験に基づいて書いてますが、実物と比べるとかなり簡略化していますのでご注意ください。でも、考え方としては大きく外れる事はありません。

想定するストーリー
PDFを作る処理をWeb上でやります。このPDFを作るには、最低でも数分かかる事が解っています。したがって、PDFが出来たら、ユーザーにメールでお知らせする事にします。


ここでは、フロントエンドにRuby on Railsを、バックエンドにJavaを用いています。両者は同一のデータベースを参照出来るようになっています。言い換えると、両者で共有している部分はデータベースだけで、粗な結合になっています。

フロントエンドの動き
フロントエンドの役割は、ユーザーと直接対話する事です。ここでは、PDF作成受付画面と、受付完了画面の表示を行います。

動きを解説する前に、少しデータベースに関して書いておきます。データベースは、ごく一般的なRDBを使用し、PDF作成ジョブを表す「Jobs」というテーブルを用意してあります。

このJobsテーブルは、このように定義してあります。
CREATE TABLE jobs (
  id SERIAL PRIMARY KEY,
  sqs_message_id VARCHAR(128),
  sqs_message_handle VARCHAR(256),
  status VARCHAR(50)
);
statusとは、PDFの作成状態がどうなっているのか?という識別子です。PDFの作成が始まっていなければwaiting、PDF作成中であればmaking、作成完了であればfinished、となります。
sqs_message_idとは、SQSにメッセージを投げると取得出来るIDで、SQSのQueueに積んであるメッセージを識別するものです。
sqs_receipt_handleとは、SQSからメッセージを受領したときに取得出来る文字列で、メッセージを削除するのに使用します。また、このカラムに値が入っているという事は、SQSメッセージを受けた事という目印にもなります。

フロントエンドはPDF作成のリクエストを受けると、Jobsテーブルに、status = 'waiting' でレコードを挿入します。次に、SQSメッセージにJob情報(ここではプライマリキーであるID)を書き込み、SQSのサーバに対してメッセージを投げます。その後「受け付けました」画面を出して、PDFが出来上がったらメールでお知らせする旨を表示します。
以上でフロントエンドの役割はおしまいです。

下記は、Ruby on Railsのコードの例です。これは、PDF生成リクエストを受け付けるコントローラ(Jobs)内のメソッド(create)を想定しています。SQSにアクセスするために、RightAwsを使っています。
class JobsController < ApplicationController
  def create
    @job = Job.new
    @job.status = 'waiting'

    if @job.save then # jobインサートが完了したら、SQSにメッセージを投げる
      sqs = RightAws::SqsGen2.new('AWSのID', 'AWSのSecretKey')
      queue = sqs.queue('SQSキュー名')
      message = queue.send_message(@job.id) # メッセージの中身はJobのID
      @job.sqs_message_id = message.id      # JobにSQSのメッセージIDを記録しておく
      @job.save!

      flash[:notice] = "PDFが出来たら、メールするので待っててね!"
      redirect_to :action => :thanks, :id => @job
    else
      render :nothing => true, :status => 500
    end
  end
end

バックエンドの動き
バックエンドは、ユーザーに見えない所でひたすらSQSを監視しています。バックエンドはメッセージの存在に気づいたら、それを一つ取り出し、中身を読みます。メッセージにはJobのIDが書いてありますから、それをもとにデータベースを見に行って、該当するレコードを引っ張りだします。PDFの作成が終わったら、ユーザーに「できたよ!」メールを送信し、Jobのstatusを「finished」にしておしまいです。
一つのジョブをこなしたら、またSQSの監視に戻ります。陰でひたすら作業する、地味でつらい仕事です(笑)。

下記はJavaのコードの例です。PollingCommandというRunnableとして実装していますので、サーバ側ではこのオブジェクトをScheduledThreadPoolなどで定期的に実行する事になります。また、SQSへのアクセスは、typicaを使用しています。
ちなみに、Javaのデータベースの扱いをどう書こうか迷ったのですが、解りやすさを優先させるため、エラーを全く考慮していない素のJDBCとしました。仕事では決してこのようなコードを書かないでくださいね(笑)。実物はS2JDBCを利用してサクサク書きました。
package com.dateofrock.blog.sqssample;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.xerox.amazonws.sqs2.Message;
import com.xerox.amazonws.sqs2.MessageQueue;
import com.xerox.amazonws.sqs2.SQSUtils;

public class PollingCommand implements Runnable {

 public void run() {
  // SQSからメッセージを受信
  MessageQueue msgQueue = null;
  Message message = null;
  try {
   // キューに接続
   msgQueue = SQSUtils.connectToQueue("SQSキュー名", "AWSのID", "AWSのSecretKey");
   // メッセージを一つ受け取る
   message = msgQueue.receiveMessage();
   if (message == null) {
    // メッセージが受け取れなかったらなにもしない
    return;
   }

   // メッセージの中にJobのプライマリキーが書いてある
   long jobId = Long.parseLong(message.getMessageBody());

   // ジョブステータスを作成中にアップデートし、receiptHandleを記録しておく。
   Job job = findById(jobId);
   job.status = "making";
   job.sqsReceiptHandle = message.getReceiptHandle();
   updateJob(job);

   // SQSからメッセージをdelete
   msgQueue.deleteMessage(message);

   // PDF生成してるつもり
   Thread.sleep(3000);
   // メールを送信してるつもり
   Thread.sleep(1000);

   // ジョブステータスを完了にアップデートしておく。
   job.status = "finished";
   updateJob(job);

  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public class Job {
  public long id;
  public String status;
  public String sqsMessageId;
  public String sqsReceiptHandle;
 }

 private void updateJob(Job job) {
  final String sql = "UPDATE jobs SET status = ? , sqs_message_id = ?, sqs_receipt_handle = ?, updated_at = ?";
  Connection con = null;
  try {
   con = DriverManager.getConnection("JDBCのURL", "DBユーザー名", "DBパスワード");
   PreparedStatement ps = con.prepareStatement(sql);
   ps.setString(1, job.status);
   ps.setString(2, job.sqsMessageId);
   ps.setString(3, job.sqsReceiptHandle);
   ps.setDate(4, new Date(System.currentTimeMillis()));
   ps.executeUpdate();
   ps.close();
  } catch (SQLException e) {
   e.printStackTrace();
  } finally {
   try {
    con.commit();
    con.close();
   } catch (SQLException e) {
    e.printStackTrace();
   }
  }
 }

 private Job findById(long jobId) {
  final String sql = "SELECT * FROM jobs WHERE id = ?";
  Job job = null;
  Connection con = null;
  try {
   con = DriverManager.getConnection("JDBCのURL", "DBユーザー名", "DBパスワード");
   PreparedStatement ps = con.prepareStatement(sql);
   ps.setLong(1, jobId);
   ResultSet rs = ps.executeQuery();
   rs.next();
   job = new Job();
   job.status = rs.getString("status");
   job.sqsMessageId = rs.getString("sqs_message_id");
   job.sqsReceiptHandle = rs.getString("sqs_receipt_handle");
   rs.close();
   ps.close();
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   try {
    con.commit();
    con.close();
   } catch (SQLException e) {
    e.printStackTrace();
   }
  }
  return job;
 }
}

まとめ
キューをまともに扱うとすると、結構大変です。ですが、その面倒な部分をSQSを使う事によって、比較的楽に非同期処理を実装する事が出来ました。今回の例では、わざわざ間に「Jobs」というテーブルを挟んだので、ちょっと面倒な事になっていますが、このような処理が不要なケースもあると思います。

SQSの活用例として記事にしておきました。どなたかのお役に立てれば幸いです。

2 件のコメント :

trshugu さんのコメント...

SQSで検索してきました。
わかりやすかったです!

Takehito Tanabe さんのコメント...

trshugu さん、ありがとうございます!

コメントを投稿