(1 2 (3 4))
(+ 3 4)
(if (< 0 1) (1 2 3) ())
(defun S (n) (+ n 1))
(defstruct pair
(x 0)
(y 0))
(defclass vec2 (pair) ())
(defmethod abs ((v vec2))
(+ (* :x :x) (* :y :y)))
class AnalysisReactor
extends Reactor[String] {
var totalLength = 0
}
spawn(Proto[AnalysisReactor])
val ch: Channel[String]
ch ! "event payload"
var totalLength = 0
main
.events
.onEvent {
x => totalLength += x.length
}
// state
// main connector
// event stream
// event handler
var numSch = 0
sysEvents
.onMatch {
case Scheduled =>
numSch += 1
case Terminated =>
println(totalLength / numSch)
}
var numSch = 0
sysEvents // Events[SysEvent]
.onMatch {
case Scheduled =>
numSch += 1
case Terminated =>
println(totalLength / numSch)
}
def router(p: T => Channel[T]) = {
val connector = open[T]
}
def router(p: T => Channel[T]) = {
val connector = open[T]
connector.events onEvent { x =>
p(x) ! x
}
}
def router(p: T => Channel[T]) = {
val connector = open[T]
connector.events onEvent { x =>
p(x) ! x
}
connector.channel
}
def router(p: T => Channel[T]) = {
val connector = open[T]
connector.events onEvent { x =>
p(x) ! x
}
connector.channel
}
def roundRobin(chs: Channel[T]*) = {
var i = -1
(x: T) => {
i = (i + 1) % chs.length
chs(i)
}
}
val input: Channel[T] =
router(roundRobin(outputs))
spawn
starts a reactor.
Channel[T]
send an event.
Events[T]
delivers events.
open[T]
creates a channel.
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
def server(f: T => S) = {
val c = open[Req[T, S]]
}
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
def server(f: T => S) = {
val c = open[Req[T, S]]
c.events onMatch {
case (x, ch) => ch ! f(x)
}
c.channel
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
s ! (x, reply.channel)
reply.events
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
s ! (x, reply.channel)
reply.events
}
val data = mutable.Map(1 -> 2, 3 -> 4)
val s = server(data)
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
s ! (x, reply.channel)
reply.events
}
val data = mutable.Map(1 -> 2, 3 -> 4)
val s = server(data)
(s ? 1) onEvent { x =>
println(x)
}
type TwoWay.Req[I, O] =
Req[Channel[I], Channel[O]]
type TwoWay.Server[I, O] =
Channel[TwoWay.Req[I, O]]
type TwoWay[I, O] = (Channel[I], Events[O])
def twoWayServer[I, O]:
(TwoWay.Server[I, O], Events[TwoWay[O, I]]) = {
val c = open[TwoWay.Req[I, O]]
}
type TwoWay[I, O] = (Channel[I], Events[O])
def twoWayServer[I, O]:
(TwoWay.Server[I, O], Events[TwoWay[O, I]]) = {
val c = open[TwoWay.Req[I, O]]
val connections = c.events map {
case (input, reply) =>
}
}
type TwoWay[I, O] = (Channel[I], Events[O])
def twoWayServer[I, O]:
(TwoWay.Server[I, O], Events[TwoWay[O, I]]) = {
val c = open[TwoWay.Req[I, O]]
val connections = c.events map {
case (input, reply) =>
val output = open[O]
reply ! output.channel
(input, output.events)
}
(c.channel, connections)
}
def connect[I, O](
s: TwoWay.Server[I, O]
): Events[TwoWay[I, O]] = {
val c = open[I]
(s ? c) map { output =>
(output, c.events)
}
}
type Reliable.Req[T] =
TwoWay.Req[Long, (T, Long)]
type Reliable.Server[T] =
TwoWay.Server[Long, (T, Long)]
def reliableServer[T]:
(Reliable.Server[T], Events[Events[T]]) = {
val (server, twoways) =
twoWayServer[Long, (T, Long)]
}
def reliableServer[T]:
(Reliable.Server[T], Events[Events[T]]) = {
val (server, twoways) =
twoWayServer[Long, (T, Long)]
val connections = twoways map {
case (acks, events) =>
}
(server, connections)
}
def reliableServer[T]:
(Reliable.Server[T], Events[Events[T]]) = {
val (server, twoways) =
twoWayServer[Long, (T, Long)]
val connections = twoways map {
case (acks, events) =>
val reliable = open[T]
}
(server, connections)
}
def reliableServer[T]:
(Reliable.Server[T], Events[Events[T]]) = {
val (server, twoways) =
twoWayServer[Long, (T, Long)]
val connections = twoways map {
case (acks, events) =>
val reliable = open[T]
reorder(events, reliable.channel, acks)
reliable.events
}
(server, connections)
}
def reorder(
events: Events[(T, Long)], r: Channel[T],
acks: Channel[Long]
): Unit = {
var nextStamp = 1L
val queue = new BinaryHeap[(T, Long)]()
def reorder(
events: Events[(T, Long)], r: Channel[T],
acks: Channel[Long]
): Unit = {
var nextStamp = 1L
val queue = new BinaryHeap[(T, Long)]()
events onMatch { case v @ (x, stamp) =>
if (stamp == nextStamp) {
acks ! nextStamp
nextStamp += 1
r ! x
while (nextStamp == queue.head._2) {
acks ! nextStamp
nextStamp += 1
r ! y }
} else queue.enqueue(v) } }
def openReliable[T](server: Reliable.Server[T]):
Events[Channel[T]] = {
server.connect() map {
case (output, acks) =>
}
}
def openReliable[T](server: Reliable.Server[T]):
Events[Channel[T]] = {
server.connect() map {
case (output, acks) =>
val reliable = open[T]
install(reliable.events, output, acks)
reliable.channel
}
}
type Pump[T] = (Queue[T], Events[T])
type Pump[T] = (Queue[T], Events[T])
def backpressurePump[T]
(twoway: TwoWay[T, Int]): Pump[T] = {
val (in, pressure) = twoway
}
type Pump[T] = (Queue[T], Events[T])
def backpressurePump[T]
(twoway: TwoWay[T, Int]): Pump[T] = {
val (in, pressure) = twoway
val (queue, xs) = in.toPump
}
type Pump[T] = (Queue[T], Events[T])
def backpressurePump[T]
(twoway: TwoWay[T, Int]): Pump[T] = {
val (in, pressure) = twoway
val (queue, xs) = in.toPump
xs.onEvent(x => pressure ! x)
(queue, xs)
}
type Valve[T] = (Channel[T], Signal[Boolean])
type Valve[T] = (Channel[T], Signal[Boolean])
def backpressureValve[T]
(twoway: TwoWay[Int, T]): Valve[T] = {
val (pressure, out) = twoway
}
type Valve[T] = (Channel[T], Signal[Boolean])
def backpressureValve[T]
(twoway: TwoWay[Int, T]): Valve[T] = {
val (pressure, out) = twoway
val c = open[T]
val sends = c.events.map(x => -1)
}
type Valve[T] = (Channel[T], Signal[Boolean])
def backpressureValve[T]
(twoway: TwoWay[Int, T]): Valve[T] = {
val (pressure, out) = twoway
val c = open[T]
val sends = c.events.map(x => -1)
val available = (pressure union sends)
.scanPast(0)(_ + _).toSignal
}
type Valve[T] = (Channel[T], Signal[Boolean])
def backpressureValve[T]
(twoway: TwoWay[Int, T]): Valve[T] = {
val (pressure, out) = twoway
val c = open[T]
val sends = c.events.map(x => -1)
val available = (pressure union sends)
.scanPast(0)(_ + _).toSignal
c.events.filter(x => available())
.onEvent(x => out ! x)
}
type Valve[T] = (Channel[T], Signal[Boolean])
def backpressureValve[T]
(twoway: TwoWay[Int, T]): Valve[T] = {
val (pressure, out) = twoway
val c = open[T]
val sends = c.events.map(x => -1)
val available = (pressure union sends)
.scanPast(0)(_ + _).toSignal
c.events.filter(x => available())
.onEvent(x => out ! x)
(c.channel, available)
}
val bp = Reactor.backpressureServer()
val s = system.spawn(bp)
s.openBackpressure() onEvent { valve =>
while (valve.available) {
valve ! "request"
}
}
http://github.com/reactors-io/reactors http://reactors.io
Use a spacebar or arrow keys to navigate