プログラミング勉強ノート

プログラミングで勉強した事を書いていきます。

Reactive Extensions 2.2 まとめ(1)

ReactiveExtensions 2.2.4を勉強した事のまとめ(随時更新中)

Taskとの相互変換

IObservableとTaskで相互に変換できる。
詳細な情報は@xin9leさんのブログ(Rxとの相互運用)が分かりやすいと思います。

■IObservableからTaskへの変換
  • ToTaskメソッドでTaskに変換。
  • OnComplete発行でTaskの処理完了する。
  • OnComplete直前のOnNextの値がTask.Resultになる。
var task = Observable.Range(1, 3).ToTask(); // IObservableをTaskに変換
Console.WriteLine(await task); // taskの結果を表示(3が表示される)
■TaskからIObservableへの変換
  • ToObservableメソッドでIObservableに変換。
  • IObservableの他の非同期系の物と動作が一緒で、処理中はHot、処理完了したらCold(Subscribeする度にキャッシュしているOnComplete直前のOnNextの値を発行)になる。
var task = Task.Run(() => "ToObservable"); // 文字列を返すTask生成
var observable = task.ToObservable(); // TaskをIObservableに変換
observable.Subscribe(x => Console.WriteLine(x));


await

IObservableもawaitする事が出来るようになった。
詳細な情報は下記のサイトが分かりやすいと思います。
@xin9leさんのブログ(Rxとの相互運用)
@neueccさんのブログ(Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理)

  • IObservableに対してawaitすることで非同期で値を取り出す事が出来る。
  • 取得出来る値はOnComplete直前のOnNextの値(他の非同期系のIObservableと同じような動き)。
var result = await Observable.Range(1, 3); // awaitで値取り出し
Console.WriteLine(result); // 結果表示(3)


Wait

Waitメソッドが追加された。詳細な情報は下記のサイトが分かりやすいと思います。
@neueccさんのブログ(Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理)

  • 同期的に最後の値を取得
  • Lastと同じ動作
  • 元々あった同期的に値を取り出すメソッド(First等)がObsoleteになり
    その代わりとして追加された?
var result = Observable.Range(1, 3).Wait(); // 同期的に最後の値を取得
Console.WriteLine(result); // 結果表示(3)


FirstAsync, FirstOrDefaultAsync, LastAsync, LastOrDefaultAsync, SingleAsync, SingleOrDefaultAsync

同期的に値を取得するメソッドがObsoleteになり
その代わりに非同期で取得出来るメソッドが追加された?
詳細な情報は下記のサイトが分かりやすいと思います。
@neueccさんのブログ(Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理)

非推奨メソッド 追加メソッド
First FirstAsync
FirstOrDefault FirstOrDefaultAsync
Last LastAsync
LastOrDefault LastOrDefaultAsync
Single SingleAsync
SingleOrDefault SingleOrDefaultAsync
var ids = Observable.Range(1,3);
var result1 = await ids.FirstAsync(); // 非同期で取得(1の値を取得)
var result2 = ids.FirstAsync().Wait(); // 同期的に取得(1の値を取得)


FromAsyncPattern

FromAsyncPatternメソッドがObsoleteに変更された。
詳細な情報は下記のサイトが分かりやすいと思います。
@neueccさんのブログ(Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理)

  • .NET4.5から非同期系のメソッド(***Async)が追加されたのが理由?
  • TaskFactory.FromAsyncを使いTaskを取得しToObservableで
    IObservableに変換する事で非推奨のメソッドを使わずIObservable化は可能


ForEach、ForEachAsync

ForEachメソッドがObsoleteになり新たにForEachAsyncメソッドが追加された。

  • ForEachAsyncメソッドを使うことにより非同期にForEachの処理が出来る。
  • ForEachメソッドでは、ForEachを呼んだスレッドで同期的に実行されていたので処理が
    終了するまでスレッドがブロックされる。
  • ForEachAsyncでは引数で渡されたIObservableに対してSubscribeして
    OnNextで発行された値に対してactionを実行しているので
    IObservableがカレントスレッドで動作している場合はスレッドがブロックされるので
    非同期に動作させたい場合はスレッドプール上等で動かす必要がある。
// Rangeがカレントスレッドで動くのでawaitしているのに制御戻らず
// ForEachAsyncの処理が終わるまでスレッドブロックされる
await Observable.Range(1, 10000000)
    .ForEachAsync(x => Console.WriteLine(x));


// Rangeがスレッドプールで動くので非同期でForEachAsyncが処理される
await Observable.Range(1, 10000000, System.Reactive.Concurrency.ThreadPoolScheduler.Instance)
    .ForEachAsync(x => Console.WriteLine(x));


// ForEachの場合はRangeをスレッドプールで動作させても
// ForEachが同期的に実行されるので呼んだスレッドが処理終了されるまでブロックされる
Observable.Range(1, 10000000, System.Reactive.Concurrency.ThreadPoolScheduler.Instance)
    .ForEach(x => Console.WriteLine(x));
  • ForEachAsyncは非同期で処理できるが、引数に渡せるのは
    Action型なのでOnNext毎に実行される処理を非同期にしてしまうと正しく完了待機できない
    OnNext毎のactionの処理も非同期にしたい場合はSelectManyを使う
    (詳細は@neueccさんのブログ(非同期時代のLINQ)参照)。


DelaySubscription

ディレイ処理のメソッドとして、DelaySubscriptionメソッドが追加された。

■Delayメソッドの動作

Delayメソッドは発行された値を後続に流すのを遅延する。
Delay中に発行された値をキャッシュしておき、指定時間経過後に後続に流す。

■DelaySubscriptionメソッドの動作

DelaySubscriptionメソッドは処理の開始を遅延する。
Subscribeしてから処理が開始されるまでの時間を遅延する。
遅延中にObservableから値が発行されていても無視する。
開始時間を遅延させるのでメソッドチェーン内のどの位置においても結果は変わらない。
メソッドチェーン内で何度も呼んだ場合はトータルの時間分処理の開始を遅延する。

■動作確認用のObservable作成
// 1,2を発行するObservable(1秒間隔で発行)
var delayTestObservable = Observable.Create<int>(observer =>
    {
        Console.WriteLine("{0:HH:mm:ss}:OnNext(1)", DateTime.Now);
        observer.OnNext(1);

        // 1秒スリープ
        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));

        Console.WriteLine("{0:HH:mm:ss}:OnNext(2)", DateTime.Now);
        observer.OnNext(2);
        observer.OnCompleted();
        return () => { };
    });
■Delayメソッドの動作確認

Delayメソッドより後の処理は、発行された値を渡すのが遅延されている。
Delayメソッドより前の処理は、遅延されず動き続ける。

Console.WriteLine("{0:HH:mm:ss}:StartMethod", DateTime.Now);
delayTestObservable
    // OnNextされたらすぐに処理される
    .Do(x => Console.WriteLine("{0:HH:mm:ss}:Do1({1})", DateTime.Now, x))
    // 5秒遅延させる
    .Delay(TimeSpan.FromSeconds(5))
    // 5秒遅延1回なので、OnNextから5秒後に処理される
    .Do(x => Console.WriteLine("{0:HH:mm:ss}:Do2({1})", DateTime.Now, x))
    // 5秒遅延させる
    .Delay(TimeSpan.FromSeconds(5))
    // 5秒遅延2回なので、OnNextから10秒後に処理される
    .Subscribe(x => Console.WriteLine("{0:HH:mm:ss}:Subscribe({1})", DateTime.Now, x));


/* 実行結果
10:36:54:StartMethod
10:36:54:OnNext(1)
10:36:54:Do1(1)
10:36:55:OnNext(2)
10:36:55:Do1(2)
10:36:59:Do2(1)
10:37:00:Do2(2)
10:37:04:Subscribe(1)
10:37:05:Subscribe(2)
*/
■DelaySubscriptionメソッドの動作確認

SubScribeされてから実際に処理が開始されるまでが遅延されている。

Console.WriteLine("{0:HH:mm:ss}:StartMethod", DateTime.Now);
delayTestObservable
    //OnNextされたらすぐに処理される
    .Do(x => Console.WriteLine("{0:HH:mm:ss}:Do1({1})", DateTime.Now, x))
    //処理の開始を5秒遅延させる
    .DelaySubscription(TimeSpan.FromSeconds(5))
    //OnNextされたらすぐに処理される
    .Do(x => Console.WriteLine("{0:HH:mm:ss}:Do2({1})", DateTime.Now, x))
    //処理の開始をを更に5秒遅延させる
    .DelaySubscription(TimeSpan.FromSeconds(5))
    //OnNextされたらすぐに処理される
    .Subscribe(x => Console.WriteLine("{0:HH:mm:ss}:Subscribe({1})", DateTime.Now, x));


/* 実行結果
10:46:43:StartMethod
10:46:53:OnNext(1)
10:46:53:Do1(1)
10:46:53:Do2(1)
10:46:53:Subscribe(1)
10:46:54:OnNext(2)
10:46:54:Do1(2)
10:46:54:Do2(2)
10:46:54:Subscribe(2)
*/
■DelayメソッドでDelaySubscriptionメソッドと同じ動きをさせるには?
// 開始処理を遅延させたいObservableを定義
var ids = Observable.Range(1, 3)
    .Do(x => Console.WriteLine("{0:HH:mm:ss}:Range OnNext({1})", DateTime.Now, x));


// Delayを単純に呼んだ場合
Console.WriteLine("{0:HH:mm:ss}:Start", DateTime.Now);
ids.Delay(TimeSpan.FromSeconds(5))
    .Subscribe(x => Console.WriteLine("{0:HH:mm:ss}:Subscribe OnNext({1})", DateTime.Now, x));

/* 処理結果:SubscribeのOnNext処理が遅延しているだけでidsの開始処理は遅延されていない
13:00:01:Start
13:00:01:Range OnNext(1)
13:00:01:Range OnNext(2)
13:00:01:Range OnNext(3)
13:00:06:Subscribe OnNext(1)
13:00:06:Subscribe OnNext(2)
13:00:06:Subscribe OnNext(3)
*/


// Emptyに対してDelayしConcatで開始処理を遅延したいObservableを渡す
Console.WriteLine("{0:HH:mm:ss}:Start", DateTime.Now);
Observable.Empty<int>()
    .Delay(TimeSpan.FromSeconds(5))
    .Concat(ids)
    .Subscribe(x => Console.WriteLine("{0:HH:mm:ss}:Subscribe OnNext({1})", DateTime.Now, x));

/* 処理結果:idsの開始処理も遅延されている
13:08:03:Start
13:08:08:Range OnNext(1)
13:08:08:Subscribe OnNext(1)
13:08:08:Range OnNext(2)
13:08:08:Subscribe OnNext(2)
13:08:08:Range OnNext(3)
13:08:08:Subscribe OnNext(3)
*/

ReactivePropertyのソース見れば
OnErrorRetryメソッドでDelayを使った開始処理の遅延がありますのでそっちを参考にしましょう。


Create

// 5秒毎に値を発行するIObservable
Observable.Create<int>(async observer =>
    {
        // 値発行
        observer.OnNext(1);
        // 5秒待機(※非同期なメソッド呼び出し Taskを返すデリゲートなのでawait利用できる)
        await Task.Delay(TimeSpan.FromSeconds(5));
        // 値発行
        observer.OnNext(2);
        // 5秒待機(※非同期なメソッド呼び出し Taskを返すデリゲートなのでawait利用できる)
        await Task.Delay(TimeSpan.FromSeconds(5));
        // 値発行
        observer.OnNext(3);
        // 完了
        observer.OnCompleted();
        // Disposeされた時に実行されるActionを返す
        return () => { };
    })
    .Subscribe(x => Console.WriteLine("{0:HH:mm:ss}:Subscribe OnNext({1})", DateTime.Now, x));

/* 処理結果:5秒毎に値が発行されている
13:53:05:Subscribe OnNext(1)
13:53:10:Subscribe OnNext(2)
13:53:15:Subscribe OnNext(3)
*/


Defer DeferAsync

■Deferメソッド
  • Taskを受け取るオーバーロードが増えた
    Func<Task<IObservable<TResult>>>を引数に渡す。
  • 非同期にIObservableを生成するメソッドをIObservable化する?
  • 内部の処理では、Func<Task<IObservable<TResult>>>をStartAsyncに渡して、IObservable<IObservable<TResult>>を取得し、それをMergeでばらしてIObservable<TResult>に変換している?
// 非同期でIObservable<int>を生成するメソッド
private async static Task<IObservable<int>> CreateObservableRangeAsync()
{
    // 5秒待機
    await Task.Delay(TimeSpan.FromSeconds(5));
    // IObservable<int>を生成
    return Observable.Range(1, 3);
}

// 非同期でIObservable<int>を生成するTaskをIObservable化
// Task<IObservable<int>> => IObservable<IObservable<int>> => IObservable<int> の順番で変換される
// StartAsync(ToObservable) => Merge の順番でメソッドが呼ばれている
IObservable<int> defer = Observable.Defer(CreateObservableRangeAsync);

// Subscribe
defer.Subscribe(Console.WriteLine);

/* 結果表示
1
2
3
*/
■DeferAsyncメソッド
  • Deferとの違いはCancellationTokenを引数に貰えるだけ?


StartAsync

  • Taskを返すメソッドをIObservable化出来る。
  • Func<Task>, Func<Task<TResult>>, Func<CancellationToken,Task>, Func<CancellationToken,Task<TResult>>を引数に渡せる。
  • 引数無しのFuncは、TaskをToObservableするのと同じ。
  • CancellationTokenを引数に受け取るFuncは、購読解除時(Dispose)に
    CancellationTokenSourceのCancelを呼んでくれる。
// Taskを返すメソッドをIObservable化
// TaskをToObservableするのと変わらない
Observable.StartAsync(() =>
    {
        return Task.Run(() => "StartAsync");
    })
    .Subscribe(Console.WriteLine);


/* 結果表示
StartAsync
*/


// CancellationTokenを引数に貰うオーバーロード
// 購読解除した時にCancellationTokenSource.Cancelを呼んでくれる
// TaskをToObservableで変換した場合は購読解除してもCancelは呼んでくれないのでTaskの処理は終了するまで動き続けている
var dispose = Observable.StartAsync(token =>
    {
        return Task.Run(async () =>
            {
                try
                {
                    // Cancelするまで無限ループ
                    while (true)
                    {
                        // Cancelされたら例外発行
                        token.ThrowIfCancellationRequested();
                        // 1秒待機
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                    return "StartAsync Cancel";
                }
                catch (OperationCanceledException ex)
                {
                    // cancelされた時にメッセージ表示
                    Console.WriteLine(ex.Message);
                    throw;
                }
            });
    })
    .Subscribe(Console.WriteLine);

// すぐに購読解除
dispose.Dispose();


/* 結果表示
操作はキャンセルされました。
*/


FromAsync

  • Taskを返すメソッドをIObservable化出来る。
  • StartAsyncをDeferで包んでいる。
  • Deferで包んでいるのでSubscribeするまでTaskの処理は開始されない。


Using

  • Task関係のオーバーロードが追加されている
  • Taskの処理を確実に終了してくれる?


Schedulerの指定

Scheduleの指定の仕方が変わった。
今までは、スレッドプール上で実行したい場合はScheduler.ThreadPool等、Schedulerクラスで定義されているIScheduleを渡していたがObsoleteに変更された。

■新しいSchedulerの指定に使う定義(一部)

  • System.Reactive.Concurrency.DefaultScheduler.Instance
  • System.Reactive.Concurrency.CurrentThreadScheduler.Instance
  • System.Reactive.Concurrency.ImmediateScheduler.Instances
  • System.Reactive.Concurrency.ThreadPoolScheduler.Instance
  • System.Reactive.Concurrency.NewThreadScheduler.Default
  • System.Reactive.Concurrency.TaskPoolScheduler.Default
// スレッドプール上で実行
// 非推奨となったスケジューラの指定方法
Observable.Range(1, 10000, System.Reactive.Concurrency.Scheduler.ThreadPool)
    .Subscribe(Console.WriteLine);

// スレッドプール上で実行
// 今後推奨されるスケジューラの指定方法
Observable.Range(1, 10000, System.Reactive.Concurrency.ThreadPoolScheduler.Instance)
    .Subscribe(Console.WriteLine);


合成処理

合成のメソッドでTaskを渡せるようになった。
Task<TSource>を受け取るメソッド(SelectMany)
IObservable<Task<TSource>>を受け取るメソッド(Merge等)

  • Task<TSource>を受け取るメソッドは内部でTask<TSource>をToObservableしている。
  • IObservable<Task<TSource>>を受け取るメソッドは内部でSelect呼んでToObservableしている。


Rx勉強するのに参考にしているサイト


個人的なメモ

  • Visual Studioデバッグ設定で「マイコードのみを有効にする」のチェックをはずしていた方がよい?
    これ有効にしてるとOnErrorの処理等記述していてもマイコードで例外をキャッチしていないので例外発生するたびにデバッガが捕捉するので面倒。