Tuesday, July 29, 2014

Minimum Viable Reactor Pattern (in a Reactor Pattern)


After last night, I believe that I finally have good handle on what the Reactor Pattern is. I also have a decent implementation to illustrate the pattern in Dart (at 039b2b0b3b in the git repo). But it is a bit obtuse.

I patterned the code after the original implementation from the paper that introduced the Reactor Pattern. As the paper noted, the pattern does not apply directly to a single threaded environment like Dart. It does not help that Dart itself is based on the Reactor Pattern. Even so, I think that I have Dart code that mimics the spirit, if not the exact implementation, of the implementation from the paper. But how much, exactly, of that spirit is really necessary?

After some thought, I think I can get rid of two things: a “re-imagining” of select() and the use of Dart isolates. Neither removal is a slam dunk—both serve multiple purposes in the sample code. But both also introduce additional concepts into an already concept-laden design pattern. If I have learned anything in my writings, it is that pretty much everything can (and should) be sacrificed at the alter of minimizing the number of concepts. So let's slaughter a couple of sacrificial concepts and see if I am rewarded...

First up is removing the Dart isolate code, which can been see on the very first line of the main entry point:
main() {
  // Spawn the message sending isolate and set its receive port as the source of
  // the fake select() messages
  Isolate.spawn(messageSender, connection());

  // Create (and register in constructor) event handler
  new LoggingAcceptor();

  // Reactor “loop” (handleEvent is recursive)
  new InitiationDispatcher().handleEvents();

  // NOTREACHED
  return 0;
}
Part of the benefit of using an isolate here is that communication takes place over “ports” which sound very much like the networking ports from the original C++ example. The comparison is not exactly one-to-one, however, and the send vs. receive ports can be confusing:
void messageSender(SendPort server) {
  var logClient = new ReceivePort();
  server.send({'type': 'log_connect', 'value': logClient.sendPort});
  logClient.
    first.
    then((SendPort s1) {
      // Send messages to the connected “application“ port
    });
}
The isolate was introduced to solve a problem that it did not actually solve. I kept it for the “ports,” but perhaps streams would suffice? It is impossible to escape streams in Dart, so using them here would hardly count as a new concept.

The problem is that, in order to mimic the network nature of the Reactor Pattern sample, I need different connections for different services. I had not realized it, but the isolates fit this fairly well. I can convert to streams, but the change does not seem to improve the readability of the example that much:
void messageSender(StreamController server) {
  var logClient = new StreamController();
  server.add({'type': 'log_connect', 'value': logClient});
  logClient.
    stream.
    first.
    then((StreamController s1) {
      s1.add({'type': 'log', 'value': 'howdy'});
      s1.add({'type': 'log', 'value': 'chris'});
      server.add({'type': 'log'});
      // ...
    });
}
Even with isolates, the send and receive ports are ultimately streams themselves, so maybe this pure stream approach is the best option. Still, actually using the companion nature of send and receive ports lends itself to networking discussion. I will have to think about this.

OK, so with the isolates removed, next up is the select() re-imagining. This is a tough thing to remove because it as the heart of the Reactor Pattern. The real select() call is responsible for demultiplexing systems calls likes TCP/IP server connections, network messages, and system I/O. Ultimately these system messages go into a queue that can be processed via select() calls inside of the reactor loop. In both the article and my sample code, this takes the form of the handleEvents() method of the dispatcher:
class InitiationDispatcher implements Dispatcher {
  // ...
  handleEvents() {
    var event = select().fetch();
    if (event == null) { /* Jump to next loop to select again... */ }
    events[event.type].forEach((h)=> h.handleEvent(event));
  }
}
My select() is not even a real function—it just returns an object whose fetch() method behaves like the real select(). I ought to remove it just for that bit of confusion, but… this fake select() does have the advantage of being a very explicit Queue of events:
class Select {
  StreamController<Map> connection;

  static final Select _select = new Select._internal();
  factory Select()=> _select;
  Select._internal() {
    connection = new StreamController();

    connection.
      stream.
      listen((m) {
        add(new MessageEvent(m['type'], m['value']));
      });
  }

  Queue<MessageEvent> queue = new Queue();

  void add(MessageEvent e) { queue.addLast(e); }
  MessageEvent fetch() {
    if (queue.isEmpty) return null;
    return queue.removeFirst();
  }
}
That said, streams themselves are decent demultiplexers. So perhaps I can get rid of all of the Select queuing stuff and replace it with a simple stream?
StreamController _connect;
connect() {
  if (_connect == null) {
    _connect = new StreamController.broadcast();
  }
  return _connect;
}
The main() entry point and the message sending code does not have to change, but, unfortunately, the actual handleEvents() method that is called in the heart of the reactor is not longer a loop or a recursive function. Instead, it just becomes a stream listener:
class InitiationDispatcher implements Dispatcher {
  // ...
  handleEvents([int timeout=10]) {
    connect().
      stream.
      listen((event){
        print('[handleEvents] ${event}');
        events[event['type']].forEach((h)=> h.handleEvent(event));
      });
  }
}
At this point, the sample code is no longer demonstrating the Reactor Pattern. Instead it is a roundabout way to illustrate that Dart is using the Reactor Pattern under the covers. Even worse, I cannot send the messages ahead of time since they will no longer queue. Instead I have to further deviate from the example in the original paper and move the message sending after the dispatcher is created:
main() {
  new LoggingAcceptor();
  new InitiationDispatcher().handleEvents();
  messageSender(connect());
}
That is not a show stopper, but the lack of an actual loop (or recursive function) probably is.

So in the end, last night's isolate and fake select() code was probably fairly close to what I need. There is still some code that I might clean up in there, but maybe not as many concepts as I had thought.


Day #137

No comments:

Post a Comment