Bridging the Gap Towards

High-Level Distributed Computing

Aleksandar Prokopec
@alexprokopec

APL



APL had great syntax

(~R∈R∘.×R)/R←1↓⍳R
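
For readers who do not speak APL: the expression computes the primes up to R by removing every number that appears in the multiplication table of 2..R. A rough Scala rendering, as a sketch only (the object and method names are made up for illustration):

```scala
object AplPrimes {
  // Primes up to r, mirroring the APL expression step by step.
  def primes(r: Int): Seq[Int] = {
    val rs = 2 to r                                            // R←1↓⍳R
    val products = (for (a <- rs; b <- rs) yield a * b).toSet  // R∘.×R
    rs.filterNot(products.contains)                            // (~R∈...)/R
  }
}
```

The Scala version needs three named steps where APL needs one line of glyphs.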

APL had great syntax

but it also had a flaw

Lisp



Lisp is more uniform

What the user writes feels like Lisp...
...assuming that user has good taste.

Lisp is minimal

Everything is a list.
(1 2 (3 4))

Lisp is minimal

Expression is a named list.
(+ 3 4)

Lisp is minimal

Statement is an expression.
(if (< 0 1) '(1 2 3) '())

Lisp is minimal

Named functions are expressions.
(defun S (n) (+ n 1))

Lisp can grow

(defstruct pair
  (x 0)
  (y 0))

Lisp can grow

(defclass vec2 (pair) ())
(defmethod abs ((v vec2))
  (+ (* (pair-x v) (pair-x v))
     (* (pair-y v) (pair-y v))))

Lisp can grow

Think multiple inheritance, first-class generic functions, multimethods, first-class classes, Beta...

Lisp does it right.

It starts with a few minimal concepts,
and uses them to build powerful stuff.

Lisp does it right.

But Lisp is not popular today.

PC loser-ing

Once, there was the MIT approach, and the New Jersey approach.

C is everywhere.

But Lisp is not.
So how was Lisp the right thing?

Lesson learned

It is undesirable to go for the right thing first. It is better to get half of the right thing available so that it spreads like a virus. Once people are hooked on it, take the time to improve it to 90% of the right thing.

R.G.

Lesson learned

A language should plan for growth.
It should start small and grow as the set of users grows.


G.L.S.
        
class AnalysisReactor
extends Reactor[String] {
  var totalLength = 0
}
spawn(Proto[AnalysisReactor])
        
      
        
val ch: Channel[String]
ch ! "event payload"
        
      
        
var totalLength = 0  // state
main                 // main connector
  .events            // event stream
  .onEvent {         // event handler
    x => totalLength += x.length
  }


        
      
        
var numSch = 0
sysEvents  // Events[SysEvent]
  .onMatch {
    case Scheduled =>
      numSch += 1
    case Terminated =>
      println(totalLength / numSch)
  }
        
      

Serializability

For any single reactor,
at most one handler
can be observably active
at any point in time.
No data races!
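
A minimal sketch of why this guarantee removes data races. It does not use the Reactors API: `MiniReactor` is a hypothetical stand-in that runs every handler on one dedicated thread, which is one simple way to get "at most one handler active at a time".

```scala
import java.util.concurrent.{Executors, TimeUnit}

object SerializabilitySketch {
  // Hypothetical stand-in for a reactor: all events are handled on a single
  // thread, so two handler invocations never overlap.
  final class MiniReactor[T](handler: T => Unit) {
    private val mailbox = Executors.newSingleThreadExecutor()
    def send(x: T): Unit = mailbox.execute(() => handler(x))
    def shutdown(): Unit = {
      mailbox.shutdown()
      mailbox.awaitTermination(10, TimeUnit.SECONDS)
    }
  }

  def run(): Int = {
    var totalLength = 0 // plain var, no locks or atomics
    val reactor = new MiniReactor[String](x => totalLength += x.length)
    // Four threads send concurrently; the handler still runs one event at a time.
    val senders = (1 to 4).map { _ =>
      new Thread(() => (1 to 1000).foreach(_ => reactor.send("abc")))
    }
    senders.foreach(_.start())
    senders.foreach(_.join())
    reactor.shutdown()
    totalLength
  }
}
```

Senders may run in parallel, but the state is only ever touched from inside a handler, so the unsynchronized `var` is safe.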
        
def router[T](p: T => Channel[T]): Channel[T] = {
  val connector = open[T]
  // Forward each incoming event to the channel chosen by the policy p.
  connector.events onEvent { x =>
    p(x) ! x
  }
  connector.channel
}

def roundRobin[T](chs: Channel[T]*): T => Channel[T] = {
  var i = -1
  (x: T) => {
    i = (i + 1) % chs.length
    chs(i)
  }
}
        
      
        
val input: Channel[T] =
  router(roundRobin(outputs: _*))
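
To see the router in action without a reactor system, here is a self-contained sketch. `Events`, `Channel`, `Connector` and `open` below are simplified, synchronous stand-ins written for this example, not the library's classes; only `router` and `roundRobin` follow the slides.

```scala
import scala.collection.mutable.ArrayBuffer

object RouterSketch {
  // Hypothetical synchronous stand-ins for the reactor primitives.
  final class Events[T] {
    private val handlers = ArrayBuffer.empty[T => Unit]
    def onEvent(h: T => Unit): Unit = { handlers += h; () }
    def emit(x: T): Unit = handlers.foreach(_(x))
  }
  final class Channel[T](target: Events[T]) {
    def !(x: T): Unit = target.emit(x)
  }
  final class Connector[T] {
    val events = new Events[T]
    val channel = new Channel[T](events)
  }
  def open[T]: Connector[T] = new Connector[T]

  def router[T](p: T => Channel[T]): Channel[T] = {
    val connector = open[T]
    connector.events.onEvent(x => p(x) ! x)
    connector.channel
  }

  def roundRobin[T](chs: Channel[T]*): T => Channel[T] = {
    var i = -1
    (x: T) => { i = (i + 1) % chs.length; chs(i) }
  }

  // Routes five events over two outputs and records where each one lands.
  def run(): (List[Int], List[Int]) = {
    val a = open[Int]
    val b = open[Int]
    val seenA = ArrayBuffer.empty[Int]
    val seenB = ArrayBuffer.empty[Int]
    a.events.onEvent(x => seenA += x)
    b.events.onEvent(x => seenB += x)
    val input = router(roundRobin(a.channel, b.channel))
    (1 to 5).foreach(x => input ! x)
    (seenA.toList, seenB.toList)
  }
}
```

The routing policy is just a function from event to channel, so other policies (hashing, random, load-based) plug into the same `router`.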
        
      

Reactor model

spawn starts a reactor.
Channel[T] sends an event.
Events[T] delivers events.
open[T] creates a connector: a channel plus its event stream.
        
type Req[T, S] = (T, Channel[S])

type Server[T, S] = Channel[Req[T, S]]

def server[T, S](f: T => S): Server[T, S] = {
  val c = open[Req[T, S]]
  // Reply to each request on the channel that came with it.
  c.events onMatch {
    case (x, ch) => ch ! f(x)
  }
  c.channel
}
        
      
        
implicit class ServerOps[T, S](val s: Server[T, S]) {
  // Sends x to the server and returns the stream of replies.
  def ?(x: T): Events[S] = {
    val reply = open[S]
    s ! (x, reply.channel)
    reply.events
  }
}

val data = mutable.Map(1 -> 2, 3 -> 4)
val s = server[Int, Int](data)

(s ? 1) onEvent { x =>
  println(x)
}
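
The request-reply protocol can be tried out with a small self-contained model. Everything except `Req`, `Server`, `server` and `?` is a stand-in invented for this sketch: sends are queued and delivered by a hypothetical `drain()`, which mimics asynchronous delivery well enough for the reply to arrive after the caller has subscribed.

```scala
import scala.collection.mutable

object ServerSketch {
  // Hypothetical stand-ins for the reactor primitives. Sends are queued and
  // delivered later by drain(), to mimic asynchronous delivery.
  private val pending = mutable.Queue.empty[() => Unit]
  def drain(): Unit = while (pending.nonEmpty) pending.dequeue()()

  final class Events[T] {
    private val handlers = mutable.ArrayBuffer.empty[T => Unit]
    def onEvent(h: T => Unit): Unit = { handlers += h; () }
    def emit(x: T): Unit = handlers.foreach(_(x))
  }
  final class Channel[T](target: Events[T]) {
    def !(x: T): Unit = { pending.enqueue(() => target.emit(x)); () }
  }
  final class Connector[T] {
    val events = new Events[T]
    val channel = new Channel[T](events)
  }
  def open[T]: Connector[T] = new Connector[T]

  type Req[T, S] = (T, Channel[S])
  type Server[T, S] = Channel[Req[T, S]]

  def server[T, S](f: T => S): Server[T, S] = {
    val c = open[Req[T, S]]
    c.events.onEvent { case (x, ch) => ch ! f(x) }
    c.channel
  }

  implicit class ServerOps[T, S](s: Server[T, S]) {
    def ?(x: T): Events[S] = {
      val reply = open[S]
      s ! ((x, reply.channel))
      reply.events
    }
  }

  // Asks the server for two keys and collects the replies.
  def run(): List[Int] = {
    val data = Map(1 -> 2, 3 -> 4)
    val s = server[Int, Int](data)
    val replies = mutable.ArrayBuffer.empty[Int]
    (s ? 1).onEvent(x => replies += x)
    (s ? 3).onEvent(x => replies += x)
    drain()
    replies.toList
  }
}
```

Each request carries its own reply channel, so the server needs no table of clients.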
        
      
        
type TwoWay.Req[I, O] =
  Req[Channel[I], Channel[O]]

type TwoWay.Server[I, O] =
  Channel[TwoWay.Req[I, O]]
        
      
        
type TwoWay[I, O] = (Channel[I], Events[O])

def twoWayServer[I, O]:
  (TwoWay.Server[I, O], Events[TwoWay[I, O]]) = {
  val c = open[TwoWay.Req[I, O]]
  val connections = c.events map {
    case (input, reply) =>
      // Open the server-side end and hand its channel to the client.
      val output = open[O]
      reply ! output.channel
      (input, output.events)
  }
  (c.channel, connections)
}
        
      
        
def connect[I, O](
  s: TwoWay.Server[I, O]
): Events[TwoWay[O, I]] = {
  val c = open[I]
  // Send our own channel; the server replies with the channel for its end.
  (s ? c.channel) map { output =>
    (output, c.events)
  }
}
        
      

But wait.

What about reliability...
        
type Reliable.Req[T] =
  TwoWay.Req[Long, (T, Long)]

type Reliable.Server[T] =
  TwoWay.Server[Long, (T, Long)]
        
      
        
def reliableServer[T]:
  (Reliable.Server[T], Events[Events[T]]) = {
  val (server, twoways) =
    twoWayServer[Long, (T, Long)]
  val connections = twoways map {
    case (acks, events) =>
      val reliable = open[T]
      reorder(events, reliable.channel, acks)
      reliable.events
  }
  (server, connections)
}
        
      
        
def reorder[T](
  events: Events[(T, Long)], r: Channel[T],
  acks: Channel[Long]
): Unit = {
  var nextStamp = 1L
  // Buffer for early arrivals; assumed to order by stamp, smallest first.
  val queue = new BinaryHeap[(T, Long)]()
  events onMatch { case v @ (x, stamp) =>
    if (stamp == nextStamp) {
      acks ! nextStamp
      nextStamp += 1
      r ! x
      // Flush buffered events that are now in order.
      while (queue.nonEmpty && queue.head._2 == nextStamp) {
        val (y, _) = queue.dequeue()
        acks ! nextStamp
        nextStamp += 1
        r ! y
      }
    } else queue.enqueue(v)
  }
}
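
The reordering logic can be checked in isolation. The sketch below swaps the channels for plain callbacks and uses the standard library's `PriorityQueue` in place of `BinaryHeap`; `Reorderer`, `deliver` and `ack` are names invented for this example.

```scala
import scala.collection.mutable

object ReorderSketch {
  // Accepts stamped events in any order, calls `deliver` in stamp order,
  // and calls `ack` once for every delivered stamp.
  final class Reorderer[T](deliver: T => Unit, ack: Long => Unit) {
    private var nextStamp = 1L
    // PriorityQueue is a max-heap, so reverse the ordering to get the
    // smallest stamp at the head.
    private val queue = mutable.PriorityQueue.empty[(T, Long)](
      Ordering.by[(T, Long), Long](_._2).reverse)

    def receive(x: T, stamp: Long): Unit =
      if (stamp == nextStamp) {
        emit(x)
        // Flush buffered events that are now in order; skip stale duplicates.
        while (queue.nonEmpty && queue.head._2 <= nextStamp) {
          val (y, s) = queue.dequeue()
          if (s == nextStamp) emit(y)
        }
      } else if (stamp > nextStamp) queue.enqueue((x, stamp))

    private def emit(x: T): Unit = {
      ack(nextStamp)
      nextStamp += 1
      deliver(x)
    }
  }

  // Feeds events out of order and returns what was delivered and acked.
  def run(): (List[String], List[Long]) = {
    val delivered = mutable.ArrayBuffer.empty[String]
    val acked = mutable.ArrayBuffer.empty[Long]
    val r = new Reorderer[String](x => delivered += x, s => acked += s)
    List("b" -> 2L, "c" -> 3L, "a" -> 1L, "e" -> 5L, "d" -> 4L)
      .foreach { case (x, stamp) => r.receive(x, stamp) }
    (delivered.toList, acked.toList)
  }
}
```

Events stamped below `nextStamp` are treated as duplicates and dropped, which is an assumption of this sketch rather than something the slides specify.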
        
      
        
def openReliable[T](server: Reliable.Server[T]):
  Events[Channel[T]] = {
  server.connect() map {
    case (output, acks) =>
      val reliable = open[T]
      install(reliable.events, output, acks)
      reliable.channel
  }
}
        
      

Wait...

There is no backpressure here...
        
type Pump[T] = (Queue[T], Events[T])

def backpressurePump[T]
  (twoway: TwoWay[Int, T]): Pump[T] = {
  val (pressure, in) = twoway
  val (queue, xs) = in.toPump
  // Grant the sender one more token for every event taken off the queue.
  xs.onEvent(x => pressure ! 1)
  (queue, xs)
}
        
      
        
type Valve[T] = (Channel[T], Signal[Boolean])

def backpressureValve[T]
  (twoway: TwoWay[T, Int]): Valve[T] = {
  val (out, pressure) = twoway
  val c = open[T]
  // Each send spends one token; each pressure event grants some.
  val sends = c.events.map(x => -1)
  val available = (pressure union sends)
    .scanPast(0)(_ + _).map(_ > 0).toSignal
  c.events.filter(x => available())
    .onEvent(x => out ! x)
  (c.channel, available)
}
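
The token accounting behind the valve can be shown without any event streams. This is a simplified model, not the slide's implementation: `Valve`, `grant` and `trySend` are hypothetical names, and the pressure channel is reduced to a method call.

```scala
import scala.collection.mutable.ArrayBuffer

object BackpressureSketch {
  // The receiver grants tokens; the sender spends one token per event and
  // must stop when none are left.
  final class Valve[T](out: T => Unit) {
    private var tokens = 0
    def available: Boolean = tokens > 0
    def grant(n: Int): Unit = tokens += n // a pressure event from the receiver
    def trySend(x: T): Boolean =
      if (available) { tokens -= 1; out(x); true } else false
  }

  // Returns the events that got through and the number that were refused.
  def run(): (List[String], Int) = {
    val received = ArrayBuffer.empty[String]
    val valve = new Valve[String](x => { received += x; () })
    var refused = 0
    valve.grant(2) // receiver has room for two events
    List("r1", "r2", "r3", "r4").foreach { x =>
      if (!valve.trySend(x)) refused += 1
    }
    valve.grant(1) // receiver consumed one event and frees a slot
    if (!valve.trySend("r5")) refused += 1
    (received.toList, refused)
  }
}
```

The sender never has more events in flight than the receiver has granted, which is the whole point of backpressure.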
        
      
        
val bp = Reactor.backpressureServer()
val s = system.spawn(bp)

s.openBackpressure() onEvent { valve =>
  while (valve.available) {
    valve ! "request"
  }
}
        
      

And from there...

Thank you


http://github.com/reactors-io/reactors

http://reactors.io
      
