From 4c5f2a3ced357cccc906cc667686ac262a999238 Mon Sep 17 00:00:00 2001 From: Tuomas Hietanen Date: Thu, 21 Nov 2024 20:30:40 +0000 Subject: [PATCH 1/5] Added more functions --- src/FSharp.Control.R3/Observable.fs | 29 +++++++++++++++++++++++++ src/FSharp.Control.R3/TaskObservable.fs | 19 ++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/src/FSharp.Control.R3/Observable.fs b/src/FSharp.Control.R3/Observable.fs index b1f5afe..03e9d51 100644 --- a/src/FSharp.Control.R3/Observable.fs +++ b/src/FSharp.Control.R3/Observable.fs @@ -2,4 +2,33 @@ module FSharp.Control.R3.Observable open R3 +/// Maps the given observable with the given function let inline map (f : 't -> 'r) source = ObservableExtensions.Select (source, f) + +/// Maps the given observable with the given function and the index of the element +let inline mapi (f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i)) + +/// Filters the observable elements of a sequence based on a predicate +let inline filter (f : 't -> bool) source = ObservableExtensions.Where (source, f) + +/// Hides the identy of an observable sequence +let inline asObservable source : Observable<'Source> = ObservableExtensions.AsObservable source + +/// Binds an observable to generate a subsequent observable. +let inline bind (f : 'T -> Observable<'TNext>) source = ObservableExtensions.SelectMany (source, f) + +/// Converts the elements of the sequence to the specified type +let inline cast<'T, 'CastType> (source) = ObservableExtensions.Cast<'T, 'CastType> (source) + +/// Concatenates the second observable sequence to the first observable sequence +/// upn the successful termination of the first +let inline concat source = ObservableExtensions.Concat source + +/// Returns an observable sequence that only contains distinct elements +let inline distinct source = ObservableExtensions.Distinct source + +/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements. +let inline skip (count : int) (source) = ObservableExtensions.Skip (source, count) + +/// Takes n elements (from the beginning of an observable sequence? ) +let inline take (count : int) (source) = ObservableExtensions.Take (source, count) diff --git a/src/FSharp.Control.R3/TaskObservable.fs b/src/FSharp.Control.R3/TaskObservable.fs index 6c6252e..1a8e312 100644 --- a/src/FSharp.Control.R3/TaskObservable.fs +++ b/src/FSharp.Control.R3/TaskObservable.fs @@ -7,6 +7,7 @@ open FSharp.Control.R3 module Observable = + /// Maps the given observable with the given function let mapAsync (options : ProcessingOptions) (f : CancellationToken -> 't -> Task<'r>) source = let selector x ct = ValueTask<'r> (f ct x) ObservableExtensions.SelectAwait ( @@ -20,6 +21,9 @@ module Observable = let length cancellationToken source = ObservableExtensions.CountAsync (source, cancellationToken) + /// Invokes an action for each element in the observable sequence, and propagates all observer + /// messages through the result sequence. This method can be used for debugging, logging, etc. of query + /// behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. let inline iter cancellationToken (action : 't -> unit) source = ObservableExtensions.ForEachAsync (source, action, cancellationToken) let iterAsync cancellationToken options (action : CancellationToken -> 't -> Task) source = @@ -27,3 +31,18 @@ module Observable = |> mapAsync options action |> length cancellationToken :> Task + + /// Applies an accumulator function over an observable sequence, returning the + /// result of the aggregation as a single element in the result sequence + let inline aggregateAsync cancellationToken seed (f : 'r -> 't -> 'r) source = + ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken) + + /// Determines whether all elements of an observable satisfy a predicate + let inline allAsync cancellationToken (f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken) + + /// Determines whether an observable sequence contains a specified value + /// which satisfies the given predicate + let inline existsAsync source = ObservableExtensions.AnyAsync source + + /// Returns the first element of an observable sequence + let inline firstAsync source = ObservableExtensions.FirstAsync source From bcc60f8f51590bf9227c08a8d83ee35c083ddead Mon Sep 17 00:00:00 2001 From: Tuomas Hietanen Date: Fri, 22 Nov 2024 09:26:21 +0000 Subject: [PATCH 2/5] Moved to alphabetical order, added a unit-test, moved some functions from Task to normal namespace. --- docsSrc/Tutorials/Getting_Started.md | 7 +++--- src/FSharp.Control.R3/Observable.fs | 29 ++++++++++++++++------ src/FSharp.Control.R3/TaskObservable.fs | 15 ----------- tests/FSharp.Control.R3.Tests/Tests.fs | 33 +++++++++++++++++++++++++ 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/docsSrc/Tutorials/Getting_Started.md b/docsSrc/Tutorials/Getting_Started.md index a962e56..a7bf83e 100644 --- a/docsSrc/Tutorials/Getting_Started.md +++ b/docsSrc/Tutorials/Getting_Started.md @@ -48,10 +48,9 @@ Then you do: [lang=bash] paket install FSharp.Control.R3 -Now you have available functions like: +Now you can use functions closer to the traditional F#-style, like: ```fsharp -open FSharp.Control.R3.Async - -Observable.length r3Bus +// Filter events +let interesting = r3Bus |> FSharp.Control.R3.Observable.filter (fun x -> x % 2 = 0) ``` diff --git a/src/FSharp.Control.R3/Observable.fs b/src/FSharp.Control.R3/Observable.fs index 03e9d51..2854472 100644 --- a/src/FSharp.Control.R3/Observable.fs +++ b/src/FSharp.Control.R3/Observable.fs @@ -2,14 +2,13 @@ module FSharp.Control.R3.Observable open R3 -/// Maps the given observable with the given function -let inline map (f : 't -> 'r) source = ObservableExtensions.Select (source, f) +/// Applies an accumulator function over an observable sequence, returning the +/// result of the aggregation as a single element in the result sequence +let inline aggregateAsync cancellationToken seed (f : 'r -> 't -> 'r) source = + ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken) -/// Maps the given observable with the given function and the index of the element -let inline mapi (f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i)) - -/// Filters the observable elements of a sequence based on a predicate -let inline filter (f : 't -> bool) source = ObservableExtensions.Where (source, f) +/// Determines whether all elements of an observable satisfy a predicate +let inline allAsync cancellationToken (f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken) /// Hides the identy of an observable sequence let inline asObservable source : Observable<'Source> = ObservableExtensions.AsObservable source @@ -27,6 +26,22 @@ let inline concat source = ObservableExtensions.Concat source /// Returns an observable sequence that only contains distinct elements let inline distinct source = ObservableExtensions.Distinct source +/// Determines whether an observable sequence contains a specified value +/// which satisfies the given predicate +let inline existsAsync source = ObservableExtensions.AnyAsync source + +/// Returns the first element of an observable sequence +let inline firstAsync source = ObservableExtensions.FirstAsync source + +/// Filters the observable elements of a sequence based on a predicate +let inline filter (f : 't -> bool) source = ObservableExtensions.Where (source, f) + +/// Maps the given observable with the given function +let inline map (f : 't -> 'r) source = ObservableExtensions.Select (source, f) + +/// Maps the given observable with the given function and the index of the element +let inline mapi (f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i)) + /// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements. let inline skip (count : int) (source) = ObservableExtensions.Skip (source, count) diff --git a/src/FSharp.Control.R3/TaskObservable.fs b/src/FSharp.Control.R3/TaskObservable.fs index 1a8e312..7386f5f 100644 --- a/src/FSharp.Control.R3/TaskObservable.fs +++ b/src/FSharp.Control.R3/TaskObservable.fs @@ -31,18 +31,3 @@ module Observable = |> mapAsync options action |> length cancellationToken :> Task - - /// Applies an accumulator function over an observable sequence, returning the - /// result of the aggregation as a single element in the result sequence - let inline aggregateAsync cancellationToken seed (f : 'r -> 't -> 'r) source = - ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken) - - /// Determines whether all elements of an observable satisfy a predicate - let inline allAsync cancellationToken (f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken) - - /// Determines whether an observable sequence contains a specified value - /// which satisfies the given predicate - let inline existsAsync source = ObservableExtensions.AnyAsync source - - /// Returns the first element of an observable sequence - let inline firstAsync source = ObservableExtensions.FirstAsync source diff --git a/tests/FSharp.Control.R3.Tests/Tests.fs b/tests/FSharp.Control.R3.Tests/Tests.fs index 71951ef..e45d2ce 100644 --- a/tests/FSharp.Control.R3.Tests/Tests.fs +++ b/tests/FSharp.Control.R3.Tests/Tests.fs @@ -30,3 +30,36 @@ type ObservableTests () = } |> Async.StartImmediateAsTask :> Task + + + [] + member _.``Test filter`` () = + + let mutable hasvisited = false + use r3Bus = new R3.Subject () + let interesting = + r3Bus + |> FSharp.Control.R3.Observable.filter (fun x -> x % 2 = 0) + + // No-one listens yet (vs R3.ReplaySubject + r3Bus.OnNext 2 + + use subscription = + R3.ObservableExtensions.SubscribeAwait ( + interesting, + fun i cancellationToken -> + task { + // Listen events + + hasvisited <- true + + Assert.AreEqual (4, i) + + return () + } + |> System.Threading.Tasks.ValueTask + ) + + // Publish some events, "4" should be heard + [ 3..5 ] |> List.iter r3Bus.OnNext + Assert.AreEqual (true, hasvisited) From 497f3c143b6d08385dd84cf6bf8bbb47dcb97a6d Mon Sep 17 00:00:00 2001 From: Tuomas Hietanen Date: Fri, 22 Nov 2024 10:12:28 +0000 Subject: [PATCH 3/5] Rename tests to ObservableTests --- tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj | 2 +- tests/FSharp.Control.R3.Tests/{Tests.fs => ObservableTests.fs} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename tests/FSharp.Control.R3.Tests/{Tests.fs => ObservableTests.fs} (100%) diff --git a/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj b/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj index 1d0305c..e2d11b7 100644 --- a/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj +++ b/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj @@ -5,7 +5,7 @@ - + diff --git a/tests/FSharp.Control.R3.Tests/Tests.fs b/tests/FSharp.Control.R3.Tests/ObservableTests.fs similarity index 100% rename from tests/FSharp.Control.R3.Tests/Tests.fs rename to tests/FSharp.Control.R3.Tests/ObservableTests.fs From 7f6b6cb48b4b474ff4f575ceee15304afdbeb42d Mon Sep 17 00:00:00 2001 From: Tuomas Hietanen Date: Fri, 22 Nov 2024 11:00:22 +0000 Subject: [PATCH 4/5] add rxquery builder, similar to FSharp.Control.Reactive --- src/FSharp.Control.R3/Observable.fs | 65 +++++++++++++++++++ tests/FSharp.Control.R3.Tests/BuilderTests.fs | 53 +++++++++++++++ .../FSharp.Control.R3.Tests.fsproj | 1 + 3 files changed, 119 insertions(+) create mode 100644 tests/FSharp.Control.R3.Tests/BuilderTests.fs diff --git a/src/FSharp.Control.R3/Observable.fs b/src/FSharp.Control.R3/Observable.fs index 2854472..45eb805 100644 --- a/src/FSharp.Control.R3/Observable.fs +++ b/src/FSharp.Control.R3/Observable.fs @@ -47,3 +47,68 @@ let inline skip (count : int) (source) = ObservableExtensions.Skip (source, coun /// Takes n elements (from the beginning of an observable sequence? ) let inline take (count : int) (source) = ObservableExtensions.Take (source, count) + + +[] +module Builders = + open System + + /// A reactive query builder. + /// See http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html + type RxQueryBuilder () = + member __.For (s : Observable<_>, body : _ -> Observable<_>) = s.SelectMany (body) + [] + member __.Select (s : Observable<_>, [] selector : _ -> _) = s.Select (selector) + [] + member __.Where (s : Observable<_>, [] predicate : _ -> bool) = s.Where (predicate) + [] + member __.TakeWhile (s : Observable<_>, [] predicate : _ -> bool) = s.TakeWhile (predicate) + [] + member __.Take (s : Observable<_>, count : int) = s.Take (count) + [] + member __.SkipWhile (s : Observable<_>, [] predicate : _ -> bool) = s.SkipWhile (predicate) + [] + member __.Skip (s : Observable<_>, count : int) = s.Skip (count) + member __.Zero () = Observable.Empty (TimeProvider.System) + member __.Yield (value) = Observable.Return (value, TimeProvider.System) + [] + member __.Count (s : Observable<_>) = ObservableExtensions.CountAsync (s) + [] + member __.All (s : Observable<_>, [] predicate : _ -> bool) = s.AllAsync (new Func<_, bool> (predicate)) + [] + member __.Contains (s : Observable<_>, key) = s.ContainsAsync (key) + [] + member __.Distinct (s : Observable<_>) = s.Distinct () + [] + member __.ExactlyOne (s : Observable<_>) = s.SingleAsync () + [] + member __.ExactlyOneOrDefault (s : Observable<_>) = s.SingleOrDefaultAsync () + [] + member __.Find (s : Observable<_>, [] predicate : _ -> bool) = s.FirstAsync (new Func<_, bool> (predicate)) + [] + member __.Head (s : Observable<_>) = s.FirstAsync () + [] + member __.HeadOrDefault (s : Observable<_>) = s.FirstOrDefaultAsync () + [] + member __.Last (s : Observable<_>) = s.LastAsync () + [] + member __.LastOrDefault (s : Observable<_>) = s.LastOrDefaultAsync () + [] + member __.MaxBy (s : Observable<'a>, [] valueSelector : 'a -> 'b) = s.MaxByAsync (new Func<'a, 'b> (valueSelector)) + [] + member __.MinBy (s : Observable<'a>, [] valueSelector : 'a -> 'b) = s.MinByAsync (new Func<'a, 'b> (valueSelector)) + + [] + member inline __.SumBy (s : Observable<_>, [] valueSelector : _ -> _) = + s + .Select(valueSelector) + .AggregateAsync (Unchecked.defaultof<_>, new Func<_, _, _> (fun a b -> a + b)) + + [] + member __.Zip (s1 : Observable<_>, s2 : Observable<_>, [] resultSelector : _ -> _) = + s1.Zip (s2, new Func<_, _, _> (resultSelector)) + + [] + member __.Iter (s : Observable<_>, [] selector : _ -> _) = s.ForEachAsync (selector) + + let rxquery = RxQueryBuilder () diff --git a/tests/FSharp.Control.R3.Tests/BuilderTests.fs b/tests/FSharp.Control.R3.Tests/BuilderTests.fs new file mode 100644 index 0000000..68f41c0 --- /dev/null +++ b/tests/FSharp.Control.R3.Tests/BuilderTests.fs @@ -0,0 +1,53 @@ +namespace FSharp.Control.R3.Tests + +open System +open System.Threading.Tasks +open FSharp.Control.R3.Async +open FSharp.Control.R3.Observable.Builders +open Microsoft.VisualStudio.TestTools.UnitTesting +open Swensen.Unquote + +[] +type BuilderTests () = + + [] + member _.``Test builder rxquery`` () = + + let mutable hasvisited = false + use r3Bus = new R3.Subject () + + let interesting = rxquery { + for i in r3Bus do + where (i % 2 = 0) + select i + + // Same as: + // if i % 2 = 0 then + // yield i + + } + + // No-one listens yet (vs R3.ReplaySubject + r3Bus.OnNext 2 + + use subscription = + R3.ObservableExtensions.SubscribeAwait ( + interesting, + fun i cancellationToken -> + task { + // Listen events + + hasvisited <- true + + Assert.AreEqual (4, i) + + return () + } + |> System.Threading.Tasks.ValueTask + ) + + // Publish some events, "4" should be heard + [ 3..5 ] |> List.iter r3Bus.OnNext + // Note: Query will not be awaited, that's why delay. + System.Threading.Thread.Sleep 300 + Assert.AreEqual (true, hasvisited) diff --git a/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj b/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj index e2d11b7..865ef9c 100644 --- a/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj +++ b/tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj @@ -5,6 +5,7 @@ + From 7d1ec979901633a8e60621abd0402e80c7712b03 Mon Sep 17 00:00:00 2001 From: Tuomas Hietanen Date: Fri, 22 Nov 2024 12:24:09 +0000 Subject: [PATCH 5/5] relase-notes-update --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf78661..a368282 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Some basic functions added: `filter`, `bind`, `concat`, `distinct`, `mapi`, `skip`, `take`, ... +- rxquery { ... } builder added ## [0.1.0] - 2024-11-18 First pre-release