Monday, July 28, 2014

Deep, Isolated Reactors in Dart


I think that I have decent sample code the Reactor Pattern in Dart. Furthermore, I think I have a decent explanation for why I cannot get closer to the C++ implementation in the original paper. I think, I think, I think… What I know is that I usually get into trouble when I think.

To have a better idea that I have replicated the spirit, if not the exact details, of the pattern described in the original paper, I would like to establish two communication channels between my isolate message sending code and the reactor loop from yesterday:
main() {
  // Spawn the message sending isolate and set its receive port as the source of
  // the fake select() messages
  var res = new ReceivePort();
  Select.source = res;
  Isolate.spawn(messageSender, res.sendPort);

  // Create (and register in constructor) event handler
  new WordAcceptor();

  // Reactor “loop” (handleEvent is recursive)
  new InitiationDispatcher().handleEvents();
}
Aside from the shenanigans with the select(), this looks very much like the main loop presented in the paper. The “word acceptor” waits for the reactor to inform it of new word sending connections, at which point it creates an instance of a “word handler” which simply prints words to STDOUT.

The problem with my current implementation is that I have the acceptor doing everything. Instead, it should follow the paper more closely—at least if I hope to better appreciate the implications. So I start with a much simpler WordAcceptor class:
class WordAcceptor implements EventHandler {
  WordAcceptor() {
    new InitiationDispatcher().registerHandler(this, 'word_connect');
  }

  void handleEvent(connection) {
    if (connection.type != 'word_connect') return;
    new WordPrinter(connection.value);
  }
}
As in the source paper, this registers itself with a singleton instance of the initiation dispatcher. It is looking to accept incoming connections (connections in this setup come from a separate isolate of code) with a type of word_connect. That is, it is accepting “word” client connections.

The initiation dispatcher invokes the acceptor's handleEvent() method whenever such an event occurs. I will supply a value of a “receive port.” The port in this case is a connection back into the calling isolate—a way to communicate back with the isolate. That value is supplied to the WordPrinter which, until now, has not been pulling its weight.

The WordPrinter will mimic the functionality of the Logging_Handler from the paper. I will not have the low-level C interfaces into network devices, but it can use the isolate ports as a substitute:
class WordPrinter implements EventHandler {
  SendPort sendPort;
  ReceivePort receivePort;

  StreamSubscription handle;

  final timeout = const Duration(milliseconds: 2);

  WordPrinter(this.sendPort) {
    new InitiationDispatcher()
      ..registerHandler(this, 'word')
      ..registerHandler(this, 'word_close');
    // ...
  }
  // ...
}
Like the WordAcceptor, the WordPrinter needs to register itself with the reactor's dispatcher. In this case, it will handle word connections (to signal when words are available to be read) and word_close connections (to signal that there are no more words).

The sendPort is the mechanism by which this printer can communicate back to the isolate sending words to be printed. The WordPrinter only needs to send a means for the isolate to send words directly back to the WordPrinter. This is very much in the spirit of the Logging_Handler establishing a server connection for clients once the Logging_Acceptor receives a connection request. The WordPrinter establishes a local port via which a client (the message sending isolate) can send data.

To accomplish this, the WordPrinter needs its own receive port. So I add that in the constructor:
class WordPrinter implements EventHandler {
  // ...
  WordPrinter(this.sendPort) {
    new InitiationDispatcher()
      ..registerHandler(this, 'word')
      ..registerHandler(this, 'word_close');

    receivePort = new ReceivePort();
    sendPort.send(receivePort.sendPort);

    handle = receivePort.
      asBroadcastStream().
      timeout(timeout, onTimeout: (_){ handle.pause(); }).
      listen(write)
      ..pause();
  }
  // ...
}
The timeout and the pause are how I hope to better mimic the Reactor Pattern's usage of select() in C++. When data is ready to be read in the C++ version, the Reactor signals the concrete event handler to read as much data is available for non-blocking read. It then yields control back to the reactor so that it can wait for more data—either from the same connection or from somewhere else. This is the very heart of the pattern, so it seems useful to try to simulate it here.

So I pause the listener on the stream as soon as it is established so that control can go back to the reactor loop, which will wait until it sees a word event. When it does, this dispatcher invokes handleEvent(), which resumes the subscription handle, which reads all data available on the stream, sending it to write()::
class WordPrinter implements EventHandler {
  // ...
  void handleEvent(event) {
    if (event.type == 'word') {
      read();
    }
    // ...
  }

  void read() {
    handle.resume();
  }

  void write(word) {
    print('[WordPrinter.read] $word');
  }
}
With that, I think I have a decent replication of the spirit of the Logging_Handler class from the paper, but now in Dart.

To call this code, I use the isolate's sendPort to send the word_connect message. The WordAcceptor then creates a WordPrinter which replies back with a send port of its own:
main() {
  // Spawn the message sending isolate and set its receive port as the source of
  // the fake select() messages
  var res = new ReceivePort();
  Select.source = res;
  Isolate.spawn(messageSender, res.sendPort);

  // Create (and register in constructor) event handler
  new WordAcceptor();

  // Reactor “loop” (handleEvent is recursive)
  new InitiationDispatcher().handleEvents();

  // NOTREACHED
  return 0;
}

void messageSender(SendPort port) {
  var wordSender = new ReceivePort();
  port.send({'type': 'word_connect', 'value': wordSender.sendPort});
  wordSender.
    first.
    then((SendPort s1) {
      s1.send({'type': 'word', 'value': 'howdy'});
      s1.send({'type': 'word', 'value': 'chris'});
      port.send({'type': 'word'});
    });
}
Once the WorkPrinter's send port comes back, I can send as many words as I like. To actually tell the reactor that non-blocking data is ready, I send a connection of type word.

And that actually works. The reactor sees the messages, triggers the handle_event() methods in its registered event handlers, which eventually results in the right connections and ultimately words being printed. It even works if I delay sending some of the words (and then send a follow-up word connection):
$ ./bin/reactor.dart
[handleEvents] word_connect
[handleEvents] word
[WordPrinter.read] {type: word, value: howdy}
[WordPrinter.read] {type: word, value: chris}
[handleEvents] word
[WordPrinter.read] {type: word, value: delayed}
[WordPrinter.read] {type: word, value: howdy}
[WordPrinter.read] {type: word, value: chris}
[handleEvents] word_close
To be sure, this is a loooong way to go to print out words, but it does a decent job of capturing the intent of the Reactor Pattern in Dart. Of course, Dart itself is a Reactor Pattern, so absolutely none of this is needed. But it was still fun to implement.



Day #136

No comments:

Post a Comment