Scala言語の非同期処理の鍵を握るFuture型の理解と基本操作

March 26, 2023

Tags /

Future 型のポイント

Future 型が表現するもの

Future 型は非同期処理の実行状態を表現する役割と例外を吸収するエフェクトフルコンテキストとしての役割を兼ね備えています。まとめると、Future 型が表現する状態は以下の 3 つのパターンになります。

実行状態 成否状態 Future 型の値
実行中 - -
実行完了 成功 実行結果
実行完了 失敗 例外

非同期処理の実行状態を表現する役割

Future 型は、非同期処理が行われる際に使用されます。非同期処理を行った場合、その実行結果を知りたいという状況はよくあります。ここで、Future 型の役割は、非同期処理がまだ実行中で実行結果が利用できない状態か、それとも完了して実行結果が利用可能な状態かどうかを表現することです。

例外を吸収するエフェクトフルコンテキストとしての役割

Future は成功した場合に実行結果を保持し、失敗した場合には例外を保持するという Try 型の機能も持っています。Option 型を例外を吸収するためのエフェクトフルなコンテキストと解釈する場合、Try 型は単に失敗したという情報だけでなく、例外を保持することができるため、Option 型の上位互換と見ることができます。つまり、Future 型は Try 型と互換性があり、Future 型も例外を吸収するエフェクトフルなコンテキストであると言えます。

単一の Future 型の値に対して処理をしたいとき

まず、Future 型の値に対して処理を行いたい場合、その非同期処理が成功して完了している必要があります。これは、処理を開始する前に非同期処理を待つ必要があることを意味します。待ち方には、ブロッキングとノンブロッキングの 2 つの方法が存在します。

ブロッキングの方法では、Await.result を使用しますが、ノンブロッキングの方法では、コンビネーターを利用します。ただし、原則としてブロッキングせず、ノンブロッキングに非同期処理を合成することが推奨されます。つまり、Await.result は原則として使用しないことです。逆に言うと、Await.result を使用する場合には、それ相応の根拠を用意する必要があります。

代表的なノンブロッキングの方法としては、map コンビネーターがあります。Option 型の map コンビネーターでは、成功した場合(Some の場合)にのみ処理を行いますが、同様に、Future 型の map コンビネーターも、成功して完了した場合(Success の場合)にのみ処理を行います。

ブロッキングとノンブロッキングの挙動の違いについて

ブロッキングは Future 型の値を使おうとしている呼び出し側のスレッドが、その非同期処理が完了するまで待ち状態に入ることを意味します。一方、ノンブロッキングでは待ち状態に入らず、つまり、プログラムの処理は先に進みます。

ブロッキングをしてはいけない理由

非同期処理において、タスクの実行はスレッドプールを利用して行われます。Future 型を使って非同期処理を開始する際には、ExecutionContext が要求されますが、これはスレッドプールとして認識して問題ありません。ブロッキング処理を行うと、スレッドプール内の待機中のスレッドが占有されることになります。そのため、多くのスレッドがブロッキング処理を行うと、スレッドプール内の利用可能なスレッド数が減少します。利用可能なスレッドがなくなると、新たな非同期処理は即座に開始できず、遅延が発生します。さらに、利用可能なスレッドがない状況では、デッドロックのリスクも存在します。

それでもブロッキングをするというのであれば

スレッドプールのスレッド枯渇を防ぐ措置が発動するかもしれない

一般的に使用される scala.concurrent.ExecutionContext.Implicits.global や Play Framework のデフォルトスレッドプールは、デフォルト設定で ForkJoinPool を採用しています。ForkJoinPool では、ブロッキングが検出された際に自動的に新しいスレッドが追加されるようになっています。ForkJoinPool にブロッキングを検知させる方法として、blocking 関数を使うことができます。ここで、Await.result は blocking を使用しています。つまり、Await.result による非同期処理の待機をした場合、そのスレッドプールのスレッドの数が増えますので、スレッド枯渇によるデッドロックや処理性能の低下は回避できます。しかし、スレッドプールのスレッド数を無限に増やし続けることはできませんし、新しいスレッドの作成にはシステムリソースを消費しますので、望ましくありません。

非同期処理自体がブロッキングを含む場合

非同期処理を待つためのブロッキング(Await.result)ではなく、その非同期処理がブロッキングな処理を呼び出す場合についてです。その場合は、その非同期処理内で blocking を呼び出さないと、前述したスレッド枯渇問題に陥る可能性があります。もしくは、別のスレッドプールを用意するという方法もあります。例えば、DB 接続を想定したとき、DB 接続用のコネクションプールのコネクション数が一定であれば、その数と同じスレッド数で固定するスレッドプールを用意するというものです。スレッドプールのスレッド数を固定する場合は、ForkJoinPool ではなく ThreadPoolExecutor を使用することで簡単に実現できます。

スレッドプールの状態を確認する方法

jstack コマンドを使用することで、スレッド情報やスタックトレースが取得できます。 多くのスレッドがブロックされている場合や、予想以上に多くのスレッドが生成されている場合は、異常が発生している可能性が高いです。また、アプリケーションがフリーズしたり、応答が遅くなったりした場合の調査でも役立つでしょう。

複数の Future 型の値に対して処理をしたいとき

初心者が陥りやすいのは、2 つの Future 型に対して処理をした結果、戻りの型が Future[Future[T]] のようにネストしてしまうことです。しかし、このネストしたものが表現するパターンは、以下のとおりです。

外側の Future 内側の Future 全体的な状態
実行中 実行中 実行中
実行中 成功完了 実行中
実行中 失敗完了 実行中
成功完了 実行中 実行中
成功完了 成功完了 成功完了
成功完了 失敗完了 失敗完了
失敗完了 実行中 実行中
失敗完了 成功完了 失敗完了
失敗完了 失敗完了 失敗完了

前提として、2 つの Future が両方とも成功して完了しなければ、全体が成功して完了した状態にはなりません。そのため、Future[Future[T]]が表現する状態は実質的にFuture[T]と同じものになります。このことから、Future[Future[T]]Future[T]に平坦化することが望ましいです。平坦化を行うためには flatMap コンビネーターを使用します。この平坦化できる性質はモナドという概念を表していることになります。

非同期処理の開始タイミングについて

非同期処理が開始されるタイミングによって処理性能が影響を受けることがあります。具体的には、非同期処理が開始されるのは Future#apply を呼び出した時であり、処理性能を意識する場合はこの開始タイミングに注意する必要があります。

以下は、典型的なコード例です。

def execTaskA(): Unit = ???

def execTaskB(): Unit = ???

// タスクAが成功で完了した場合にタスクBが開始される
Future(execTaskA()).flatMap(_ => Future(execTaskB()).map(_ => ()))

// タスクAとタスクBは並列で処理される
val taskA = Future(execTaskA())
val taskB = Future(execTaskB())
taskA.flatMap(_ => taskB.map(_ => ()))

for 式と flatMap の関係

Future 型においても for 式による flatMap の書き換えが可能です。詳細は省略します。

おわりに

本記事では、Scala 言語における非同期処理と Future 型の基本的な概念や使い方を紹介しました。根本的な考え方は Option 型と同じであるため、そちらも参考にしてください。


Written by Gayamasan who lives and works in Japan.