« Go言語でトランザクション制御のラッパー関数を作った話 | メイン | Go言語のクエリビルダ−をタイプセーフに扱いたい »

2016年7月 3日 (日)

lambdaからlambdaを起動するリレー方式SQSコンシューマ

こんにちは。エンジニアの島袋です。 最近AWS LambdaによるSQSコンシューマを実装していて、困ったことがあったのでそれについて書きます。

TL;DR

  • SQSのコンシューマをLambdaで実装している
  • scheduled eventでlambdaを起動して、キューに存在するメッセージを全て処理したい
  • キューが空になるまでlambdaからlambdaを起動するリレー方式を実装した

作ってるもの

以下のようなシステムを作っています。

  • S3にファイルがputされる
  • S3 Event NotificationでSQSにメッセージをキューイング
  • Lambdaで実装したコンシューマがメッセージを取り出し、S3にputされたファイルについて何らかの処理を行う

割とあるあるな感じのユースケースだと思います。

S3 Event Notificationで直接Lambdaを起動すれば良いような気もしますが、先日こんなことがありましたね。

サーバレスアーキテクチャの「AWS Lambda」、米国東リージョンの一部のユーザーで障害。約3時間エラーや遅延など

Lamdaの障害が記憶に新しい〜新しすぎる〜(;´Д`)

S3 Event Notificationで直接Lambdaを起動した際にエラーが発生すると、リトライ処理が面倒そうなので、間にSQSを挟むことにしました。

(SQSの場合、メッセージコンシューマが明示的にメッセージを削除しないと一定時間経過後(visibilityTimeout)にメッセージは再びキューに入るので、自動リトライ可能)

Lambda自体はscheduled eventで一定時間毎に起動する想定です。

ファイルの処理に即時性が要求されないのであれば、特に問題の無い構成に見えます。

しかし、LambdaでSQSコンシューマを実装する場合、次の問題があります。

Lambdaは1回の起動時間に制限があるため、1度の起動で全メッセージを処理することが難しい

即時性が要求されないとはいえ、まだキューにメッセージが残ってるのに一定時間経過後にしか処理されないというのは少し微妙です。

できれば1回の起動で現在キューに存在しているメッセージについては全て処理して欲しいところです。

しかし、上記の通りLambdaには起動時間の制限があります。

Lambdaの起動時間上限目一杯までループで処理を繰り返す方法もありますが、起動時間管理の処理を書くのは結構面倒そうです。

Lambdaを複数並列に起動すればいけそうですが、仮にファイルの処理結果をなんらかのデータストアに書き込むと想定すると、データストアにかかる負荷が気になります。

データストア等にに過負荷をかけることなく、キューに残存するメッセージを全て処理する方法はないものでしょうか??

後続処理をまかせるLambdaを起動すればよいのでは?

一度メッセージを処理し終わって、sqsにキューにメッセージが残っているか確認し、残っていれば自身と同じLambdaファンクションを指定して起動するようにすれば良さそうです。

リレーでバトンタッチするイメージですね。

Undoukai_baton_relay


この方式の場合、厳密には「一回の起動」ではありませんが、次回のスケジューリングを待つことなくキュー内のメッセージが全て処理されます。

以下のような感じ

import λ from 'apex.js';
import AWS from 'aws-sdk';
import 'babel-polyfill';

const main = async () => {
  const lambda = new AWS.Lambda();
  const sqs = new AWS.SQS();
  // メッセージを取得
  const data = await sqs.receiveMessage({
    QueueUrl: 'xxx',
    MaxNumberOfMessages: 0,
    WaitTimeSeconds: 1,
  }).promise();

 if (data.Messages.length === 0) return;

  // メッセージに対してなんらかの処理をすると同時にdeleteMessageBatch用のEntryを作成
  const entries = data.Messages.map((message, idx) => {
    /* do somethng */
    return {
      id: idx.toString(),
      ReceiptHandle: message.ReceiptHandle,
    };
  });

  // 処理済みのメッセージを削除
  await sqs.deleteMessageBatch({
    QueueUrl: 'xxx',
    Entries: entries,
  }).promise();

  // キューに残っているメッセージ数を問い合わせ
  const attr = await sqs.getQueueAttribute({
    QueueUrl: 'xxx',
    AttributeNames: ['ApproximateNumberOfMessages'],
  }).promise();

  // String型なので、数値変換
  const remains = parseInt(attr.Attributes.ApproximateNumberOfMessages, 10);

  // メッセージが残っていればlambda起動
  if (remains > 0) {
    await lambda.invoke({
      FunctionName: 'xxx',
      InvocationType: 'event',
      LogType: 'none',
    }).promise();
  }
};

export default λ((e, ctx) => {
  main();
});

全体的にエラー処理ははしょってるので注意してください

(async/awaitつかってるところはtry-catchで囲む必要があります)

export defaultのところが見慣れない感じになっていますが、これはapexでlambdaを管理するときに利用できるラッパーです。

(弊社、Salesforceメインの会社なので紛らわしいプロダクト名ですね。。。)

重要なところはlambda.invokeのパラメータInvocationTypeにeventを指定しているところです。

ここでeventではなくRequestResponseを指定すると、起動したLambdaの処理が完了するまで処理をブロックしてしまうため、キューが空になるか、タイムアウトが発生するまで処理が返ってこないことになるため、無駄にコンピューティング時間を消費します。

eventを指定した場合、Lambda起動後に即座に処理が返ってくるので、あとは後続のLambdaにおまかせということになります。

まだ本番投入はしていませんが、動かしてみたところまずまず問題なさそうです。

Lambdaにgo言語サポートを加えて欲しい

実は今回のSQSコンシューマはいったんgo言語で実装していたんですが、serverlessの風にあてられて、もしくは社内でserverless風吹かせたくてサーバの運用コスト削減のためにLambdaで実装し直したという経緯があります。

前述のapexを利用すると裏ワザ的な方法でgo言語をLambdaで利用することが可能なんですが、残念ながらSQSをサポートしていないため諦めました。

Javascriptで書くと非同期処理の管理が煩雑になるんですよね。

今回はasync/awaitを使っているので大分マシですが、Promiseでやるとすごい縦長になって、あとからソースを見て読解するときになかなかつらいことになりそうです。

(愚直にcallback使うのは無論論外)

じゃあPython使えよって話になるんですが(実際ググったらPython利用しているケースが多い)、

  • サーバサイド : go言語
  • フロントエンド : javascript
  • Lambda : Python

と3言語同時に扱うのはなかなか厳しい(´・ω・`)

というわけでAWSさんは一刻も早くgo言語のサポートをお願いします(人∀・)タノム

おまけ : ショートポーリングに対する誤解

lambdaを5分毎に起動して、receiveMessageでmaxの10件づつメッセージを取り出して処理すればいいと考えていたんですが、実際に書いて動かしてみたところ、キュー内には10件以上メッセージが存在するにも関わらず、メッセージが1,2件しか返ってきません。

APIドキュメントを確認すると、以下の記述が見つかりました。

Short poll is the default behavior where a weighted random set of machines is sampled on a ReceiveMessage call. This means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per ReceiveMessage call. If the number of messages in the queue is extremely small, you might not receive any messages in a particular ReceiveMessage response; in which case you should repeat the request.

aws-sdk APIリファレンスより引用

ショートポーリングの場合、キュー内のメッセージ数が少ない(1000以下など)場合、キュー内のメッセージ数で重み付けされたサンプル数のサーバにしか問い合わせにいかないため、MaxNumberOfMessagesに指定した数より少ないメッセージしか返ってこず、キュー内のメッセージ数が極めて少ない場合、メッセージが返却されないことがあるため、繰り返しreceiveMessageを発行する必要がある。

という理解であってるんですかね?

ロングポーリングの場合はどうなんでしょうか? 次の記述が見つかりました。

もう 1 つのメリットは、キューに使用可能なメッセージがあるがレスポンスに含まれていない場合に、偽の空のレスポンスを減らすことができる点です。この状況は、Amazon SQS がショート(標準)ポーリングを使用する場合(デフォルトの動作)に発生します。このとき、レスポンスに含めることができるメッセージがあるかどうかを調べるために、(重み付けされたランダム分散に基づいて)サーバーのサブセットだけに対してクエリが実行されます。一方、ロングポーリングを有効にすると、Amazon SQS はすべてのサーバーに対してクエリを実行します。

Amazon SQS ロングポーリング - Amazon Simple Queue Service より引用

どうやらロングポーリングの場合、上記の問題は発生しないようです。

(receiveMessageのWaitTimeSecondsを1以上にすると、ロングポーリングになる)

Lambdaはコンピューティング時間で課金されるため、なるべく「待ち」の時間は減らしたいところですが、ショートポーリングで1,2件づつメッセージを処理するよりは効率が良さそうです。

コメント

コメントを投稿

採用情報

株式会社フレクトでは、事業拡大のため、
・Salesforce/Force.comのアプリケーション開発
・HerokuやAWSなどのクラウドプラットフォーム上での
Webアプリケーション開発
エンジニア、マネージャーを募集中です。

未経験でも、これからクラウドをやってみたい方、
是非ご応募下さい。

フレクト採用ページへ

会社紹介

株式会社フレクトは、
認定コンサルタント
認定上級デベロッパー
認定デベロッパー
が在籍している、セールスフォースパートナーです。
また、heroku partnersにも登録されています。
herokuパートナー
株式会社フレクトのSalesforce/Force.com
導入支援サービス
弊社の認定プロフェッショナルが支援致します。
・Visualforce/Apexによるアプリ開発
・Salesforceと連携するWebアプリ開発
も承っております。
セールスフォースご検討の際は、
お気軽にお問合せください。

2024年4月

  1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30        
ブログ powered by TypePad