Wednesday, January 4, 2012

Message Passing in Dart


Having gotten the hang of basic Futures in Dart, today I move on to another interesting feature of the core language: ReceivePort. This class, along with its companion SendPort, encapsulates a means of message passing, especially between separate, isolated processes. At least until I am comfortable with message passing, I am going to ignore the separate processes part of the equation.

My first attempt will go something like this: (1) create a receiver object and its associated sender, (2) construct the callback that will be called when the receiver gets a message, and (3) send a message. This ends up looking like:
main () {
  // (1) receiver and associated sender
  final receiver = new ReceivePort()
      , sender = receiver.toSendPort();


  // (2) callback that the receiver invokes upon message receipt
  receiver.receive((message) {
    print("I got a message: '" + message + "'.");
  });

  // (3) send the message
  sender.send("The Sender says howdy!");
}
Easy enough... except it does not work:
➜  command_line git:(master) ✗ dart receive_port01.dart
Unhandled exception:
Closure argument mismatch
 0. Function: '::function' url: '/home/cstrom/repos/dart-site/examples/command_line/receive_port01.dart' line:1 col:1
 1. Function: 'ReceivePortImpl._handleMessage@924b4b8' url: 'bootstrap_impl' line:1705 col:22
Hrm... Line 1? That's the main() declaration. How could I have screwed that up? Ah... that is the first line of my anonymous function, not of the entire script. The tip-off is '::function' rather than a real name. If I intentionally throw an exception:
main () {
  throw(new Exception());

  // ...
}
Then the exception comes from '::main':
➜  command_line git:(master) ✗ dart receive_port01.dart
Unhandled exception:
Exception
 0. Function: '::main' url: '/home/cstrom/repos/dart-site/examples/command_line/receive_port01.dart' line:2 col:3
I am going to need to keep an eye on that "Function" name to keep these straight. I intend to write much broken Dart code in the near future.

Anyway, checking the documentation a little more closely, I see that the receive() callback needs to accept two arguments, the second being a reply-to mechanism:
main () {
  final receiver = new ReceivePort()
      , sender = receiver.toSendPort();

  receiver.receive((message, replyTo) {
    print("I got a message: '" + message + "'.");
  });

  sender.send("The Sender says howdy!");
}
(try.dartlang.org)

Running that, I get the following output:
➜  command_line git:(master) ✗ dart receive_port01.dart
I got a message: 'The Sender says howdy!'.
Interestingly, the script just hangs there after printing the message. I suppose that I should have expected that, given that the documentation states that receive() is "for receiving pending or future messages". The receiver could hardly listen for future messages if it closed right away.

To get the script to close right away, I can create a "single shot" receiver:
main () {
  final receiver = new ReceivePort.singleShot()
      , sender = receiver.toSendPort();

  receiver.receive((message, replyTo) {
    print("I got a message: '" + message + "'.");
  });

  sender.send("The Sender says howdy!", sender);
  sender.send("Yo.");
}
I can also make it close by establishing a control message that triggers the receiver to explicitly close():
main () {
  final receiver = new ReceivePort()
      , sender = receiver.toSendPort();

  receiver.receive((message, replyTo) {
    if (message == "bye") receiver.close();

    print("I got a message: '" + message + "'.");
  });

  sender.send("The Sender says howdy!", sender);
  sender.send("Yo.");
  sender.send("bye");
}
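The snippets in this post use the ReceivePort API as it stood in early 2012. As a rough sketch of the same close-on-a-control-message idea, assuming a current Dart SDK in which ReceivePort lives in dart:isolate and is a Stream (so listen() and the sendPort getter stand in for receive() and toSendPort()), something like the following should work. The collectUntilBye name is made up for illustration.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical helper: sends three messages to a port in the same isolate
/// and completes with everything received once "bye" closes the port.
Future<List<String>> collectUntilBye() {
  final receiver = ReceivePort();
  final sender = receiver.sendPort;
  final received = <String>[];
  final done = Completer<List<String>>();

  receiver.listen((message) {
    received.add(message as String);
    if (message == 'bye') {
      // Control message: closing the port means no more events can arrive.
      receiver.close();
      done.complete(received);
    }
  });

  sender.send('The Sender says howdy!');
  sender.send('Yo.');
  sender.send('bye');
  return done.future;
}

Future<void> main() async {
  for (final message in await collectUntilBye()) {
    print("I got a message: '$message'.");
  }
}
```

Because the port is closed when "bye" arrives, nothing should be left to keep the event loop alive, so the script can exit on its own.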
One thing I have not figured out is how to send messages back to the sender. The documentation and some sample code suggest that it is possible, but I have not had success yet. Simply calling call() or send() on the replyTo is silently ignored. There must be a mechanism to establish a callback on the sender in addition to the receiver. Something to investigate tomorrow.

Update: Figured out how to send messages back with call():
main () {
  final receiver = new ReceivePort()
      , sender = receiver.toSendPort();

  receiver.receive((message, replyTo) {
    if (replyTo != null) {
      print("replying...");
      replyTo.send("Howdy back at ya!");
    }

    print("I got a message: '" + message + "'.");
  });

  sender.call("The Sender says howdy!").receive((message, replyTo) {
    print("Reply: " + message);
  });
  sender.send("Yo.");
  sender.send("bye");
}
(try.dartlang.org)
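For comparison, here is a hedged sketch of the same reply round trip, again assuming a current Dart SDK, where the listen() callback receives only the message (no replyTo argument) and SendPort has no call() method. One common substitute is to ship a SendPort for the answer inside the message itself. The askAndAwaitReply name and the two-element list layout are illustrative choices only.

```dart
import 'dart:isolate';

/// Hypothetical helper: sends [text] together with a SendPort for the
/// answer, then waits for the first reply to come back.
Future<String> askAndAwaitReply(String text) async {
  final receiver = ReceivePort();
  final replies = ReceivePort();

  receiver.listen((message) {
    // Assumed message layout: [text, replyTo].
    final parts = message as List;
    final replyTo = parts[1] as SendPort;
    replyTo.send('Howdy back at ya!');
    receiver.close(); // one request is all this sketch handles
  });

  receiver.sendPort.send([text, replies.sendPort]);

  // Stream.first cancels its subscription after one event, which plays
  // the role of the old single-shot receiver.
  final reply = await replies.first;
  return reply as String;
}

Future<void> main() async {
  final reply = await askAndAwaitReply('The Sender says howdy!');
  print('Reply: $reply');
}
```

Awaiting the first event on the reply port gives much the same effect as call() followed by receive() in the 2012 API, without a separate callback on the sender side.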

Day #255

5 comments:

  1. I was playing with ports too, right over here: https://github.com/Ladicek/dart-playground/blob/master/port.dart but I didn't know about sendPort.call, thanks!

  2. @id Thanks! It bugs me a little that, inside the receive() callback, we have to use the local variable to close the receiver (e.g. `receiver`, `a`, or `b`). The `call` method gives some illusion of encapsulation, but I'm still not sure about the whole thing. Hopefully it'll click into place once I start playing with isolates.

  3. A lot of things about Ports and Isolates start to make sense only when looking backwards. It was like that for me anyway.

    The need to close the ReceivePort is obvious once you realize that each isolate is actually an event loop. And having an open ReceivePort means that more events can come. One-shot ReceivePorts for transferring a single message are so common that there is a special constructor for that, as you've noticed.

    I was myself mostly puzzled by the possibility of having several SendPorts for one ReceivePort, but it too started to make sense. For example, one master spawning a lot of workers, each worker gets one SendPort to communicate with the master, but there is only one ReceivePort in the master that gets all messages from all workers. Etc. It's quite low-level, I'd say, but well thought-out.

    Ladicek (this auth system will probably rename me to "id" again...)

  4. Great port examples Chris. I have some additional examples of this over here http://goo.gl/aGFHW

  5. @Adam Nice! But, at the same time, aw man! I was gonna try fibonacci numbers next ;)

    Guess I'll have to come up with something original for long running processes. Or maybe I'll just add sleep statements.

    Thanks for the link --- pretty cool stuff!
