I think that I have decent sample code for the Reactor Pattern in Dart. Furthermore, I think I have a decent explanation for why I cannot get closer to the C++ implementation in the original paper. I think, I think, I think… What I know is that I usually get into trouble when I think.
To be more confident that I have replicated the spirit, if not the exact details, of the pattern described in the original paper, I would like to establish two communication channels between my isolate message sending code and the reactor loop from yesterday:

    main() {
      // Spawn the message sending isolate and set its receive port as the source of
      // the fake select() messages
      var res = new ReceivePort();
      Select.source = res;
      Isolate.spawn(messageSender, res.sendPort);

      // Create (and register in constructor) event handler
      new WordAcceptor();

      // Reactor “loop” (handleEvent is recursive)
      new InitiationDispatcher().handleEvents();
    }

Aside from the shenanigans with the select(), this looks very much like the main loop presented in the paper. The “word acceptor” waits for the reactor to inform it of new word sending connections, at which point it creates an instance of a “word handler” which simply prints words to STDOUT.
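As a reminder of what "register with the dispatcher" and "the dispatcher invokes the handler" amount to, here is a minimal, synchronous sketch. It is not the InitiationDispatcher from yesterday: SketchDispatcher, RecordingHandler and the dispatch() method are hypothetical names made up for illustration, and I am assuming events are plain maps with 'type' and 'value' keys.

```dart
// Hypothetical sketch, NOT the InitiationDispatcher used in this post.
// Assumption: an event is a map with 'type' and 'value' keys.
abstract class EventHandler {
  void handleEvent(Map<String, Object?> event);
}

class SketchDispatcher {
  // A factory constructor that always returns the same instance, so every
  // `SketchDispatcher()` call shares one table of handlers.
  static final SketchDispatcher _instance = SketchDispatcher._();
  SketchDispatcher._();
  factory SketchDispatcher() => _instance;

  final Map<String, List<EventHandler>> _handlers = {};

  // Remember that `handler` wants to hear about events of `type`.
  void registerHandler(EventHandler handler, String type) {
    _handlers.putIfAbsent(type, () => []).add(handler);
  }

  // Hand one event to every handler registered for its type.
  void dispatch(Map<String, Object?> event) {
    final handlers = _handlers[event['type']] ?? const <EventHandler>[];
    // Iterate over a copy so a handler may register new handlers safely.
    for (final handler in List.of(handlers)) {
      handler.handleEvent(event);
    }
  }
}

// Registers itself in its constructor and records the values it is given.
class RecordingHandler implements EventHandler {
  final List<Object?> seen = [];

  RecordingHandler(String type) {
    SketchDispatcher().registerHandler(this, type);
  }

  @override
  void handleEvent(Map<String, Object?> event) => seen.add(event['value']);
}

void main() {
  final handler = RecordingHandler('word');
  SketchDispatcher().dispatch({'type': 'word', 'value': 'howdy'});
  SketchDispatcher().dispatch({'type': 'other', 'value': 'ignored'});
  print(handler.seen); // prints: [howdy]
}
```

The real reactor loop differs in that it waits on the fake select() for the next event instead of being handed one directly, but the registration table and the per-type dispatch are the same idea.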
The problem with my current implementation is that I have the acceptor doing everything. Instead, it should follow the paper more closely, at least if I hope to better appreciate the implications. So I start with a much simpler WordAcceptor class:

    class WordAcceptor implements EventHandler {
      WordAcceptor() {
        new InitiationDispatcher().registerHandler(this, 'word_connect');
      }

      void handleEvent(connection) {
        if (connection.type != 'word_connect') return;
        new WordPrinter(connection.value);
      }
    }

As in the source paper, this registers itself with a singleton instance of the initiation dispatcher. It is looking to accept incoming connections (connections in this setup come from a separate isolate of code) with a type of word_connect. That is, it is accepting “word” client connections.

The initiation dispatcher invokes the acceptor's handleEvent() method whenever such an event occurs. The event will carry a send port as its value. The port in this case is a connection back into the calling isolate, a way to communicate back with that isolate. That value is supplied to the WordPrinter which, until now, has not been pulling its weight.

The
WordPrinter will mimic the functionality of the Logging_Handler from the paper. I will not have the low-level C interfaces into network devices, but it can use the isolate ports as a substitute:

    class WordPrinter implements EventHandler {
      SendPort sendPort;
      ReceivePort receivePort;
      StreamSubscription handle;

      final timeout = const Duration(milliseconds: 2);

      WordPrinter(this.sendPort) {
        new InitiationDispatcher()
          ..registerHandler(this, 'word')
          ..registerHandler(this, 'word_close');
        // ...
      }
      // ...
    }

Like the WordAcceptor, the WordPrinter needs to register itself with the reactor's dispatcher. In this case, it will handle word connections (to signal when words are available to be read) and word_close connections (to signal that there are no more words).

The
sendPort is the mechanism by which this printer can communicate back to the isolate sending words to be printed. The WordPrinter only needs it for one thing: to give the isolate a means of sending words directly to the WordPrinter. This is very much in the spirit of the Logging_Handler establishing a server connection for clients once the Logging_Acceptor receives a connection request. The WordPrinter establishes a local port via which a client (the message sending isolate) can send data.

To accomplish this, the
WordPrinter needs its own receive port. So I add that in the constructor:

    class WordPrinter implements EventHandler {
      // ...
      WordPrinter(this.sendPort) {
        new InitiationDispatcher()
          ..registerHandler(this, 'word')
          ..registerHandler(this, 'word_close');

        receivePort = new ReceivePort();
        sendPort.send(receivePort.sendPort);

        handle = receivePort
            .asBroadcastStream()
            .timeout(timeout, onTimeout: (_) { handle.pause(); })
            .listen(write)
          ..pause();
      }
      // ...
    }

The timeout and the pause are how I hope to better mimic the Reactor Pattern's usage of select() in C++. When data is ready to be read in the C++ version, the Reactor signals the concrete event handler to read as much data as is available for a non-blocking read. The handler then yields control back to the reactor so that it can wait for more data, either from the same connection or from somewhere else. This is the very heart of the pattern, so it seems useful to try to simulate it here. So I pause the listener on the stream as soon as it is established so that control can go back to the reactor loop, which will wait until it sees a word event. When it does, the dispatcher invokes handleEvent(), which resumes the subscription handle, which reads all data available on the stream, sending it to write(). Once the stream has been quiet for the 2 millisecond timeout, the onTimeout callback pauses the subscription again:

    class WordPrinter implements EventHandler {
      // ...
      void handleEvent(event) {
        if (event.type == 'word') {
          read();
        }
        // ...
      }

      void read() {
        handle.resume();
      }

      void write(word) {
        print('[WordPrinter.read] $word');
      }
    }

With that, I think I have a decent replication of the spirit of the
Logging_Handler class from the paper, but now in Dart.

To call this code, I use the isolate's sendPort to send the word_connect message. The WordAcceptor then creates a WordPrinter which replies back with a send port of its own:

    main() {
      // Spawn the message sending isolate and set its receive port as the source of
      // the fake select() messages
      var res = new ReceivePort();
      Select.source = res;
      Isolate.spawn(messageSender, res.sendPort);

      // Create (and register in constructor) event handler
      new WordAcceptor();

      // Reactor “loop” (handleEvent is recursive)
      new InitiationDispatcher().handleEvents();

      // NOTREACHED
      return 0;
    }

    void messageSender(SendPort port) {
      var wordSender = new ReceivePort();
      port.send({'type': 'word_connect', 'value': wordSender.sendPort});

      wordSender.first.then((SendPort s1) {
        s1.send({'type': 'word', 'value': 'howdy'});
        s1.send({'type': 'word', 'value': 'chris'});
        port.send({'type': 'word'});
      });
    }

Once the
WordPrinter's send port comes back, I can send as many words as I like. To actually tell the reactor that non-blocking data is ready, I send a connection of type word.

And that actually works. The reactor sees the messages and triggers the handleEvent() methods in its registered event handlers, which eventually results in the right connections and ultimately words being printed. It even works if I delay sending some of the words (and then send a follow-up word connection):

    $ ./bin/reactor.dart
    [handleEvents] word_connect
    [handleEvents] word
    [WordPrinter.read] {type: word, value: howdy}
    [WordPrinter.read] {type: word, value: chris}
    [handleEvents] word
    [WordPrinter.read] {type: word, value: delayed}
    [WordPrinter.read] {type: word, value: howdy}
    [WordPrinter.read] {type: word, value: chris}
    [handleEvents] word_close

To be sure, this is a loooong way to go to print out words, but it does a decent job of capturing the intent of the Reactor Pattern in Dart. Of course, Dart itself is a Reactor Pattern, so absolutely none of this is needed. But it was still fun to implement.
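For anyone who wants to poke at the pause-and-resume trick on its own, the following is a small sketch of just that part. It is a simplification under stated assumptions, not the WordPrinter code: it swaps the isolate ReceivePort for a plain StreamController, leaves out the broadcast stream and the timeout, and the function name pauseThenResume is made up for illustration. What it should show is that a paused subscription buffers incoming words and hands them over only after resume().

```dart
import 'dart:async';

// Hypothetical sketch: a StreamController stands in for the isolate port,
// and there is no timeout, so the subscription is never re-paused.
Future<List<String>> pauseThenResume() async {
  final log = <String>[];
  final controller = StreamController<String>();

  // Listen, then pause right away so nothing is read until "the reactor"
  // says data is ready.
  final subscription = controller.stream.listen((word) {
    log.add('read $word');
  });
  subscription.pause();

  // Words arrive while the subscription is paused; they are buffered.
  controller.add('howdy');
  controller.add('chris');

  // Give the event loop a turn to show that nothing was delivered yet.
  await Future<void>.delayed(Duration.zero);
  log.add('read while paused: ${log.length}');

  // The equivalent of handleEvent() calling read(): resume and drain.
  subscription.resume();
  await Future<void>.delayed(Duration.zero);

  await subscription.cancel();
  return log;
}

Future<void> main() async {
  print(await pauseThenResume());
  // prints: [read while paused: 0, read howdy, read chris]
}
```

In the WordPrinter, the timeout plays the role that is missing here: it puts the subscription back to sleep so that control returns to the reactor loop.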
Day #136