Saturday, July 26, 2014

(Not Quite) Simulating Select for a Dart Reactor


I still hope to build an illustrative implementation of the Reactor Pattern in Dart. The ultimate goal is to begin to understand the right abstraction level at which Design Patterns in Dart should approach concurrency patterns (or whether it should include them at all). Many concurrency patterns come baked into Dart (as does the Reactor Pattern), so it is very much an open question whether I ought to cover them. Still, it is worth exploring…

After last night, I have the dispatcher and event handler interfaces fairly well defined as:
abstract class Dispatcher {
  void handleEvents([int timeout=0]);
  void registerHandler(EventHandler handler);
  void removeHandler(EventHandler handler);
}

abstract class EventHandler {
  void handleEvent(type);
  get handle;
}
The Dispatcher class serves as the interface for the InitiationDispatcher in the original paper. Similarly, EventHandler has the same structure as the interface in the paper.
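To make the shape of these interfaces concrete, here is a minimal sketch of a dispatcher that satisfies them. It is not the InitiationDispatcher from this series: SketchDispatcher, its pending queue, and RecordingHandler are hypothetical names invented for illustration, the handle getter is left out for brevity, and the code targets a current null-safe Dart SDK (hence no new keyword).

```dart
import 'dart:collection';

abstract class EventHandler {
  void handleEvent(String type);
}

abstract class Dispatcher {
  void handleEvents([int timeout = 0]);
  void registerHandler(EventHandler handler);
  void removeHandler(EventHandler handler);
}

/// Hypothetical dispatcher: hands each pending event type to every handler.
class SketchDispatcher implements Dispatcher {
  final List<EventHandler> _handlers = [];

  /// Stand-in for the "system" event source (an assumption of this sketch).
  final Queue<String> pending = Queue<String>();

  @override
  void registerHandler(EventHandler handler) => _handlers.add(handler);

  @override
  void removeHandler(EventHandler handler) => _handlers.remove(handler);

  /// One pass of the loop; the timeout is accepted but ignored here.
  @override
  void handleEvents([int timeout = 0]) {
    if (pending.isEmpty) return;
    final type = pending.removeFirst();
    // Iterate over a copy so handlers may register or remove themselves.
    for (final handler in List.of(_handlers)) {
      handler.handleEvent(type);
    }
  }
}

/// Hypothetical handler that just records what it was given.
class RecordingHandler implements EventHandler {
  final List<String> seen = [];

  @override
  void handleEvent(String type) => seen.add(type);
}

void main() {
  final dispatcher = SketchDispatcher();
  final handler = RecordingHandler();
  dispatcher.registerHandler(handler);

  dispatcher.pending.add('key');
  dispatcher.handleEvents();
  print(handler.seen); // [key]
}
```

Each call to handleEvents() dispatches at most one event, which keeps a single pass of the reactor loop short.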

From there, things got a little more difficult for me, partly because Dart is single threaded and already built on the Reactor Pattern, but mostly because I still lack a deep understanding of the pattern. In the end, I was able to somewhat demonstrate the pattern with a STDIN KeyHandler:
main() {
  // Create (and register in constructor) event handler
  new HandleKey();

  // Reactor loop...
  for(;;) {
    new InitiationDispatcher().handleEvents();
  }
}
STDIN is a poor way to demonstrate a demultiplexing pattern since only one thing can be sent to it at a time. Also, the simple implementations of STDIN block in Dart, which rather defeats the purpose of the tight event loop. So let's see if I can come up with a more illustrative example.

I start by defining select() as a Queue on which I can push “system events”:
// Needs at the top of the file: import 'dart:collection';
class Select {
  static final Select _select = new Select._internal();
  factory Select()=> _select;
  Select._internal();

  // Pending "system events", oldest first
  Queue<SystemEvent> queue = new Queue<SystemEvent>();

  void add(SystemEvent event) { queue.addLast(event); }
  SystemEvent fetch() {
    if (queue.isEmpty) return null;
    return queue.removeFirst();
  }
}

select()=> new Select();
The Select class is a singleton, so whenever new stuff is pushed on there, it is pushed onto the same queue. Hopefully this will mimic the select() call built into Linux sufficiently for my purposes.
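As a quick check of the singleton behavior, here is a self-contained sketch restated for a current null-safe Dart SDK. Two things are assumptions of the sketch: plain strings stand in for the SystemEvent type, which is not shown here, and fetch() returns a nullable value.

```dart
import 'dart:collection';

class Select {
  static final Select _select = Select._internal();
  factory Select() => _select;
  Select._internal();

  // Plain strings stand in for SystemEvent in this sketch.
  final Queue<String> queue = Queue<String>();

  void add(String type) => queue.addLast(type);

  // Returns the oldest pending event, or null when nothing is waiting.
  String? fetch() => queue.isEmpty ? null : queue.removeFirst();
}

void main() {
  // Every "new" Select is the same instance, so the queue is shared.
  Select().add('word');
  Select().add('key');

  print(identical(Select(), Select())); // true
  print(Select().fetch()); // word
  print(Select().fetch()); // key
  print(Select().fetch()); // null
}
```

Events come back in the order they were added, and an empty queue yields null rather than blocking, which is the part that differs most from a real select().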

The handles that are read from (and written to) after an OS select have an ID. I again try to mimic that:
class Handle {
  static int nextNumber = 1;
  static Map<int,Handle> lookup = {};

  int number;
  String type;
  var stream;
  Handle(this.type, this.stream){
    number = Handle.nextNumber++;
    Handle.lookup[number] = this;
  }
}
Instead of storing the ID in the operating system, I am storing it in the class itself for later lookup.
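The lookup can be exercised with a small sketch like the following. It restates Handle for a current null-safe Dart SDK (the number is assigned in an initializer list so it can be final), and the null streams are placeholders, not what the real example passes in.

```dart
class Handle {
  static int nextNumber = 1;
  static final Map<int, Handle> lookup = {};

  final int number;
  final String type;
  final Object? stream; // placeholder type for this sketch

  Handle(this.type, this.stream) : number = nextNumber++ {
    // Register under the freshly assigned ID for later lookup.
    lookup[number] = this;
  }
}

void main() {
  final first = Handle('word', null);
  final second = Handle('word', null);

  print(first.number); // 1
  print(second.number); // 2
  print(identical(Handle.lookup[2], second)); // true
}
```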

The other side of the equation is building an “acceptor” and “processor” pair. This is not strictly necessary for demonstrating the Reactor Pattern, but it follows the original paper closely and—hopefully—will help me better understand things.

First up the acceptor, which accepts “word” connections:
class WordAcceptor implements EventHandler {
  WordAcceptor() {
    new InitiationDispatcher().registerHandler(this, 'word');
  }

  void handleEvent(event) {
    if (event.type != 'word') return;
    new WordPrinter(event.handleId);
  }
  get handle => null;  // the acceptor is not backed by a real handle in this simulation
}
As with other reactor event handlers, it registers itself with the dispatcher and then handles events by creating a processor instance—in this case a simple word printer that will print the message to STDOUT. Yes, I am going a long way simply to print things out...

The processor reads from simple streams:
class WordPrinter  {
  int handleId;
  Handle handle;

  WordPrinter(this.handleId) {
    handle = Handle.lookup[handleId];
    read();
  }

  void read() {
    handle.stream.listen((word){
      print('[WordPrinter.read] $word');
    });
  }
}
Hopefully streams will serve as a nice stand-in for reading from low-level sockets.
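For reference, this is roughly how a stream behaves as a fake socket. The sketch below is standalone rather than part of the reactor code: deliveredBeforeAndAfterYield is a hypothetical helper, and a StreamController plays the part of the remote end writing words. It is written for a current null-safe Dart SDK.

```dart
import 'dart:async';

/// Pushes [words] into a stream and reports how many the listener had
/// received before and after control returned to Dart's event loop.
Future<List<int>> deliveredBeforeAndAfterYield(List<String> words) async {
  final controller = StreamController<String>();
  final received = <String>[];
  controller.stream.listen(received.add);

  words.forEach(controller.add);
  // Delivery is asynchronous, so nothing has reached the listener yet.
  final before = received.length;

  // Hand control back to the event loop so queued events are delivered.
  await Future<void>.delayed(Duration.zero);
  final after = received.length;

  await controller.close();
  return [before, after];
}

Future<void> main() async {
  print(await deliveredBeforeAndAfterYield(['hello', 'world'])); // [0, 2]
}
```

The point to notice is that adding to the controller does not call the listener immediately. The listener only runs once the current synchronous code has finished.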

Finally, I change the reactor loop to listen for “word” connections and start up a message sender:
main() {
  _startRandomMessageSender();

  // Create (and register in constructor) event handler
  new WordAcceptor();

  // Reactor loop...
  for(;;) {
    new InitiationDispatcher().handleEvents();
  }
}
But here, I am stumped. The reactor loop is too tight for my fake file system streams to be processed: the for(;;) never returns control to Dart's own event loop, so stream events are never delivered. I am effectively blocking in the reactor loop. I can get the word acceptor to fire because it is driven by the select queue, which is polled inside the reactor loop. But the streams live outside of it.

I could probably replace the streams with something that also gets placed onto the system queue (or something similar). But I am still stumped about how to simulate random data coming in over time. If I try a Timer in _startRandomMessageSender():
// Needs at the top of the file: import 'dart:async';
void _startRandomMessageSender() {
  // ...
  new Timer(
    new Duration(seconds: 1),
    () { print('yo'); }
  );
}
Then the print statement is never reached. Again, the reactor loop is too tight. And if I cannot print something 1 second later, how can I send messages to processors?
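The mechanism can be seen in isolation with a bounded version of the loop. This is only a sketch of why the timer starves, not a decision about where the example goes next: timerFiredBeforeAndAfterYield is a hypothetical helper, the million-iteration loop is a stand-in for for(;;), and the zero-length delay is just one way of yielding. It is written for a current null-safe Dart SDK.

```dart
import 'dart:async';

/// Reports whether a zero-delay timer had fired after a synchronous spin,
/// and again after a loop that yields to Dart's event loop on each pass.
Future<List<bool>> timerFiredBeforeAndAfterYield() async {
  var fired = false;
  Timer(Duration.zero, () => fired = true);

  // Bounded stand-in for the for(;;) reactor loop: it never yields.
  for (var i = 0; i < 1000000; i++) {
    // spin
  }
  final afterSpinning = fired;

  // Same shape, but each pass hands control back to the event loop.
  for (var i = 0; i < 10 && !fired; i++) {
    await Future<void>.delayed(Duration.zero);
  }
  return [afterSpinning, fired];
}

Future<void> main() async {
  print(await timerFiredBeforeAndAfterYield()); // [false, true]
}
```

However long the synchronous loop spins, the timer callback cannot run until that code returns. With an unbounded for(;;), that never happens.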

Bleh. These may be the hazards of attempting to simulate the Reactor Pattern on a platform that itself implements the Reactor Pattern. But I think it may be more a matter of rethinking how I want to simulate the low-level handles. I will ruminate on this some more and approach it fresh tomorrow.



Day #134

4 comments:

  1. Or should I say, use Isolates as a source of events ??

    Replies
    1. I'll probably give this a try. I'm a bit worried that it'll add to the complexity of the example too much (obscuring rather than illustrating the pattern), but I haven't come up with any better ideas yet....

    2. Yeah, it's all I can think of at the moment too, and I know it's not ideal.
