Oracle Labs Switzerland
October 2017
The following is intended to provide some insight into a line of research in Oracle Labs. It is intended for information purposes only, and may not be incorporated into any contract.
It is not a commitment to deliver any material, code, or functionality, and should not be relied upon in making purchasing decisions. Oracle reserves the right to alter its development plans and practices at any time, and the development, release, and timing of any features or functionality described in connection with any Oracle product or service remains at the sole discretion of Oracle.
Any views expressed in this presentation are my own and do not necessarily reflect the views of Oracle.
Should a message-based programming model have backpressure as a core primitive?
Syntax
terms ::=
...
spawn(t)
t ! t
t.onEvent(t)
open[T]
types ::=
...
Events[T]
Channel[T]
Semantics
P = { R }
program
Semantics
P = { R }
Semantics
P = { R }
R = (t, { C })
program
reactor
Semantics
P = { R }
R = (t, { C })
Semantics
P = { R }
R = (t, { C })
C = (c, e, x*, F)
program
reactor
connector
Semantics
P = { R }
R = (t, { C })
C = (c, e, x*, F)
Semantics
P = { R }
R = (t, { C })
C = (c, e, x*, F)
F = { f }
program
reactor
connector
callbacks
Semantics
P = { R }
R = (t, { C })
C = (c, e, x*, F)
F = { f }
How to encode a function that returns a backpressure channel?
def backpressure[T]: BackpressureChannel[T]
Encode backpressure as a composition of several simpler protocols.
type Req[T, S] = (T, Channel[S])
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
def server(f: T => S): Server[T, S] = {
}
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
def server(f: T => S): Server[T, S] = {
val c = open[Req[T, S]]
}
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
def server(f: T => S): Server[T, S] = {
val c = open[Req[T, S]]
c.events onEvent {
case (x, ch) => ch ! f(x)
}
}
type Req[T, S] = (T, Channel[S])
type Server[T, S] = Channel[Req[T, S]]
/** Starts a request-reply server backed by `f`.
 *
 *  Opens a connector for requests; each request `(x, ch)` is answered by
 *  sending `f(x)` to the reply channel `ch` carried in the request.
 *
 *  @tparam T  request payload type
 *  @tparam S  reply type
 *  @param f   function applied to each request payload
 *  @return    the channel on which clients send their requests
 */
def server[T, S](f: T => S): Server[T, S] = {
  val c = open[Req[T, S]]
  c.events onEvent {
    case (x, ch) => ch ! f(x)
  }
  c.channel
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
s ! (x, reply.channel)
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
s ! (x, reply.channel)
reply.events
}
def ?[T, S]
(s: Server[T, S], x: T): Events[S] = {
val reply = open[S]
s ! (x, reply.channel)
reply.events
}
val data = mutable.Map(1 -> 2, 3 -> 4)
val s = server(data)
/** Sends the request `x` to the server `s` and returns the stream of replies.
 *
 *  A fresh connector of `S` is opened for this request; its channel is sent
 *  along with the payload as the second element of the request pair.
 *
 *  @param s  server channel accepting `(payload, reply channel)` pairs
 *  @param x  request payload
 *  @return   events of the connector whose channel was sent with the request
 */
def ?[T, S](s: Server[T, S], x: T): Events[S] = {
  val replyConnector = open[S]
  val request = (x, replyConnector.channel)
  s ! request
  replyConnector.events
}
// Example: a lookup server whose request-handling function is the map itself.
val data = mutable.Map(1 -> 2, 3 -> 4)
// `data` is passed where `server` expects a function from request to reply.
val s = server(data)
// Ask the server about key 1 and print every reply delivered on the returned event stream.
(s ? 1) onEvent { x =>
println(x)
}
type TwoWay[I, O] = (Channel[O], Events[I])
type TwoWay.Req[I, O] =
Req[Channel[I], Channel[O]]
type TwoWay.Req[I, O] =
Req[Channel[I], Channel[O]]
type TwoWay.Server[I, O] =
Server[Channel[I], Channel[O]]
type TwoWay[I, O] = (Channel[O], Events[I])
// Two-way link as a pair: a channel for sending `O` and an event stream for receiving `I`.
type TwoWay[I, O] = (Channel[O], Events[I])
def twoWayServer[I, O]:
(TwoWay.Server[I, O], Events[TwoWay[O, I]])
// Two-way link as a pair: a channel for sending `O` and an event stream for receiving `I`.
type TwoWay[I, O] = (Channel[O], Events[I])
def twoWayServer[I, O]:
(TwoWay.Server[I, O], Events[TwoWay[O, I]]) = {
val c = open[TwoWay.Req[I, O]]
}
// Two-way link as a pair: a channel for sending `O` and an event stream for receiving `I`.
type TwoWay[I, O] = (Channel[O], Events[I])
def twoWayServer[I, O]:
(TwoWay.Server[I, O], Events[TwoWay[O, I]]) = {
val c = open[TwoWay.Req[I, O]]
val connections = c.events map {
case (input, sender) =>
}
}
// Two-way link as a pair: a channel for sending `O` and an event stream for receiving `I`.
type TwoWay[I, O] = (Channel[O], Events[I])
def twoWayServer[I, O]:
(TwoWay.Server[I, O], Events[TwoWay[O, I]]) = {
val c = open[TwoWay.Req[I, O]]
val connections = c.events map {
case (input, sender) =>
val output = open[O]
sender ! output.channel
(input, output.events): TwoWay[O, I]
}
}
// Two-way link as a pair: a channel for sending `O` and an event stream for receiving `I`.
type TwoWay[I, O] = (Channel[O], Events[I])
/** Creates the server side of the two-way protocol.
 *
 *  Opens a connector for connection requests. For every request, a fresh
 *  connector of `O` is opened, its channel is sent back on the request's
 *  reply channel, and the pair (channel received in the request, events of
 *  the fresh connector) is emitted on the resulting event stream.
 *
 *  @return the channel on which connection requests are accepted, paired
 *          with the event stream of established links
 */
def twoWayServer[I, O]: (TwoWay.Server[I, O], Events[TwoWay[O, I]]) = {
  val requests = open[TwoWay.Req[I, O]]
  val links = requests.events.map {
    case (clientChannel, replyTo) =>
      // One fresh connector per connecting party; its channel is handed back to that party.
      val perLink = open[O]
      replyTo ! perLink.channel
      (clientChannel, perLink.events)
  }
  (requests.channel, links)
}
def connect[I, O](
s: TwoWay.Server[I, O]
): Events[TwoWay[I, O]] = {
}
def connect[I, O](
s: TwoWay.Server[I, O]
): Events[TwoWay[I, O]] = {
val c = open[I]
}
/** Establishes a two-way link with the server `s`.
 *
 *  Opens a local connector of `I`, sends its channel to the server as the
 *  request payload, and pairs each channel of `O` received in reply with the
 *  events of the local connector.
 *
 *  @param s  two-way server to connect to
 *  @return   event stream of established links, each a pair of the channel
 *            received from the server and the local connector's events
 */
def connect[I, O](
  s: TwoWay.Server[I, O]
): Events[TwoWay[I, O]] = {
  val c = open[I]
  // The request payload must be a Channel[I], so send the connector's channel, not the connector itself.
  (s ? c.channel) map { output =>
    (output, c.events)
  }
}
type ReliableChannel[T] =
TwoWay[Stamp, (T, Stamp)]
type ReliableChannel[T] =
TwoWay[Stamp, (T, Stamp)]
type ReliableEvents[T] =
TwoWay[(T, Stamp), Stamp]
// Signature only: derives a plain event stream of `T` from the two-way link `e`.
def reliable[T](e: ReliableEvents[T]): Events[T]
// Signature only: derives a plain channel of `T` from the two-way link `c`.
def reliable[T](c: ReliableChannel[T]): Channel[T]
type BackpressureEvents[T] =
(Events[Boolean], Queue[T])
type BackpressureEvents[T] =
(Events[Boolean], Queue[T])
def backpressure[T]
(t: TwoWay[T, Int]): BackpressureEvents[T]
type BackpressureChannel[T] =
(Channel[T], Events[Boolean])
type BackpressureChannel[T] =
(Channel[T], Events[Boolean])
def backpressure[T](t: TwoWay[Int, T]):
BackpressureChannel[T]
Channel[TwoWay.Server[Int, T]]
type Stream[T] =
Channel[TwoWay.Server[Int, T]]
def map[T, S](
s: Stream[T], f: T => S): Stream[S]
def filter[T](
s: Stream[T], f: T => Boolean): Stream[T]
def scanLeft[T,S](
s: Stream[T], z: S,
op: (S, T) => S): Stream[S]
// Signature only: turns a stream of `T` into a stream of `Seq[T]`; `x` is presumably the batch size (not shown here).
def batch[T](s: Stream[T], x: Int): Stream[Seq[T]]
def lift[T, S]
(f: Events[T] => Events[S]):
Stream[T] => Stream[S]
1. Backpressure does not need to be a core primitive.
2. However, compiler optimizations might be necessary.
Use the spacebar or arrow keys to navigate