ReactiveGraphs.jl

This package provides a framework to run computations in a tolological order of the dependency graph. It aims to be fast and allocation free, for low-latency applications.

What does it do?

Let's give an example.

Example of computational graph

Here, A, B, and C represent some inputs/sources to the graph. Meaning they contain some data. The nodes 1, 2 and 3 correspond to derived data sets:

  • 1 depends on A and B,
  • 2 depends on B,
  • 3 depends on 1, 2 and C.

This only mean that when A is updated, the nodes 1 and 3 must be updated, and in this order. When B is updated, nodes 1, 2 and 3 must be updated, either in (1, 2, 3) or (2, 1, 3) orders. Those 2 orders are called topological, for the DAG displayed above.

Getting Started

To build a graph, one needs to start with some inputs, which are roots of the graph. We can create inputs by providing any type, or by providing an object that will be mutated.

input_1 = input(Int64)
input_2 = input(Ref(0))
Inputs are never considered initialized

For now, when creating an input from an object, the initial value will not be considered initialized before changing its value, or updating it.

Then, derived nodes can be build with map

node_1 = map(input_1) do x
    println("node 1: $(2x)")
    2x 
end
node_2 = map(node_1, input_1, input_2) do x, y, z
    println("node 2: $(x + y + z[])")
    x + y + z[]
end
Note

To ensure type stability, and performance, the types of the nodes are resolved at this stage. Hence the methods used in map must already be defined at this stage. We can verify that node_2 defined above contains a value of type Int.

julia> eltype(node_2)
Int64

To use this graph, we need to push values into the two inputs. We can either push a new value, or a function that mutates the current value. To do so, we need to wrap our inputs into a Source, and push to the sources, and not the inputs directly.

s1 = Source(input_1) # Captures the current state of the graph. Nodes must not be added afterwards.
s2 = Source(input_2) # Captures the current state of the graph. Nodes must not be added afterwards.
push!(s1, 1) # prints "node 1: 2". The second node cannot be evaluated since the data is missing
push!(s2, x -> x[] = 2)# prints "node 2: 5"
push!(s1, 3) # prints "node 1: 6" "node 2: 11".

Introducing this wrapping may seem a bit cumbersome to the user. But it is used to achieve high performance. When creating a source, the current state of the graph is being captured in the parameters of the Source type. This allows to dispatch the subsequent calls to push! on performant generated methods. This capture of the the graph implies that we first need to build the complete graph before wrapping the inputs into sources. Otherwise, the nodes added subsequently will be ignored.

As explained, in the introduction, each time an input is updated, the data will flow down the graph, updating all children nodes.

Note

We can notice how updating the first input only updates node_2 once. This differs with simple reactive programming implementations, where the graph is generally traversed in a depth-first manner, with repetitions (typycally if the graph is not a tree). Here the graph is traversed in the topologic order of the construction.

Updating several nodes at once

Sometimes, it can be useful to update several inputs synchronously. To do so, the users can push! to pairs of source and their corresponding value/"mutating function".

n1 = input(Int)
n2 = input(Ref(0))
map(n1, n2) do x, y
    println(x + y[])
end
push!(Source(n1) => 1, Source(n2) => (x->x[]=2)) # prints "3"

Controlling the flow of the graph

This package provides a way to avoid direct filter and indirect select triggering of children.

Filtering

Consider the following case:

input_1 = input(Float64)
n = map(x->println("new update: $x"), input_1)

To avoid triggering node n when the value of input_1 is NaN, one can use filter.

input_1 = input(Float64)
filtered = filter(x->!isnan(x), input_1)
n = map(x->println("new update: $x"), filtered)

s1 = Source(input_1) # compiles the graph. Nodes must not be added afterwards.
push!(s1, 1.0) # prints "new update: 1.0"
push!(s1, NaN) # prints nothing

Selecting

Consider now the following case:

input_1 = input(Float64)
input_2 = input(Float64)
filtered = filter(x->!isnan(x), input_1)
n = map(filtered, input_2) do x, y
    println("new update: $(x+y)")
    x + y
end

Even if input_1 if filtered, when input_2 is triggered, the value of filtered will be used, whether the filtering condition is activated or not. To prevent the computation of n, users should use select instead:

input_1 = input(Float64)
input_2 = input(Nothing)
filtered = filter(x->!isnan(x), input_1)
selected = select(x->!isnan(x), input_1)
map((x,y) -> println("filtered"), filtered, input_2)
map((x,y) -> println("selected"), selected, input_2)
    
s1 = Source(input_1) # compiles the graph. Nodes must not be added afterwards.
s2 = Source(input_2) # compiles the graph. Nodes must not be added afterwards.
push!(s1, 1.0) # prints nothing 
push!(s2, nothing) # prints "filtered" and then "selected"
push!(s1, NaN) # prints nothing 
push!(s2, nothing) # prints "filtered" only

Comparison with Observables.jl

ReactiveGraphs.jl executes nodes in a topological order of the graph, while Observables.jl uses a depth-first ordering (each node calling its children).

using Observables
x1 = Observable(nothing)
x2 = map(x->println("x2"), x1)
x3 = map(x->println("x3"), x1)
x4 = map((x, y)->println("x4"), x2, x3)
x1[] = nothing # prints x2 x4 x3 x4

using ReactiveGraphs
x1 = input(nothing)
x2 = map(x->println("x2"), x1)
x3 = map(x->println("x3"), x1)
x4 = map((x,y)->println("x4"), x2, x3)
push!(Source(x1), nothing) # prints x2 x3 x4

Also, ReactiveGraphs.jl is designed for performance, in the case of static graphs, while Observables is designed for dynamic graphs.

julia> using BenchmarkTools
julia> x1 = Observable(1)
       x2 = map(x->x+1, x1)
       @benchmark setindex!($x1, 2)
BenchmarkTools.Trial: 10000 samples with 195 evaluations.
 Range (min … max):  484.185 ns …  6.983 μs  ┊ GC (min … max): 0.00% … 90.47%
 Time  (median):     490.810 ns              ┊ GC (median):    0.00%
 Time  (mean ± σ):   496.060 ns ± 72.381 ns  ┊ GC (mean ± σ):  0.18% ±  1.22%

     ▄██▆▃▁                                                     
  ▁▃▆██████▇▅▄▄▄▃▃▃▄▃▃▃▃▃▂▃▃▂▂▂▂▂▂▂▂▂▁▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁ ▂
  484 ns          Histogram: frequency by time          534 ns <

 Memory estimate: 48 bytes, allocs estimate: 3.

julia> x1 = input(Int)
       x2 = map(x->x+1, x1)
       s = Source(x1)
       @benchmark push!($x1, 2)
BenchmarkTools.Trial: 10000 samples with 1000 evaluations.
 Range (min … max):  1.500 ns … 10.625 ns  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     1.542 ns              ┊ GC (median):    0.00%
 Time  (mean ± σ):   1.573 ns ±  0.221 ns  ┊ GC (mean ± σ):  0.00% ± 0.00%

           █         ▇        ▃         ▃         ▁          ▁
  █▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▅ █
  1.5 ns       Histogram: log(frequency) by time     1.75 ns <

  Memory estimate: 0 bytes, allocs estimate: 0.

Benchmark

This library is meant to be fast, and allocation free. Here is an example using the main operations of ReactiveGraphs.jl.

julia>i1 = input(Int)
      i2 = input(Bool)
      i3 = input(Bool)
      i1f = filter(i1, i2)
      i1s = select(i1, i3)
      n2 = map(x->x+1, i1f)
      n3 = foldl((state, x)-> state + x, 1, i1s)
      n4 = inlinedmap(+,n2,n3)
      n5 = lag(1, n4)
      
      s1 = Source(i1)
      s2 = Source(i2)
      s3 = Source(i3)
      push!(s1, 1)
      push!(s2, true)
      push!(s3, true)
      v = 1
      @benchmark push!($s1, $v)
BenchmarkTools.Trial: 10000 samples with 1000 evaluations.                                      
Range (min … max):  8.708 ns … 27.625 ns  ┊ GC (min … max): 0.00% … 0.00%                      
Time  (median):     8.792 ns              ┊ GC (median):    0.00%                              
Time  (mean ± σ):   8.800 ns ±  0.252 ns  ┊ GC (mean ± σ):  0.00% ± 0.00%                      

 ▂       ▆       █       ▄▃       ▂       ▂                 ▁                                  
 █▁▁▁▁▁▁▁█▁▁▁▁▁▁▁█▁▁▁▁▁▁▁██▁▁▁▁▁▁▁█▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▆▁▁▁▁▁▁▁▆ █                                  
 8.71 ns      Histogram: log(frequency) by time        9 ns <                                  

Memory estimate: 0 bytes, allocs estimate: 0.    

API

ReactiveGraphs.NodeType
Node{name}

Objects of type Node correspond to the nodes of the computational graph. Each node is identified by a uniquely generated name name.

source
ReactiveGraphs.SourceMethod
Source(::Node)

Transforms an input node into a Source, which is a type stable version of the former. This type is used to update the roots of the graph with Base.push!. The input objects are not used directly, for performance considerations.

julia> i = input(String)
       map(println, i)
       s = Source(i)
       push!(s, "example")
example

julia> i = input(Ref(0))
       map(x->println(x[]), i)
       s = Source(i)
       push!(s, ref -> ref[] = 123)
123

Sources can also be used simultaneously,

julia> i1 = input(Int)
       i2 = input(Int)
       m = map(+, i1, i2)
       map(print, m)
       s1 = Source(i1)
       s2 = Source(i2)
       push!(s1 => 1, s2 => 2)
       push!(s1 => 3, s2 => 4)
37
source
Base.filterMethod
filter(f::function, x::Node; name)

Bulds a node that contains the same value as node x, but that only forwards an update when the function f is returns true, while evaluated on the value of node x.

If name is provided, it will be appended to the generated symbol that identifies the node.

source
Base.filterMethod
filter(x::Node, condition::Node; name)

Bulds a node that contains the same value as node x, but that does not update its children when the value of condition is true.

If name is provided, it will be appended to the generated symbol that identifies the node.

source
Base.foldlMethod
foldl(f, state, arg::Node, args::Node...; name)

Creates a node that contains a state initialized by state. When any of the arguments are updated, the state is updated by calling state_updated = f(state, arg, args...).

If name is provided, it will be appended to the generated symbol that identifies the node.

source
Base.mapMethod
map(f, arg::Node, args::Node...; name)

Creates a node whose value is given by calling f with the values of the nodes (arg, arg...).

If name is provided, it will be appended to the generated symbol that identifies the node.

source
ReactiveGraphs.constantMethod
constant(x; name)

Bulds a node that contains the constant value x, and that does not propagate directly. If x is a Bool, then the constant value will be propagated by Julia's compiler.

If name is provided, it will be appended to the generated symbol that identifies the node.

source
ReactiveGraphs.inlinedmapMethod
inlinedmap(f, arg::Node, args::Node...; name)

Similarly to map, creates a node whose value is given by calling f with the values of the nodes (arg, arg...). Contrarily to map, the value is not stored, and the function call is performed each time the value of the node is required.

See the implementation of lag for a use case example.

If name is provided, it will be appended to the generated symbol that identifies the node.

source
ReactiveGraphs.inputMethod
input(x; name)

Creates a node that contains x. This should be used when x is mutable. To push a value in the node, one need to wrap it in a Source, and call setindex!. See Source.

If name is provided, it will be appended to the generated symbol that identifies the node.

Example:

julia> i = input(Ref(0))
       s = Source(i)
       push!(s, x -> x[] = 1)
source
ReactiveGraphs.inputMethod
input(::Type{T}; name)

Creates a node that will contain a element of type T. To push a value in the node, one need to wrap it in a Source, and call setindex!. See Source.

If name is provided, it will be appended to the generated symbol that identifies the node.

Example:

julia> i = input(Int)
       s = Source(i)
       push!(s, 1)
source
ReactiveGraphs.lagMethod
lag(n::Integer, node::Node; name)

Creates a node that contains the n-th lagged value of node.

If name is provided, it will be appended to the generated symbol that identifies the node.

Example:

julia> i = input(Int)
       n = lag(2, i)
       map(print, n)
       s = Source(i)
       for x = 1:7
           push!(s, x)
       end
12345
source
ReactiveGraphs.quietMethod
quiet(node::Node; name)

Creates a node that contains the same value as node, but does not trigger its children.

If name is provided, it will be appended to the generated symbol that identifies the node.

source
ReactiveGraphs.selectMethod
select(f::Function, x::Node; name)

Bulds a node that contains the same value as node x, but that prevents updating its children when the value of map(f, x) is false.

If name is provided, it will be appended to the generated symbol that identifies the node.

source
ReactiveGraphs.selectMethod
select(x::Node, condition::Node; name)

Bulds a node that contains the same value as node x, but that prevents updating its children when the value of condition is false.

If name is provided, it will be appended to the generated symbol that identifies the node.

source
ReactiveGraphs.updatedMethod
updated(arg::Node; name)

Creates a node that contains true when the node arg has been updated, false otherwise.

If name is provided, it will be appended to the generated symbol that identifies the node.

source