2013-04-04

GNU/LinuxのC++11でプログラミングの常識がひっくりかえった

C++11のコア言語を詳細に解説する本を書いている。ライブラリについては本書の範疇ではないし、どちらかと言えばコア言語の方が好きなのだが、ライブラリの解説だってやってやれないことはない。せっかくなのでC++11の範囲で非同期処理について書くことにしよう。

例えばあるファイルを全部メモリ上に読み込むような処理を考えよう。ファイルの読み込みは時間がかかる。残念ながら、まだmmapのようなmemory mapped fileは標準ライブラリにないし(あったとしてもこの問題には不適だし)、非同期I/Oもない。そのため、ファイルの読み込みは別のスレッドで行い、結果を元のスレッドに返したい。

これは非常に難しい問題である。別のスレッドを作ってその終了を待たなければならないどころか、その別のスレッドから何とかしてデータのやり取りをしなければならないのだ。それだけではなく、ファイルを読み込むという処理も書かなければならない。

幸い、C++11にはstd::asyncがある。これを使えば、なんとスレッドを作るとかデータの受け渡しをするとかいう誰でも書けるがゆえに難しい処理と、ファイルを読み込むという重要な処理を完全に分離できるのだ。

まず、ファイルを読み込む部分だけを関数として書こう。スレッドとかデータの受け渡しとか同期などは、一切考えなくてよい。重要なのは、別スレッドに渡したいデータは関数の戻り値で返すということだ。

// FILE *をfcloseするためのデリーター
struct FILE_ptr_deleter
{
    using pointer = FILE * ;
    void operator ()( pointer ptr ) const
    {
        std::fclose( ptr ) ;
    }
} ;

// ファイルの読み込み
std::vector<char> file_reader( std::string file_name  )
{
    // そもそもPOSIXってbを無視するんだよね。
    // いまどきbが意味を持つ古代の環境なんて一つしかないよね。どれとはあえていわないけど。
    std::unique_ptr<FILE, FILE_ptr_deleter> stream( std::fopen( file_name.c_str(), "rb" ) ) ;

    // ファイルが開けたかどうか確認
    if ( stream.get() == nullptr )
    { // エラー報告はもちろん例外を投げる
        throw std::runtime_error("fopen failed.") ;
    }

    // ファイルサイズを取得
    // いい加減C++にもまともなファイルシステムライブラリがほしい。
    std::fseek( stream.get(), 0L, SEEK_END ) ;
    long size = std::ftell( stream.get() ) ;
    std::rewind( stream.get() ) ;

    // あとは読むだけ
    std::vector<char> buf( size ) ;
    std::fread( buf.data(), size, 1, stream.get() ) ;

    return buf ;
}

まあ、こんな感じだろう。関数file_readerはファイル名を引数で受け取って読み込み、結果を戻り値として返す。この関数に、スレッドや、スレッドを超えたデータの受け渡しのためのコードは一切ない。そんな一般的な処理などわざわざユーザーが手を下すまでもないからだ。ファイルが開けない場合のエラー報告は、例外で行う。例外を使わない理由はない。

さて、この関数を別スレッドで実行して、さらに結果を安全に別のスレッドに返すのがstd::asyncの仕事だ。std::asyncの使い方は非常に簡単。

int main()
{
    try
    {
        // 別スレッドで読み込み開始
        auto result = std::async( std::launch::async, file_reader, std::string("example.txt") ) ;

        // その他の必要な処理

        // さて、ブロックされてもいいので結果を待とうか
        auto buf = result.get() ;

        // せっかくだから出力してみようか。
        for ( char c : buf )
        {
            std::cout << c ;
        }      


    } catch ( std::exception & exception )
    { // エラーはもちろん例外で受け取る。
        std::cout << exception.what() << std::endl ;
    }

}

このように非常に簡単に別スレッドをたちあげて結果を受け取ることができる。

std::asyncは、関数オブジェクトと、その関数オブジェクトに渡す実引数を受け取り、std::futureを戻り値として返す。この場合の戻り値の型は、std::future< std::vector<char> >である。と言っても、馬鹿正直に型名を書くのは面倒なのでauto指定子を使う。ポリシーとしてstd::launch::asyncを渡しているので、すぐさま別スレッドで実行が開始される。

さて、いよいよ結果が欲しくなった。ここでは、ブロックしてもかまわないので、結果が得られるまで待つfutureのメンバー関数getを使う。このgetを呼び出すことにより、別スレッドで実行される関数が終了して値を返すまで待ち、結果を別スレッドでも読めるように返してくれる。std::vectorを返しているが、C++11にはムーブがあるので、パフォーマンス上の問題はない。

さて、関数file_readerは別スレッドで実行されるが、エラー報告にはなんと例外を使っている。これは当然である。std::asyncは例外をスレッド間を超えて、正しくpropagateしてくれるのだ。例外のスレッドを超えたpropagateはC++11で新しく追加された機能であり、std::current_exceptionでstd::exception_ptrを取得して、それを別スレッドでstd::rethrow_exceptionを使って再びthrowすることにより、スレッド間を超えた例外のpropageteを実現できる。

最後に、蛇足的だが結果を標準出力に出力している。ここで使っているのはrange-based forだ。

このように、std::asyncを使えば、本当に重要な処理だけに注力することができる。スレッドを作るとかスレッド間の同期とかデータの受け渡しとか例外のpropageteとかは、わざわざ書くまでもないのだ。

残念ながら、C++11のasyncやfutureには極めて基本的な機能しかない。しかし安心して欲しい。次期C++に向けて現在提案中のN3558が採用された暁には、とても便利になる予定だ。

うん、GNU/Linuxは何の関係もなかった。ついでに、C++11ではすでに常識なので、とくにひっくりかえる常識もなかった。

追記:ファイルの読み込みをfstreamで実装してみたが、どうもあまり代わり映えしない。


std::vector<char> file_reader( std::string file_name )
{
    std::ifstream stream( file_name, std::ios::in | std::ios::binary ) ;
    if ( !stream )
    {
        
        throw std::runtime_error("failed to open a file.") ;
    }

    stream.seekg( 0, std::ios::end ) ;
    auto pos = stream.tellg() ;
    stream.seekg( 0, std::ios::beg ) ;

    std::vector<char> buf( pos ) ;
    stream.read( buf.data(), pos ) ;
    
    return buf ;
}

もう少しまともなI/Oライブラリが欲しい。

No comments: