Skip to content

Commit b2b8f7a

Browse files
committed
Scratch: observable progress
1 parent ef107e9 commit b2b8f7a

1 file changed

Lines changed: 66 additions & 0 deletions

File tree

src/Demo/Scratch/Observable.fs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ module AList =
7878
member x.Dispose() = shutdown()
7979
}
8080

81+
let toObservable (list : alist<'a>) =
82+
{ new IObservable<IndexList<'a> * IndexListDelta<'a> * IndexList<'a>> with
83+
member x.Subscribe(obs : IObserver<IndexList<'a> * IndexListDelta<'a> * IndexList<'a>>) =
84+
list |> observe (fun o d n ->
85+
obs.OnNext(o,d,n)
86+
)
87+
}
88+
8189
module ASet =
8290
let observe (action : HashSet<'a> -> HashSetDelta<'a> -> HashSet<'a> -> unit) (set : aset<'a>) =
8391
let reader = set.GetReader()
@@ -114,6 +122,14 @@ module ASet =
114122
member x.Dispose() = shutdown()
115123
}
116124

125+
let toObservable (set : aset<'a>) =
126+
{ new IObservable<HashSet<'a> * HashSetDelta<'a> * HashSet<'a>> with
127+
member x.Subscribe(obs : IObserver<HashSet<'a> * HashSetDelta<'a> * HashSet<'a>>) =
128+
set |> observe (fun o d n ->
129+
obs.OnNext(o,d,n)
130+
)
131+
}
132+
117133
module AMap =
118134
let observe (action : HashMap<'a, 'b> -> HashMapDelta<'a, 'b> -> HashMap<'a, 'b> -> unit) (map : amap<'a, 'b>) =
119135
let reader = map.GetReader()
@@ -150,6 +166,56 @@ module AMap =
150166
member x.Dispose() = shutdown()
151167
}
152168

169+
let toObservable (map : amap<'a, 'b>) =
170+
{ new IObservable<HashMap<'a, 'b> * HashMapDelta<'a, 'b> * HashMap<'a, 'b>> with
171+
member x.Subscribe(obs : IObserver<HashMap<'a, 'b> * HashMapDelta<'a, 'b> * HashMap<'a, 'b>>) =
172+
map |> observe (fun o d n ->
173+
obs.OnNext(o,d,n)
174+
)
175+
}
176+
177+
module AVal =
178+
let observe (action : option<'a> -> 'a -> unit) (value : aval<'a>) =
179+
let action = OptimizedClosures.FSharpFunc<_,_,_>.Adapt action
180+
let mutable running = true
181+
let signal = AsyncSignal(true)
182+
let sub = value.AddMarkingCallback signal.Pulse
183+
184+
Async.Start <|
185+
async {
186+
let mutable old = None
187+
while running do
188+
do! Async.AwaitTask (signal.Wait())
189+
if running then
190+
do! Async.SwitchToThreadPool()
191+
192+
let v = value.GetValue AdaptiveToken.Top
193+
match old with
194+
| Some o when DefaultEquality.equals o v -> ()
195+
| _ ->
196+
action.Invoke(old, v)
197+
old <- Some v
198+
199+
}
200+
201+
let shutdown() =
202+
sub.Dispose()
203+
running <- false
204+
signal.Pulse()
205+
206+
{ new IDisposable with
207+
member x.Dispose() = shutdown()
208+
}
209+
210+
let toObservable (value : aval<'a>) =
211+
{ new IObservable<option<'a> * 'a> with
212+
member x.Subscribe(obs : IObserver<option<'a> * 'a>) =
213+
value |> observe (fun o n ->
214+
obs.OnNext(o,n)
215+
)
216+
}
217+
218+
153219

154220
let run() =
155221
let a = clist []

0 commit comments

Comments
 (0)