RxDart Tutorial for Flutter: Getting Started

Learn how to develop Flutter apps using the Reactive Programming paradigm with RxDart. By Ayush.

3 (2) · 1 Review

Download materials
Save for later
Share
You are currently viewing page 2 of 3 of this article. Click here to view the first page.

Stream Lifecycle Extensions

The interval function and map are just the beginning when it comes to extension functions in RxDart. Other useful examples include:

  • delay: Emits the stream events after a specified delay.
  • startWith/endWith: These extension functions allow you to add an event at the beginning or the end of a stream.
  • doOn_: These are extensions that specify various callbacks at different points of the stream lifecycle. For example, doOnData triggers on new events while doOnDone triggers on stream completion.

Speaking of doOnDone, you need to set the tetrimino into the grid once it can’t progress any further. In lib/player.dart, replace TODO: replace with animatingPlayerWithCompletionStream and the next statement with:

//TODO: replace with infiniteAnimatingPlayerWithCompletionStream
stream: _engine.animatingPlayerWithCompletionStream(),

A quick look at animatingPlayerWithCompletionStream in lib/engine.dart will tell you all need to know how doOnDone is being used to define the tetrimino’s behaviour when it is blocked.

return animatingPlayerStream()
    .takeWhile((_transformedPiece) => _canTetriminoMove(_transformedPiece))
    .doOnDone(() {
  _onTetriminoSet(Tetrimino(
      current: Piece.I,
      yOffset: (effectiveHeight ~/ extent - 1).toDouble(),
      origin: const Point(0, 0)));
});

Here’s a breakdown of the code above:

  1. takeWhile is an example of a extension function available from the Dart Stream API, along with map. It forces the stream to complete when it encounters the first event that does not satisfy its predicate. When the stream completes, the doOnDone callback is triggered.
  2. _onTetriminoSet is a utility function you use to check if the stacked pieces have reached the top of the board. If not, you use _gameController to set the last tetrimino to the board.

Build and run the app. You will see that once the tetrimino reaches the bottom of the board, the internal game state is updated to indicate that the portion of the board that this tetrimino lands rests on is now ‘blocked’.
Gif showing a tetrimino being blocked by the board edge

You now have a portion of a basic game with a block animating from the top of the board to the end. Let’s see how you can add more features, like moving the tetriminos with user input and tetrimino creation, once the previous tetrimino reaches the end of the board.

Combining Streams

The RangeStream you’ve been using up to this point is only meant to move the tetrimino, and the game now has to generate another random one to continue. It also has to handle user input so you can control the trajectory of the tetrimino while it travels down the board.

Generating New Tetriminos

The game needs to generate a new tetrimino when the current one is blocked by the board’s bounds or another tetrimino. This new one will logically be the same as the previous; a RangeStream with its events mapped to new positions on the board.

This time, you need to create a new stream to listen to events representing new tetriminos and return a RangeStream that helps to animate it.

So, how can you create a new stream from the events of another? RxDart provides various options for this use case.

One of the options you’ll use is switchMap, it converts events of a stream to new streams using a mapper function. On each new tetrimino event of _playerController.stream, you can add or convert it to a RangeStream which you saw in action earlier.

In lib/player.dart, replace TODO: replace with infiniteAnimatingPlayerWithCompletionStream and the statement after it with the following code snippet:

//TODO: replace with infiniteAnimatingPlayerWithCompletionStreamWithInput
stream: _engine.infiniteAnimatingPlayerWithCompletionStream(),

Next, you’ll take a look at how this stream operates. Open lib/engine.dart and take a look at the implementation of infiniteAnimatingPlayerWithCompletionStream.

//1
return _playerController.stream.switchMap((value) {
  //2
  var _current = value;
  return RangeStream(0, effectiveHeight ~/ extent - 1)
      .interval(const Duration(milliseconds: 500))
      .map(
        (value) => Tetrimino(
          angle: 0.0,
          current: _current.current,
          origin: const Point(0, 0),
          yOffset: value.toDouble(),
          xOffset: 0,
        ),
      )
      .takeWhile(
        (_transformedPiece) => _canTetriminoMove(_transformedPiece),
      )
      //3
      .doOnData((_validTransformedPiece) {
    _current = _validTransformedPiece;
  }).doOnDone(() {
    _onTetriminoSet(_current);
  });
});

Here’s a quick breakdown of the preceeding snippet:

  1. switchMap is a extension function which is used in this case to convert the events of its input stream into new individual streams.
  2. _current is used to store a refrence to the current event so that it can be used in the doOnDone extension to be used for game logic. In particular, it is used to check if the whether or not the stacked tetriminos have reached the top of the board.
  3. Similar to doOnDone, doOnData is another extension function called when an event is consumed by a sink.

From this point on, you’ll now always see a randomized tetrimino. Hurray!

The main source of the generated tetrimino is the switchMap extension function. This function takes an event and maps it into a new stream. In this case, you create a new RangeStream every time the previous one reaches the end of its journey.

Build and run the app to see new tetriminos generated once the previous ones have been blocked:

generating new tetriminos using the second stream controller

Now that the game generates more tetriminos after each round, you’ll focus on user input.

Taking User Input

How do you factor in user input that makes games interesting? The answer is using StreamController from the Dart Stream API. StreamController allows you to create a stream you can add events to and listen to for updates.

It also exposes various pieces of information about the stream itself such as state and number of subscribers. More on them later. For now lets handle the input events using CombineLatestStream.combine2. In lib/player.dart, replace //TODO: replace with infiniteAnimatingPlayerWithCompletionStream and the next line of code with the following:

stream: _engine.infiniteAnimatingPlayerWithCompletionStreamWithInput(),

Look at the implementation for infiniteAnimatingPlayerWithCompletionStreamWithInput in lib/engine.dart:

return _playerController.stream.switchMap((value) {
  var _current = value;
  return CombineLatestStream.combine2<UserInput, int, Tetrimino>(
    _inputController.stream
        .startWith(UserInput(angle: 0, xOffset: 0, yOffset: 0))
        //TODO: add the debounceTime operator
        .map((input) => _offsetUserInput(input, _current)),
    RangeStream(0, effectiveHeight ~/ extent - 1)
        .interval(const Duration(milliseconds: 500)),
    (userInput, yOffset) =>
        _transformedTetrimino(_current, userInput, yOffset),
  )
      .takeWhile(
          (_transformedPiece) => _canTetriminoMove(_transformedPiece))
      .doOnData((_validTransformedPiece) {
    _current = _validTransformedPiece;
  }).doOnDone(() {
    _onTetriminoSet(_current);
  });
});

As you can see, CombineLatestStream.combine2 takes two other streams as input and creates a third stream using the latest events from both streams. Since you need to combine the latest events from _inputController and _playerController, the current tetrimino, CombineLatestStream is perfect for this scenario.

Build and run the app. With the two streams now combined, you can control the falling tetriminos using the left/right buttons and rotate a piece using the rotate button.

an input controller introduced  into the game to  stream input events