ReactiveGraphs.jl
This package provides a framework to run computations in a tolological order of the dependency graph. It aims to be fast and allocation free, for low-latency applications.
What does it do?
Let's give an example.
Here, A
, B
, and C
represent some inputs/sources to the graph. Meaning they contain some data. The nodes 1
, 2
and 3
correspond to derived data sets:
1
depends onA
andB
,2
depends onB
,3
depends on1
,2
andC
.
This only mean that when A
is updated, the nodes 1
and 3
must be updated, and in this order. When B
is updated, nodes 1
, 2
and 3
must be updated, either in (1
, 2
, 3
) or (2
, 1
, 3
) orders. Those 2 orders are called topological, for the DAG displayed above.
Getting Started
To build a graph, one needs to start with some inputs, which are roots of the graph. We can create inputs by providing any type, or by providing an object that will be mutated.
input_1 = input(Int64)
input_2 = input(Ref(0))
For now, when creating an input from an object, the initial value will not be considered initialized before changing its value, or updating it.
Then, derived nodes can be build with map
node_1 = map(input_1) do x
println("node 1: $(2x)")
2x
end
node_2 = map(node_1, input_1, input_2) do x, y, z
println("node 2: $(x + y + z[])")
x + y + z[]
end
To ensure type stability, and performance, the types of the nodes are resolved at this stage. Hence the methods used in map must already be defined at this stage. We can verify that node_2
defined above contains a value of type Int
.
julia> eltype(node_2)
Int64
To use this graph, we need to push values into the two inputs. We can either push a new value, or a function that mutates the current value. To do so, we need to wrap our inputs into a Source
, and push to the sources, and not the inputs directly.
s1 = Source(input_1) # Captures the current state of the graph. Nodes must not be added afterwards.
s2 = Source(input_2) # Captures the current state of the graph. Nodes must not be added afterwards.
push!(s1, 1) # prints "node 1: 2". The second node cannot be evaluated since the data is missing
push!(s2, x -> x[] = 2)# prints "node 2: 5"
push!(s1, 3) # prints "node 1: 6" "node 2: 11".
Introducing this wrapping may seem a bit cumbersome to the user. But it is used to achieve high performance. When creating a source, the current state of the graph is being captured in the parameters of the Source
type. This allows to dispatch the subsequent calls to push!
on performant generated methods. This capture of the the graph implies that we first need to build the complete graph before wrapping the inputs into sources. Otherwise, the nodes added subsequently will be ignored.
As explained, in the introduction, each time an input is updated, the data will flow down the graph, updating all children nodes.
We can notice how updating the first input only updates node_2
once. This differs with simple reactive programming implementations, where the graph is generally traversed in a depth-first manner, with repetitions (typycally if the graph is not a tree). Here the graph is traversed in the topologic order of the construction.
Updating several nodes at once
Sometimes, it can be useful to update several inputs synchronously. To do so, the users can push!
to pairs of source and their corresponding value/"mutating function".
n1 = input(Int)
n2 = input(Ref(0))
map(n1, n2) do x, y
println(x + y[])
end
push!(Source(n1) => 1, Source(n2) => (x->x[]=2)) # prints "3"
Controlling the flow of the graph
This package provides a way to avoid direct filter
and indirect select
triggering of children.
Filtering
Consider the following case:
input_1 = input(Float64)
n = map(x->println("new update: $x"), input_1)
To avoid triggering node n
when the value of input_1
is NaN
, one can use filter
.
input_1 = input(Float64)
filtered = filter(x->!isnan(x), input_1)
n = map(x->println("new update: $x"), filtered)
s1 = Source(input_1) # compiles the graph. Nodes must not be added afterwards.
push!(s1, 1.0) # prints "new update: 1.0"
push!(s1, NaN) # prints nothing
Selecting
Consider now the following case:
input_1 = input(Float64)
input_2 = input(Float64)
filtered = filter(x->!isnan(x), input_1)
n = map(filtered, input_2) do x, y
println("new update: $(x+y)")
x + y
end
Even if input_1
if filtered, when input_2
is triggered, the value of filtered
will be used, whether the filtering condition is activated or not. To prevent the computation of n
, users should use select
instead:
input_1 = input(Float64)
input_2 = input(Nothing)
filtered = filter(x->!isnan(x), input_1)
selected = select(x->!isnan(x), input_1)
map((x,y) -> println("filtered"), filtered, input_2)
map((x,y) -> println("selected"), selected, input_2)
s1 = Source(input_1) # compiles the graph. Nodes must not be added afterwards.
s2 = Source(input_2) # compiles the graph. Nodes must not be added afterwards.
push!(s1, 1.0) # prints nothing
push!(s2, nothing) # prints "filtered" and then "selected"
push!(s1, NaN) # prints nothing
push!(s2, nothing) # prints "filtered" only
Comparison with Observables.jl
ReactiveGraphs.jl
executes nodes in a topological order of the graph, while Observables.jl
uses a depth-first ordering (each node calling its children).
using Observables
x1 = Observable(nothing)
x2 = map(x->println("x2"), x1)
x3 = map(x->println("x3"), x1)
x4 = map((x, y)->println("x4"), x2, x3)
x1[] = nothing # prints x2 x4 x3 x4
using ReactiveGraphs
x1 = input(nothing)
x2 = map(x->println("x2"), x1)
x3 = map(x->println("x3"), x1)
x4 = map((x,y)->println("x4"), x2, x3)
push!(Source(x1), nothing) # prints x2 x3 x4
Also, ReactiveGraphs.jl
is designed for performance, in the case of static graphs, while Observables is designed for dynamic graphs.
julia> using BenchmarkTools
julia> x1 = Observable(1)
x2 = map(x->x+1, x1)
@benchmark setindex!($x1, 2)
BenchmarkTools.Trial: 10000 samples with 195 evaluations.
Range (min … max): 484.185 ns … 6.983 μs ┊ GC (min … max): 0.00% … 90.47%
Time (median): 490.810 ns ┊ GC (median): 0.00%
Time (mean ± σ): 496.060 ns ± 72.381 ns ┊ GC (mean ± σ): 0.18% ± 1.22%
▄██▆▃▁
▁▃▆██████▇▅▄▄▄▃▃▃▄▃▃▃▃▃▂▃▃▂▂▂▂▂▂▂▂▂▁▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁ ▂
484 ns Histogram: frequency by time 534 ns <
Memory estimate: 48 bytes, allocs estimate: 3.
julia> x1 = input(Int)
x2 = map(x->x+1, x1)
s = Source(x1)
@benchmark push!($x1, 2)
BenchmarkTools.Trial: 10000 samples with 1000 evaluations.
Range (min … max): 1.500 ns … 10.625 ns ┊ GC (min … max): 0.00% … 0.00%
Time (median): 1.542 ns ┊ GC (median): 0.00%
Time (mean ± σ): 1.573 ns ± 0.221 ns ┊ GC (mean ± σ): 0.00% ± 0.00%
█ ▇ ▃ ▃ ▁ ▁
█▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▅ █
1.5 ns Histogram: log(frequency) by time 1.75 ns <
Memory estimate: 0 bytes, allocs estimate: 0.
Benchmark
This library is meant to be fast, and allocation free. Here is an example using the main operations of ReactiveGraphs.jl.
julia>i1 = input(Int)
i2 = input(Bool)
i3 = input(Bool)
i1f = filter(i1, i2)
i1s = select(i1, i3)
n2 = map(x->x+1, i1f)
n3 = foldl((state, x)-> state + x, 1, i1s)
n4 = inlinedmap(+,n2,n3)
n5 = lag(1, n4)
s1 = Source(i1)
s2 = Source(i2)
s3 = Source(i3)
push!(s1, 1)
push!(s2, true)
push!(s3, true)
v = 1
@benchmark push!($s1, $v)
BenchmarkTools.Trial: 10000 samples with 1000 evaluations.
Range (min … max): 8.708 ns … 27.625 ns ┊ GC (min … max): 0.00% … 0.00%
Time (median): 8.792 ns ┊ GC (median): 0.00%
Time (mean ± σ): 8.800 ns ± 0.252 ns ┊ GC (mean ± σ): 0.00% ± 0.00%
▂ ▆ █ ▄▃ ▂ ▂ ▁
█▁▁▁▁▁▁▁█▁▁▁▁▁▁▁█▁▁▁▁▁▁▁██▁▁▁▁▁▁▁█▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▆▁▁▁▁▁▁▁▆ █
8.71 ns Histogram: log(frequency) by time 9 ns <
Memory estimate: 0 bytes, allocs estimate: 0.
API
Base.filter
Base.filter
Base.foldl
Base.map
ReactiveGraphs.constant
ReactiveGraphs.inlinedmap
ReactiveGraphs.input
ReactiveGraphs.input
ReactiveGraphs.lag
ReactiveGraphs.quiet
ReactiveGraphs.select
ReactiveGraphs.select
ReactiveGraphs.updated
ReactiveGraphs.Node
ReactiveGraphs.Source
ReactiveGraphs.Node
— TypeNode{name}
Objects of type Node
correspond to the nodes of the computational graph. Each node is identified by a uniquely generated name name
.
ReactiveGraphs.Source
— MethodSource(::Node)
Transforms an input node into a Source
, which is a type stable version of the former. This type is used to update the roots of the graph with Base.push!
. The input objects are not used directly, for performance considerations.
julia> i = input(String)
map(println, i)
s = Source(i)
push!(s, "example")
example
julia> i = input(Ref(0))
map(x->println(x[]), i)
s = Source(i)
push!(s, ref -> ref[] = 123)
123
Sources can also be used simultaneously,
julia> i1 = input(Int)
i2 = input(Int)
m = map(+, i1, i2)
map(print, m)
s1 = Source(i1)
s2 = Source(i2)
push!(s1 => 1, s2 => 2)
push!(s1 => 3, s2 => 4)
37
Base.filter
— Methodfilter(f::function, x::Node; name)
Bulds a node that contains the same value as node x
, but that only forwards an update when the function f
is returns true
, while evaluated on the value of node x
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
Base.filter
— Methodfilter(x::Node, condition::Node; name)
Bulds a node that contains the same value as node x
, but that does not update its children when the value of condition
is true.
If name
is provided, it will be appended to the generated symbol that identifies the node.
Base.foldl
— Methodfoldl(f, state, arg::Node, args::Node...; name)
Creates a node that contains a state initialized by state
. When any of the arguments are updated, the state is updated by calling state_updated = f(state, arg, args...)
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
Base.map
— Methodmap(f, arg::Node, args::Node...; name)
Creates a node whose value is given by calling f
with the values of the nodes (arg, arg...)
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
ReactiveGraphs.constant
— Methodconstant(x; name)
Bulds a node that contains the constant value x, and that does not propagate directly. If x
is a Bool
, then the constant value will be propagated by Julia's compiler.
If name
is provided, it will be appended to the generated symbol that identifies the node.
ReactiveGraphs.inlinedmap
— Methodinlinedmap(f, arg::Node, args::Node...; name)
Similarly to map
, creates a node whose value is given by calling f
with the values of the nodes (arg, arg...)
. Contrarily to map, the value is not stored, and the function call is performed each time the value of the node is required.
See the implementation of lag
for a use case example.
If name
is provided, it will be appended to the generated symbol that identifies the node.
ReactiveGraphs.input
— Methodinput(x; name)
Creates a node that contains x
. This should be used when x
is mutable. To push a value in the node, one need to wrap it in a Source
, and call setindex!
. See Source
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
Example:
julia> i = input(Ref(0))
s = Source(i)
push!(s, x -> x[] = 1)
ReactiveGraphs.input
— Methodinput(::Type{T}; name)
Creates a node that will contain a element of type T
. To push a value in the node, one need to wrap it in a Source
, and call setindex!
. See Source
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
Example:
julia> i = input(Int)
s = Source(i)
push!(s, 1)
ReactiveGraphs.lag
— Methodlag(n::Integer, node::Node; name)
Creates a node that contains the n
-th lagged value of node
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
Example:
julia> i = input(Int)
n = lag(2, i)
map(print, n)
s = Source(i)
for x = 1:7
push!(s, x)
end
12345
ReactiveGraphs.quiet
— Methodquiet(node::Node; name)
Creates a node that contains the same value as node
, but does not trigger its children.
If name
is provided, it will be appended to the generated symbol that identifies the node.
ReactiveGraphs.select
— Methodselect(f::Function, x::Node; name)
Bulds a node that contains the same value as node x
, but that prevents updating its children when the value of map(f, x)
is false
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
ReactiveGraphs.select
— Methodselect(x::Node, condition::Node; name)
Bulds a node that contains the same value as node x
, but that prevents updating its children when the value of condition
is false
.
If name
is provided, it will be appended to the generated symbol that identifies the node.
ReactiveGraphs.updated
— Methodupdated(arg::Node; name)
Creates a node that contains true when the node arg has been updated, false otherwise.
If name
is provided, it will be appended to the generated symbol that identifies the node.