IO.select処理で役に立つセルフパイプテクニック
はじめに
IO.select処理をしていて困った問題としてブロッキング問題があると思います。ブロッキングとは、バッファにデータが全くなくてデータ読み込み処理で待機が発生したり、バッファにデータがあるが、読み込みたいバイト数のデータがなくてデータ読み込み処理が待機したりすることで起きる現象です(書き込みも同様)。IO.select処理でブロッキングが発生したら後続の処理が待たされてしまい非効率です。この問題を解決する方法としてセルフパイプテクニックがあります。セルフパイプテクニックの歴史は1990年頃から始まり、現在も私たちが普段使っているライブラリーで使われてます。セルフパイプテクニック
IO.selectでブロッキングが起きる例と解決例
例えば、rubyでIO.select処理を行いたい場合は、IO.pipeというものを使いこのように書くことができます。これはブロッキングが起きてしまう例です。
eg_1 = -> {
async_heavy_process = -> { puts "heavy" }
r, w = IO.pipe
fork { sleep 5; w.puts "hoge" }
IO.select([r])
async_heavy_process.()
puts r.gets
}
eg_1.()
5秒立たないと、asyncheavyprocess処理が走らなく効率が悪いプログラムになってしまってます。非同期で実行される重い処理は早く実行したいものです。では問題を解決する事を考えましょう。IO.selectに渡されたIOオブジェクトの中に常に準備完了状態のものがあればIO.selectでブロッキングされることは無くなります。
このアイディアの元生まれたのがセルフパイプテクニックです。ではこのアイディアを埋め込んだ例1
の修正版のコード例2をみてみましょう。
eg_2 = -> {
async_heavy_process = -> { puts "heavy"; }
self_reader, self_writer = IO.pipe
self_writer.puts 0
r, w = IO.pipe
fork { sleep 5; w.puts "hoge" }
IO.select([r, self_reader])
async_heavy_process.()
puts r.gets
}
eg_2.()
実行したらすぐに"heavy"
が表示されてIO.selectの部分でブロッキングされない事がわります。他にも用途があると思うがもし知っていたら教えて欲しいです。
セルフパイプテクニックが使われているライブラリ例
例で紹介した例は非常にシンプルで実用性に乏しいので最後に短なライブラリーでこのセルフパイプテクニックがどのように使われているのかを紹介します。
- foreman
- unicorn
foreman
foremanは起動したいコマンドを定義したProcfileを読み取り、マルチプロセスで実行、各プロセスで発生した標準出力(標準エラー)はパイプを通してメインプロセスで起動しているプログラムに渡されて、標準出力で表示されるツールです。具体的にはこんな感じのもの
app: sleep 5 && echo 'app' && exit 1; # 子プロセス1
web: while :; do sleep 1 && echo 'web'; done; # 子プロセス2
$ foreman start
00:57:43 app.1 | started with pid 21149 # メインプロセス/メインスレッドで出力
00:57:43 web.1 | started with pid 21150 # メインプロセス/メインスレッドで出力
00:57:44 web.1 | web # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:45 web.1 | web # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:46 web.1 | web # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:47 web.1 | web # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:48 app.1 | app # 子プロセス1にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:48 web.1 | web # 子プロセス1にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:48 app.1 | exited with code 1 # メインプロセス/スレッド2で子プロセス1の終了を確認
00:57:48 system | sending SIGTERM to all processes # メインプロセス/メインスレッドから子プロセスへSIGTERMが送られた時の出力(windowsだとSIGKILL)
00:57:48 web.1 | terminated by SIGTERM # メインプロセス/メインスレッドから全ての子プロセスのterminatedを確認した時の出力
である。でどこの処理で使われているかというと、子プロセスに渡したパイプからの標準出力(標準エラー)を取得する処理(waitforoutput)で使われている。コードでいえばここである。
# https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L406-L420
def watch_for_output
Thread.new do
begin
loop do
io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)
read_self_pipe
handle_signals
handle_io(io ? io.first : [])
end
rescue Exception => ex
puts ex.message
puts ex.backtrace
end
end
end
io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)
もしこのセルフパイプがなければどうなるかというと、何らかの原因でIO.selectが永久的にブロッキングされてしまったら、watchforoutputの後続の処理である子プロセスの終了確認チェック処理(waitforshutdownorchild_termination)が実行されなくなってしまうという事である。それはforemanでは子プロセスをkillする事ができなくなる事を意味しており最悪の事態です。
# https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L54-L63
def start
register_signal_handlers
startup
spawn_processes
watch_for_output
sleep 0.1
wait_for_shutdown_or_child_termination
shutdown
exit(@exitstatus) if @exitstatus
end
unicorn
こちらはコードを丁寧に読んでないのであまり詳しくは言及しませんがコードをgrepするとセルフパイプが使われている事がわかります。
# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L82-L91
# We use @self_pipe differently in the master and worker processes:
#
# * The master process never closes or reinitializes this once
# initialized. Signal handlers in the master process will write to
# it to wake up the master from IO.select in exactly the same manner
# djb describes in https://cr.yp.to/docs/selfpipe.html
#
# * The workers immediately close the pipe they inherit. See the
# Unicorn::Worker class for the pipe workers use.
@self_pipe = []
そして多分この処理でブロッキングを回避するのに役立ってると思います。
# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L748
def worker_loop(worker)
#
# 省略
#
ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0]
#
# 省略
#
end
興味があったらコードを読んでみたらいいと思います。
まとめ
IO.select処理で発生するブロッキングを回避するためのセルフパイプテクニックを紹介しました。普段お世話になっているforemanやunicornなどのライブラリーで結構使われているテクニックなのでこの際習得してものにしてみたらどうでしょうか。日本での記事が全くなかったので記事にしてみました。至らぬところはあるかと思いますが役に立つと幸いです。