Thursday, January 21, 2016

Full-Duplex Streams


I ask you, what kind of world do we live in where websockets are the easy answer?

Sure, my problems are my own and completely contrived, but still, websockets have proven to be the ideal medium to describe the remote proxy pattern. What makes them so nice is that I only have to supply a single socket to the proxy instance in order to enable it to control a real subject:
import 'dart:io';

main() async {
  // One socket carries traffic in both directions.
  var socket = await WebSocket.connect('ws://localhost:4040/ws');
  ProxyCar car = new ProxyCar(socket);
  // ...
}
The reason that this works is that websockets are, under the covers, a full-duplex communication channel. That is, a single websocket supports messages going from the client to the server and from the server to the client.
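To make the full-duplex point concrete, here is a rough, self-contained sketch rather than the code from my proxy example. It assumes current null-safe Dart and dart:io, and the throwaway local server, the function name fullDuplexDemo, and the 'drive' and 'state: driving' messages are all made up for illustration. Both ends read from and write to the one socket they hold:

```dart
import 'dart:async';
import 'dart:io';

/// Starts a throwaway server on a free local port, connects one client,
/// and returns what each side received over its single socket.
Future<List<String>> fullDuplexDemo() async {
  final log = <String>[];
  final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);

  // Server side: upgrade the request, then listen and reply on one socket.
  server.listen((HttpRequest request) async {
    final socket = await WebSocketTransformer.upgrade(request);
    socket.listen((message) {
      log.add('server got: $message');
      socket.add('state: driving'); // server -> client
    });
  });

  // Client side: the same object is used for sending and receiving.
  final client = await WebSocket.connect('ws://127.0.0.1:${server.port}/ws');
  final replied = Completer<void>();
  client.listen((message) {
    log.add('client got: $message');
    if (!replied.isCompleted) replied.complete();
  });
  client.add('drive'); // client -> server

  await replied.future;
  await client.close();
  await server.close(force: true);
  return log;
}

Future<void> main() async {
  (await fullDuplexDemo()).forEach(print);
}
```

Neither side needs a second channel to answer the other, which is the whole appeal.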

Just as importantly, websocket messages are bi-directional rather than broadcast. A Dart stream, by contrast, carries events in one direction only, whether it is a single-subscription stream or a broadcast stream. I had previously attempted the remote proxy pattern with streams, but that required two streams, one for inbound and one for outbound:
import 'dart:async';

main() async {
  var mainOut = new StreamController(),
      mainIn = new StreamController();
  ProxyCar car = new ProxyCar(mainOut, mainIn);
  // ...
}
It works and, if you think about the need for sending messages from the client to the server and vice versa, that approach makes sense. But if I am trying to describe this in a book like Design Patterns in Dart, I don't want readers to have to think about any of this. They should only have to think about the core concept being discussed.
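For reference, the two-stream wiring looks roughly like the following. This is a stripped-down sketch in current null-safe Dart, not the actual ProxyCar code: wireRealCar, twoStreamDemo, the string commands, and my reading of which controller points in which direction are all assumptions made for the example.

```dart
import 'dart:async';

/// Hypothetical stand-in for the real car: commands arrive on one stream,
/// state reports leave on a second one.
void wireRealCar(Stream<String> commands, StreamSink<String> states) {
  commands.listen(
    (command) => states.add(command == 'drive' ? 'driving' : 'idle'),
    onDone: () => states.close(),
  );
}

Future<List<String>> twoStreamDemo() async {
  final mainOut = StreamController<String>(); // assumed: proxy -> real car
  final mainIn = StreamController<String>(); // assumed: real car -> proxy
  final seenByProxy = <String>[];

  wireRealCar(mainOut.stream, mainIn.sink);
  // Completes once the inbound stream has delivered its done event.
  final done = mainIn.stream.listen(seenByProxy.add).asFuture<void>();

  mainOut.add('drive');
  mainOut.add('stop');
  await mainOut.close();
  await done;
  return seenByProxy;
}

Future<void> main() async {
  print(await twoStreamDemo());
}
```

Every participant has to be handed both controllers and has to keep straight which one it reads and which one it writes. That bookkeeping is exactly what I would rather not explain.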

And don't even get me started on communication via isolate workers. They are twice as conceptually complex due to the need to create communication channels over existing one-way channels.

And so, yes, I have created a world for myself in which websockets are the neat and clean answer. Armageddon must soon surely follow.

But seriously, I am not diametrically opposed to websockets for this case. They do make a certain amount of sense as a vehicle for remotely controlling cars or other objects. I had hoped to find another kind of stream in the standard Dart library that was full-duplex, but that does not appear to be possible.

Websockets in Dart are streams and they implement the Stream interface (for listening to messages) and the StreamSink interface (for sending messages). I had hoped that StreamSink might be implemented by another class that happened to be full-duplex. But, as far as I can tell, the only general-purpose class that implements it is the StreamController that previously forced me into the undesirable two-stream-instances implementation.

But what if I try it with just a single stream instead?
main() async {
  var socket = new StreamController.broadcast();
  ProxyCar car = new ProxyCar(socket);
  // ...
}
The socket needs to be a broadcast stream so that both the proxy and the real subject can listen to the same stream. Of course, that is going to cause problems since ProxyCar will see its own messages as well as those from the real car. So I need a way for ProxyCar to ignore its own messages.
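The echo problem is easy to see in isolation. The following is only a sketch in current null-safe Dart, with hypothetical names (echoDemo, seenByProxy) and plain strings standing in for the real messages:

```dart
import 'dart:async';

Future<List<String>> echoDemo() async {
  final socket = StreamController<Object>.broadcast();
  final seenByProxy = <String>[];

  // The "proxy" listens for replies...
  socket.stream.listen((m) => seenByProxy.add('$m'));
  // ...and a stand-in "real car" listens on the very same stream.
  socket.stream.listen((m) {
    if (m == 'drive') socket.add('driving');
  });

  socket.add('drive'); // sent by the proxy

  // Let the pending events (and the car's reply) be delivered.
  await Future<void>.delayed(Duration.zero);
  await socket.close();
  return seenByProxy;
}

Future<void> main() async {
  print(await echoDemo());
}
```

The proxy's list ends up holding its own 'drive' command alongside the 'driving' reply, which is the noise that needs filtering.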

I could use an enum here or some other lookup, but it turns out that I already have a convention in place. The ProxyCar class only sends commands to the real car. I am sending those as symbols instead of strings (because mirrors!):
class ProxyCar implements AsyncAuto {
  // ...
  Future drive() => _send(#drive);
  Future stop()  => _send(#stop);

  Future _send(message) {
    _socket.add(message);
    // ...
  }
}
So when ProxyCar listens for messages on the broadcast "socket," I can filter out any symbol messages with where():
class ProxyCar implements AsyncAuto {
  StreamController _socket;
  String _state = "???";

  ProxyCar(this._socket) {
    // Skip Symbols: those are the commands this proxy sent itself.
    _socket.stream.where((m)=> m is! Symbol).listen((message) {
      print("[ProxyCar] $message");
      _state = message;
    });
  }
  // ...
}
Conversely, the real car, AsyncCar, only sends strings, so it can filter those out:
class AsyncCar implements AsyncAuto {
  StreamController _socket;
  Car _car;

  AsyncCar(this._socket) {
    _car = new Car();

    // Skip Strings: those are the state reports this car sent itself.
    _socket.stream.where((m)=> m is! String).listen((message) {
      print("[AsyncCar] $message");
      if (message == #drive) _car.drive();
      if (message == #stop)  _car.stop();
      _socket.add(state);
    });
  }
  // ...
}
That works, and it is nicer than creating two explicit instances. There is no need to mention full-duplex communication should I include this in the book. I can merely mention that these classes need to ignore their own messages. The best part of this solution is that it works on DartPad:

https://dartpad.dartlang.org/aef966876bda192076c0
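Boiled down to the filtering idea alone, the approach can be sketched as follows. This is not the DartPad code: it assumes current null-safe Dart, and MiniProxy, attachRealCar, filteredDemo, and the 'driving' and 'idle' states are stand-ins invented for the example.

```dart
import 'dart:async';

/// Stand-in for the real car: acts on Symbol commands, replies with Strings.
void attachRealCar(StreamController<Object> socket) {
  var state = 'idle';
  // Ignore Strings, which are this car's own state reports.
  socket.stream.where((m) => m is! String).listen((command) {
    if (command == #drive) state = 'driving';
    if (command == #stop) state = 'idle';
    socket.add(state);
  });
}

/// Stand-in for the proxy: sends Symbols, records every String it hears.
class MiniProxy {
  final StreamController<Object> _socket;
  final states = <String>[];

  MiniProxy(this._socket) {
    // Ignore Symbols, which are this proxy's own commands.
    _socket.stream
        .where((m) => m is! Symbol)
        .listen((m) => states.add('$m'));
  }

  void drive() => _socket.add(#drive);
  void stop() => _socket.add(#stop);
}

Future<List<String>> filteredDemo() async {
  final socket = StreamController<Object>.broadcast();
  attachRealCar(socket);
  final proxy = MiniProxy(socket);

  proxy.drive();
  proxy.stop();

  // Let the commands and the car's replies be delivered.
  await Future<void>.delayed(Duration.zero);
  await socket.close();
  return proxy.states;
}

Future<void> main() async {
  print(await filteredDemo());
}
```

One controller, two listeners, and each side drops the message type that it produces itself.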

Still, crazy as it seems, this is not as nice as websockets, which require no explanation other than "they send messages back and forth between client and server." So, unless I really need DartPad or another requirement presents itself, it looks like I love me some websockets.


Day #71
