Kotlin coroutines Flowを用いたAndroid SensorEventに対する非同期ストリーム処理
こんにちはソフトウェアエンジニアの @banana-umai です。
今回のエントリーではKotlinを用いたAndroid開発にまつわるTipsをご紹介したいと思います。
MODEではこれまでAndroidデバイスを用いて自動車の走行データを取得するためのAndroidアプリや、お客様ご自身でAndroidアプリケーションをMODE Cloudに接続してIoTゲートウェイとして利用するためのSDKを開発してきました。本稿では、MODEのAndroid開発において多く用いているセンサーデータを非同期ストリームとして処理する、という方法をご紹介します。具体的にはKotlinのcouroutinesライブラリを用いることになるのですが、その具体的な方法や、それによるメリットなどについてお伝えできればと思います。
Android開発をしている中でFlowやChannelなどの使い所はどこだろう?と思っている方々や、これからセンサーを用いた開発を行うとしている方々の参考になれば幸いです。
AndroidでSensor Dataを利用する方法
本題に入る前に、まずは基本的的なAndroidにおけるSensor Dataの利用方法について触れたいと思います。詳細は公式ドキュメントなどを参考にしていただければと思いますが、とても単純化すると以下のような形で利用することになると思います。
val listener = object : SensorEventListener { override fun onSensorChanged(event: SensorEvent?) { .... } override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) {...} } val sensor = sensorManager.getDefaultSensor(SENSOR_TYPE) sensorManager.registerListener(listener, sensor, ...)
SensorEventListenerを定義し、SensorManagerに登録することで、onSensorChangedメソッドがコールバックされてセンサーデータを受け取ることになります。Android開発においてコールバックベースのAPIは数多くあり、センサーデータの利用方法についても珍しい方法ではないと思いますが、一般的にコールバックベースのAPIを多用していくと処理が複雑化してきた際に制御の流れをコードから追いかけるのが難しくなりがちです。そこで、次節ではcoroutines.Flowを用いてコールバックベースのAPIを、ストリームベースのAPIに変換する方法を紹介します。
coroutines Flowを用いてセンサーデータをストリームとして扱う
FlowはKotlinにおける並行処理、非同期処理のデファクト標準になりつつある(なっている?)kotlinx.coroutinesライブラリの一部で、非同期データストリームを提供するコンポーネントです(詳細な説明は公式ドキュメントや作者の一人であるRoman Elizarov氏のブログなどを御覧ください)。
それでは、具体的にSensorEventストリームを作成する方法を説明していきたいと思います。ここでは、以下のように利用できるsensorEventFlow関数を作成します。
class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { ...snip lifecycleScope.launch { val sensorManager = getSystemService(Context.SENSOR_SERVICE) as SensorManager val sensor = sensorManager.getDefaultSensor(Sensor.TYPE_ACCELEROMETER) // 本当はsensorが利用可能かチェックしたほうが良い val delay = SensorManager.SENSOR_DELAY_NORMAL val handler = Handler(mainLooper) // 本当はBackground用のHandlerThreadを作ったほうが良い sensorEventFlow(sensor, sensorManager, delay, handler).collect { event -> Log.d("SensorEvent", "receive $event") } } } }
細かいエラー処理などを端折るとsensorEventFlow関数の実装は概ね以下のような形になります。
@ExperimentalCoroutinesApi fun sensorEventFlow(sensor: Sensor, manager: SensorManager, delay: Int, handler: Handler): Flow<SensorEvent> { return channelFlow<SensorEvent> { // ・・・1 val listener = object : SensorEventListener { override fun onSensorChanged(event: SensorEvent?) { if (event?.sensor?.type == sensor.type) { launch { send(event) } // ・・・2 } } override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) { // do something } } try { manager.registerListener(listener, sensor, delay, handler) awaitClose() // ・・・3 } finally { manager.unregisterListener(listener, sensor) // ・・・4 } } }
- channelFlowはFlow buildersと呼ばれる関数の一つで、引数となるコールバック関数内でストリームのデータを作成します。実際にやっているのは前節で解説したAndroidのSensor Dataの利用方法と同様にListnerを作成して、SensorManagerに登録しています。ポイントとなるのは、ListenerのonSensorChanged関数の実装です。
- send関数の呼び出しによって、ストリームにデータ(sensorEventFlowの場合はSensorEventオブジェクト)を送っています。send関数はsuspend関数なので、launch関数(Coroutine builder)によってcoroutineをinvokeする必要があります。
- awaitClose関数は、親となるCoroutineがCloseするまで、channelFlowに渡せれる関数の実行をブロックする処理です。この関数で処理をブロックしないと、即channelFlowのコールバック関数は終了してしまいlistenerがイベントを送信することができません。
- 最後にfinally句の中でsensorの監視を終了します。3)で書いたawaitClose関数はCoroutineが終了する際に、CancelationExceptionを発生させて処理を終了するため、finally句内の処理は実行することができます。
コールバックベースのAPIをストリームに変換することができれば、ストリームに対しての処理を書いていくことができるようになります。例えば以下のような形です。
sensorEventFlow(sensor, sensorManager, delay, handler) .buffer(100) .map { //データ変換処理 } .catch { //例外エラーハンドリング} .collect { // 最終的なデータの処理 }
- buffer: 後続の処理で一時的なデータの消費が一時的に遅くなった際にデータを貯めこめるようにする
- map: データの変換をする。例えば生のSensorEventをよりアプリケーションで利用したい抽象データ型に変換するなど
- catch: 前段(upstream)の処理において何らかのエラー(例外)が発生することが予想される場合にエラー処理を挟む
この他にも様々なオペレータが定義されているので必要に応じて組み合わせて使うことができます。また、ここに定義されていないオペレータを自作することも可能です例: データのスロットリング
また、Flowの特徴の一つはバックプレッシャーが組み込まれている点です。バックプレッシャーとはストリーム処理において、ストリームの下流の処理の滞りを上流に伝えるための仕組みです*1。上記のオペレーターの組み合わせの例でbufferを定義していますが、場合によってはそのbufferサイズを超えてしまうことがあるかもしれません。最終的にこのバックプレッシャーはストリームデータの発生元まで返ってくることになります。SensorEventListnerのonSensorChangedメソッド内でsendを行うところまで戻ってきます。バックプレッシャーが発生した場合に、sendはデフォルトでブロックしますが、coroutineで提供されているselect APIを用いることで、バックプレッシャーが生じた際に何らかの処理を行うように定義することも可能です。
fun sensorEventFlow(sensor: Sensor, manager: SensorManager, delay: Int, handler: Handler): Flow<SensorEvent> { return channelFlow<SensorEvent> { val side = Channel<SensorEvent>() launch { for (event in side) { // do something here } } val listener = object : SensorEventListener { override fun onSensorChanged(event: SensorEvent?) { if (event?.sensor?.type == sensor.type) { launch { select<Unit> { onSend(event) {} side.onSend(event) {} } } } } override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) { // do something } } try { manager.registerListener(listener, sensor, delay, handler) awaitClose() } finally { side.close() manager.unregisterListener(listener, sensor) } } }
上記のコードでは、SensorEventストリームの下流の処理で詰まってバックプレッシャーが大本まで到達した際に、select式を使って、sideチャネルに処理を逃がすための処理です。sideチャネルの宣言した直後にcoroutineを作成し、その中でsideチャネルに入ってきた場合の処理を記述します。ログを取る、ユーザーに通知する、sensorの監視を停止する、再起動する、など、アプリケーションの性質に応じてどのような処理するのが適切かは変わってくるかと思われます。
まとめ
channelFlowというFlow builder関数を使うことで、コールバックベースのAndroidのセンサーAPIをストリームとして扱うことができるようになります。coroutinesライブラリにはストリームに対する処理を行うためのツールキットが揃っており、宣言的にデータの加工、フィルタリング、エラー処理などを施すことができるというメリットがあります。また、Flowはバックプレッシャーが組み込まれているため、ストリーム処理の下流における処理の不具合を上流に伝達し、ハンドルすることが可能です。なお、Javaをベースで実装する場合、概ね同様のことをRxJavaのFlowableを用いて実現することが可能ですので、別の機会にご紹介できればと思います。センサーはソフトウェア的に時系列で状態をデータを発生するデータソースとみなせるため、ストリーム処理とは非常に相性が良いと思われます。また、Androidとセンサーデータを用いた(ビジネス)アプリケーションの開発に興味がある方の参考になれば幸いです。
最後に、センサーを活用したコネクティッドなビジネスアプリケーションを構築したいという方におかれましては、MODEではAndroidのSDKをはじめ、MODE Labという共同開発方式や各種ソリューションを提供していますので、ご興味ある方はぜひお問い合わせください。
*1:バックプレッシャーの詳細な説明はBackpressure explained — the resisted flow of data through softwareがわかりやすいかと思います