Skip to content

Commit

Permalink
Merge pull request #4 from Thorium/more-functions
Browse files Browse the repository at this point in the history
Added more functions, and `rxquery { ... }`
  • Loading branch information
xperiandri authored Nov 22, 2024
2 parents 6619b48 + 7d1ec97 commit b8c007b
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Some basic functions added: `filter`, `bind`, `concat`, `distinct`, `mapi`, `skip`, `take`, ...
- rxquery { ... } builder added

## [0.1.0] - 2024-11-18
First pre-release
Expand Down
7 changes: 3 additions & 4 deletions docsSrc/Tutorials/Getting_Started.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ Then you do:
[lang=bash]
paket install FSharp.Control.R3

Now you have available functions like:
Now you can use functions closer to the traditional F#-style, like:

```fsharp
open FSharp.Control.R3.Async
Observable.length r3Bus
// Filter events
let interesting = r3Bus |> FSharp.Control.R3.Observable.filter (fun x -> x % 2 = 0)
```
109 changes: 109 additions & 0 deletions src/FSharp.Control.R3/Observable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,113 @@ module FSharp.Control.R3.Observable

open R3

/// Applies an accumulator function over an observable sequence, returning the
/// result of the aggregation as a single element in the result sequence
let inline aggregateAsync cancellationToken seed (f : 'r -> 't -> 'r) source =
ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken)

/// Determines whether all elements of an observable satisfy a predicate
let inline allAsync cancellationToken (f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken)

/// Hides the identy of an observable sequence
let inline asObservable source : Observable<'Source> = ObservableExtensions.AsObservable source

/// Binds an observable to generate a subsequent observable.
let inline bind (f : 'T -> Observable<'TNext>) source = ObservableExtensions.SelectMany (source, f)

/// Converts the elements of the sequence to the specified type
let inline cast<'T, 'CastType> (source) = ObservableExtensions.Cast<'T, 'CastType> (source)

/// Concatenates the second observable sequence to the first observable sequence
/// upn the successful termination of the first
let inline concat source = ObservableExtensions.Concat source

/// Returns an observable sequence that only contains distinct elements
let inline distinct source = ObservableExtensions.Distinct source

/// Determines whether an observable sequence contains a specified value
/// which satisfies the given predicate
let inline existsAsync source = ObservableExtensions.AnyAsync source

/// Returns the first element of an observable sequence
let inline firstAsync source = ObservableExtensions.FirstAsync source

/// Filters the observable elements of a sequence based on a predicate
let inline filter (f : 't -> bool) source = ObservableExtensions.Where (source, f)

/// Maps the given observable with the given function
let inline map (f : 't -> 'r) source = ObservableExtensions.Select (source, f)

/// Maps the given observable with the given function and the index of the element
let inline mapi (f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i))

/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
let inline skip (count : int) (source) = ObservableExtensions.Skip (source, count)

/// Takes n elements (from the beginning of an observable sequence? )
let inline take (count : int) (source) = ObservableExtensions.Take (source, count)


[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Builders =
open System

/// A reactive query builder.
/// See http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html
type RxQueryBuilder () =
member __.For (s : Observable<_>, body : _ -> Observable<_>) = s.SelectMany (body)
[<CustomOperation("select", AllowIntoPattern = true)>]
member __.Select (s : Observable<_>, [<ProjectionParameter>] selector : _ -> _) = s.Select (selector)
[<CustomOperation("where", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
member __.Where (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.Where (predicate)
[<CustomOperation("takeWhile", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
member __.TakeWhile (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.TakeWhile (predicate)
[<CustomOperation("take", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
member __.Take (s : Observable<_>, count : int) = s.Take (count)
[<CustomOperation("skipWhile", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
member __.SkipWhile (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.SkipWhile (predicate)
[<CustomOperation("skip", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
member __.Skip (s : Observable<_>, count : int) = s.Skip (count)
member __.Zero () = Observable.Empty (TimeProvider.System)
member __.Yield (value) = Observable.Return (value, TimeProvider.System)
[<CustomOperation("count")>]
member __.Count (s : Observable<_>) = ObservableExtensions.CountAsync (s)
[<CustomOperation("all")>]
member __.All (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.AllAsync (new Func<_, bool> (predicate))
[<CustomOperation("contains")>]
member __.Contains (s : Observable<_>, key) = s.ContainsAsync (key)
[<CustomOperation("distinct", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
member __.Distinct (s : Observable<_>) = s.Distinct ()
[<CustomOperation("exactlyOne")>]
member __.ExactlyOne (s : Observable<_>) = s.SingleAsync ()
[<CustomOperation("exactlyOneOrDefault")>]
member __.ExactlyOneOrDefault (s : Observable<_>) = s.SingleOrDefaultAsync ()
[<CustomOperation("find")>]
member __.Find (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.FirstAsync (new Func<_, bool> (predicate))
[<CustomOperation("head")>]
member __.Head (s : Observable<_>) = s.FirstAsync ()
[<CustomOperation("headOrDefault")>]
member __.HeadOrDefault (s : Observable<_>) = s.FirstOrDefaultAsync ()
[<CustomOperation("last")>]
member __.Last (s : Observable<_>) = s.LastAsync ()
[<CustomOperation("lastOrDefault")>]
member __.LastOrDefault (s : Observable<_>) = s.LastOrDefaultAsync ()
[<CustomOperation("maxBy")>]
member __.MaxBy (s : Observable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MaxByAsync (new Func<'a, 'b> (valueSelector))
[<CustomOperation("minBy")>]
member __.MinBy (s : Observable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MinByAsync (new Func<'a, 'b> (valueSelector))

[<CustomOperation("sumBy")>]
member inline __.SumBy (s : Observable<_>, [<ProjectionParameter>] valueSelector : _ -> _) =
s
.Select(valueSelector)
.AggregateAsync (Unchecked.defaultof<_>, new Func<_, _, _> (fun a b -> a + b))

[<CustomOperation("zip", IsLikeZip = true)>]
member __.Zip (s1 : Observable<_>, s2 : Observable<_>, [<ProjectionParameter>] resultSelector : _ -> _) =
s1.Zip (s2, new Func<_, _, _> (resultSelector))

[<CustomOperation("iter")>]
member __.Iter (s : Observable<_>, [<ProjectionParameter>] selector : _ -> _) = s.ForEachAsync (selector)

let rxquery = RxQueryBuilder ()
4 changes: 4 additions & 0 deletions src/FSharp.Control.R3/TaskObservable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ open FSharp.Control.R3

module Observable =

/// Maps the given observable with the given function
let mapAsync (options : ProcessingOptions) (f : CancellationToken -> 't -> Task<'r>) source =
let selector x ct = ValueTask<'r> (f ct x)
ObservableExtensions.SelectAwait (
Expand All @@ -20,6 +21,9 @@ module Observable =

let length cancellationToken source = ObservableExtensions.CountAsync (source, cancellationToken)

/// Invokes an action for each element in the observable sequence, and propagates all observer
/// messages through the result sequence. This method can be used for debugging, logging, etc. of query
/// behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
let inline iter cancellationToken (action : 't -> unit) source = ObservableExtensions.ForEachAsync (source, action, cancellationToken)

let iterAsync cancellationToken options (action : CancellationToken -> 't -> Task<unit>) source =
Expand Down
53 changes: 53 additions & 0 deletions tests/FSharp.Control.R3.Tests/BuilderTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace FSharp.Control.R3.Tests

open System
open System.Threading.Tasks
open FSharp.Control.R3.Async
open FSharp.Control.R3.Observable.Builders
open Microsoft.VisualStudio.TestTools.UnitTesting
open Swensen.Unquote

[<TestClass>]
type BuilderTests () =

[<TestMethod>]
member _.``Test builder rxquery`` () =

let mutable hasvisited = false
use r3Bus = new R3.Subject<int> ()

let interesting = rxquery {
for i in r3Bus do
where (i % 2 = 0)
select i

// Same as:
// if i % 2 = 0 then
// yield i

}

// No-one listens yet (vs R3.ReplaySubject<int>
r3Bus.OnNext 2

use subscription =
R3.ObservableExtensions.SubscribeAwait (
interesting,
fun i cancellationToken ->
task {
// Listen events

hasvisited <- true

Assert.AreEqual<int> (4, i)

return ()
}
|> System.Threading.Tasks.ValueTask
)

// Publish some events, "4" should be heard
[ 3..5 ] |> List.iter r3Bus.OnNext
// Note: Query will not be awaited, that's why delay.
System.Threading.Thread.Sleep 300
Assert.AreEqual<bool> (true, hasvisited)
3 changes: 2 additions & 1 deletion tests/FSharp.Control.R3.Tests/FSharp.Control.R3.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Tests.fs" />
<Compile Include="BuilderTests.fs" />
<Compile Include="ObservableTests.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
65 changes: 65 additions & 0 deletions tests/FSharp.Control.R3.Tests/ObservableTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
namespace FSharp.Control.R3.Tests

open System
open System.Threading.Tasks
open FSharp.Control.R3.Async
open Microsoft.VisualStudio.TestTools.UnitTesting
open Swensen.Unquote

[<TestClass>]
type ObservableTests () =

[<TestMethod>]
member _.``Test length`` () : Task =

async {
use r3Bus = new R3.Subject<int> ()

r3Bus.OnNext 1

let lengthObs = Observable.length r3Bus

r3Bus.OnNext 2
r3Bus.OnNext 3
r3Bus.OnCompleted (R3.Result.Success)

let! res = lengthObs

Assert.AreEqual<int> (0, res)

}
|> Async.StartImmediateAsTask
:> Task


[<TestMethod>]
member _.``Test filter`` () =

let mutable hasvisited = false
use r3Bus = new R3.Subject<int> ()
let interesting =
r3Bus
|> FSharp.Control.R3.Observable.filter (fun x -> x % 2 = 0)

// No-one listens yet (vs R3.ReplaySubject<int>
r3Bus.OnNext 2

use subscription =
R3.ObservableExtensions.SubscribeAwait (
interesting,
fun i cancellationToken ->
task {
// Listen events

hasvisited <- true

Assert.AreEqual<int> (4, i)

return ()
}
|> System.Threading.Tasks.ValueTask
)

// Publish some events, "4" should be heard
[ 3..5 ] |> List.iter r3Bus.OnNext
Assert.AreEqual<bool> (true, hasvisited)
32 changes: 0 additions & 32 deletions tests/FSharp.Control.R3.Tests/Tests.fs

This file was deleted.

0 comments on commit b8c007b

Please sign in to comment.