Streaming

In this section, we are going to demonstrate how to implement streaming data in our pattern. In addition, we are also discussing Note.keepWhen.

In fact, the way we handle streaming is very similar to data fetching. See Data Fetching

Add highlighted contents to the corresponding files:

lib/src/unit/network_dao/model/param.dart
...

class NetworkDaoCoinsChangesStreamParam extends NetworkDaoParam<String> {
  
  String get key => ids.key.toString();

  final ListModel<String> ids;

  NetworkDaoCoinsChangesStreamParam(List<String> ids)
      : ids = ListModel<String>(ids);
}
lib/src/unit/network_dao/network_dao_paper.dart
...

class NetworkDaoCoinPricesChanging extends NetworkDaoPaper {
  final MapModel<String, double> prices;

  NetworkDaoCoinPricesChanging(this.prices);
}

class NetworkDaoCoinPricesStreamStarting extends NetworkDaoPaper {
  final NetworkDaoCoinsChangesStreamParam param;

  NetworkDaoCoinPricesStreamStarting(this.param);
}

class NetworkDaoCoinPricesStreamCanceling extends NetworkDaoPaper {}
lib/src/unit/network_dao/network_dao_script.dart
...

class NetworkDaoScript extends Script<NetworkDaoPaper, NetworkDaoState> {
  
  void map() => on<NetworkDaoDataFetching>(onDataFetching)
      ?.on<NetworkDaoCoinPricesStreamStarting>(onCoinPricesStreamStarting)
      ?.on<NetworkDaoCoinPricesStreamCanceling>(onCoinPricesStreamCanceling);

  ...

  void onCoinPricesStreamStarting(
    NetworkDaoCoinPricesStreamStarting p,
    NetworkDaoState s,
    SourceVerifier ifFrom,
  ) {
    s.listenToPriceChange(p.param);
  }

  void onCoinPricesStreamCanceling(
    NetworkDaoCoinPricesStreamCanceling p,
    NetworkDaoState s,
    SourceVerifier ifFrom,
  ) {
    s.coinPricesTimer?.cancel();
    s.coinPricesTimer = null;
  }
}
lib/src/unit/network_dao/network_dao.dart
...

import 'dart:math';

...

class NetworkDaoState extends UnitState<NetworkDaoPaper, NetworkDao> {
  ...

  late final coinPrices = <String, double>{};

  Future<void> listenToPriceChange(NetworkDaoCoinsChangesStreamParam param) async {
    try {
      await Future.delayed(const Duration(seconds: 1));
      final response = await rootBundle
          .loadString(
            'assets/data/coins_list.json',
          )
          .timeout(
            const Duration(seconds: 2),
            onTimeout: () => throw NetworkDaoExceptionCode.timeout,
          );

      final json = jsonDecode(response);

      if (json is List<Map<String, dynamic>>) {
        throw NetworkDaoExceptionCode.wrongFormat;
      }

      final coinsList = JsonCoinsList(json, key: param.key);

      for (var i = 0; i < coinsList.length; i++) {
        final coin = coinsList.item(i);
        if (coin == null) continue;

        final id = coin.id();
        if (id == null) continue;

        final price = coin.currentPrice();
        if (price == null) continue;

        coinPrices[id] = price.toDouble();
      }

      coinPricesTimer = Timer.periodic(Duration(milliseconds: 500), (_) {
        final random = Random();

        for (final key in coinPrices.keys) {
          final change = random.nextInt(10) / 100;
          final direction = random.nextBool() ? 1 : -1;
          final price = coinPrices[key]!;

          coinPrices[key] = price * (1 + change * direction);
        }

        report(NetworkDaoCoinPricesChanging(MapModel(coinPrices)));
      });
    } on NetworkDaoExceptionCode catch (e) {
      print(e.toString());
    }
  }
}
lib/src/unit_widget/app/app_paper.dart
...

class AppCoinPricesChanging extends AppPaper {
  final MapModel<String, double> prices;

  AppCoinPricesChanging(this.prices);
}
lib/src/unit_widget/app/app_script.dart
...

class AppScript extends Script<AppPaper, AppState> {
    
    void map() => ...
      ?.on<AppCoinPricesChanging>(onCoinPricesChanging);

    ...

  Future<void> onLoginSelection(
    AppLoginSelection p,
    AppState s,
    SourceVerifier ifFrom,
  ) async {
        ...

    s.networkDao.process(
      NetworkDaoCoinPricesStreamStarting(
        NetworkDaoCoinsChangesStreamParam(
          [
            'bitcoin',
            'ethereum',
            'ripple',
            'tether',
            'solana',
            'binancecoin',
            'usd-coin',
            'dogecoin',
            'cardano',
            'staked-ether',
          ],
        ),
      ),
    );
  }

  Future<void> onSignOutSelection(
    AppSignOutSelection p,
    AppState s,
    SourceVerifier ifFrom,
  ) async {

    ...

    s.networkDao.process(NetworkDaoCoinPricesStreamCanceling());
  }

  void onCoinPricesChanging(
    AppCoinPricesChanging p,
    AppState s,
    SourceVerifier ifFrom,
  ) {
    s.coinsListing.process(CoinsListingPriceChanges(p.prices));
  }
}
lib/src/unit_widget/app/app.dart
...

class AppState extends UnitWidgetState<AppPaper, App> {
    ...

    PaperListener<NetworkDaoPaper, AppPaper> get networkDaoListener => reporter(
        (r) => r
            ...
            ?.on<NetworkDaoCoinPricesChanging>(
              (p) => AppCoinPricesChanging(p.prices),
            ),
    );
}
lib/src/unit_widget/coin_listing/coins_listing_paper.dart
...

class CoinsListingPriceChanges extends CoinsListingPaper {
  final MapModel<String, double> prices;

  CoinsListingPriceChanges(this.prices);
}
lib/src/unit_widget/coin_listing/coins_listing_script.dart
...

class CoinsListingScript extends Script<CoinsListingPaper, CoinsListingState> {
  
  void map() =>
      ...
      ?.on<CoinsListingPriceChanges>(onPriceChanges);
  ...

  void onPriceChanges(
    CoinsListingPriceChanges p,
    CoinsListingState s,
    SourceVerifier ifFrom,
  ) {
    final coins = <CoinOverview>[];

    s.coins?.forEach((e) {
      final price = p.prices[e.key];
      if (price == null) return;
      coins.add(e.copyWith(currentPrice: Figure(price)));
    });

    s.coins = coins;

    s.render();
  }
}
lib/src/unit_widget/coin_listing/coins_listing.dart
...

import '../../model/figure.dart';

...

Run:

Breakdown - Streaming

NetworkDao

  • The streaming is set up in listenToPriceChange. You can try debugging the body of the function to understand the implementation. Please take note that the function is merely for simulating streaming from WebSocket.

  • As mentioned earlier, think about streaming data as quite similar to fetching data: an imagined UI is displayed to the "back-end" for them to submit the data. One different thing is the imagined UI will be dismissed, popped, or destroyed after fetching data; meanwhile the UI will still be active while streaming data changes.

  • And because of the above, there should be a way to "dismiss" the imagined UI. In particular, it is done by NetworkDaoCoinPricesStreamCanceling:

lib/src/unit/network_dao/network_dao_script.dart
 void onCoinPricesStreamCanceling(
    NetworkDaoCoinPricesStreamCanceling p,
    NetworkDaoState s,
    SourceVerifier ifFrom,
  ) {
    s.coinPricesTimer?.cancel();
    s.coinPricesTimer = null;
  }
  • Whenever the prices are simulated to update, NetworkDao reports NetworkDaoCoinPricesChanging to carry the changes of prices to its parent - App until the stream is closed by NetworkDaoCoinPricesStreamCanceling.

App

  • After successfully granting authentication for the user by AppLoginSelection, App commands NetworkDao to start streaming coins' price changes.

  • Whenever App receives NetworkDaoCoinPricesChanging from NetworkDao, it processes AppCoinPricesChanging, in which it commands CoinsListing to update the price via CoinsListingPriceChanges.

lib/src/unit_widget/app/app_script.dart
void onCoinPricesChanging(
    AppCoinPricesChanging p,
    AppState s,
    SourceVerifier ifFrom,
  ) {
    s.coinsListing.process(CoinsListingPriceChanges(p.prices));
  }

Note (continued)

Now, with the current code base, there should be a bug.

  • Restart the app → Tap the profile icon in CoinsListing → Enter authentication info and log in.

The app authorizes and brings you back to CoinsListing as expected after implementing Note.

  • Tap the profile icon → Log out

The app just logs you out and does nothing, which does not meet the requirement stated in the Note section.

This is because whenever a Unit processes a Paper, it will dispose its Notes. In our case specifically, whenever App processes AppCoinPricesChanging.

In order to maintain the value, AppState.tabAfterLoginNote should be constructed with a declaration of keepWhen:

lib/src/unit_widget/app/app.dart
...

class AppState extends UnitWidgetState<AppPaper, App> {
...

  late final tabAfterLoginNote = Note<int>(this, keepWhen: (p) {
    return p is AppCoinPricesChanging;
  });

...
}