Streaming
This section demonstrates how to implement streaming data in our pattern. It also
covers Note.keepWhen.
The way we handle streaming is very similar to data fetching; see Data Fetching.
Add the highlighted contents to the corresponding files:
...
class NetworkDaoCoinsChangesStreamParam extends NetworkDaoParam<String> {
  String get key => ids.key.toString();
  final ListModel<String> ids;

  NetworkDaoCoinsChangesStreamParam(List<String> ids)
      : ids = ListModel<String>(ids);
}
...
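// Reported to the parent on every simulated price update.
class NetworkDaoCoinPricesChanging extends NetworkDaoPaper {
  final MapModel<String, double> prices;
  NetworkDaoCoinPricesChanging(this.prices);
}

// Processed to start the price stream for the given coin ids.
class NetworkDaoCoinPricesStreamStarting extends NetworkDaoPaper {
  final NetworkDaoCoinsChangesStreamParam param;
  NetworkDaoCoinPricesStreamStarting(this.param);
}

// Processed to stop the price stream.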
class NetworkDaoCoinPricesStreamCanceling extends NetworkDaoPaper {}
...
class NetworkDaoScript extends Script<NetworkDaoPaper, NetworkDaoState> {
  void map() => on<NetworkDaoDataFetching>(onDataFetching)
      ?.on<NetworkDaoCoinPricesStreamStarting>(onCoinPricesStreamStarting)
      ?.on<NetworkDaoCoinPricesStreamCanceling>(onCoinPricesStreamCanceling);
  ...
  void onCoinPricesStreamStarting(
    NetworkDaoCoinPricesStreamStarting p,
    NetworkDaoState s,
    SourceVerifier ifFrom,
  ) {
    s.listenToPriceChange(p.param);
  }

  void onCoinPricesStreamCanceling(
    NetworkDaoCoinPricesStreamCanceling p,
    NetworkDaoState s,
    SourceVerifier ifFrom,
  ) {
    // Stop the periodic updates and drop the reference to the timer.
    s.coinPricesTimer?.cancel();
    s.coinPricesTimer = null;
  }
}
...
import 'dart:async'; // Timer
import 'dart:math'; // Random
...
class NetworkDaoState extends UnitState<NetworkDaoPaper, NetworkDao> {
...
  late final coinPrices = <String, double>{};

  // Simulates a WebSocket price stream: loads the initial prices from the
  // bundled JSON file, then nudges them on a periodic timer.
  // Assumes `Timer? coinPricesTimer;` is declared in the elided part above.
  Future<void> listenToPriceChange(NetworkDaoCoinsChangesStreamParam param) async {
    try {
      await Future.delayed(const Duration(seconds: 1));
      final response = await rootBundle
          .loadString(
            'assets/data/coins_list.json',
          )
          .timeout(
            const Duration(seconds: 2),
            onTimeout: () => throw NetworkDaoExceptionCode.timeout,
          );
      final json = jsonDecode(response);
      // Reject anything that is not a JSON array.
      if (json is! List) {
        throw NetworkDaoExceptionCode.wrongFormat;
      }
      final coinsList = JsonCoinsList(json, key: param.key);
      for (var i = 0; i < coinsList.length; i++) {
        final coin = coinsList.item(i);
        if (coin == null) continue;
        final id = coin.id();
        if (id == null) continue;
        final price = coin.currentPrice();
        if (price == null) continue;
        coinPrices[id] = price.toDouble();
      }
      // Cancel any previous timer so that a repeated start does not leak one.
      coinPricesTimer?.cancel();
      coinPricesTimer = Timer.periodic(const Duration(milliseconds: 500), (_) {
        final random = Random();
        for (final key in coinPrices.keys) {
          // Move each price by 0% to 9%, up or down.
          final change = random.nextInt(10) / 100;
          final direction = random.nextBool() ? 1 : -1;
          final price = coinPrices[key]!;
          coinPrices[key] = price * (1 + change * direction);
        }
        report(NetworkDaoCoinPricesChanging(MapModel(coinPrices)));
      });
    } on NetworkDaoExceptionCode catch (e) {
      print(e.toString());
    }
  }
}
...
class AppCoinPricesChanging extends AppPaper {
  final MapModel<String, double> prices;
  AppCoinPricesChanging(this.prices);
}
...
class AppScript extends Script<AppPaper, AppState> {
  void map() => ...
      ?.on<AppCoinPricesChanging>(onCoinPricesChanging);
  ...
  Future<void> onLoginSelection(
    AppLoginSelection p,
    AppState s,
    SourceVerifier ifFrom,
  ) async {
    ...
    // Start streaming price changes for the listed coins after login.
    s.networkDao.process(
      NetworkDaoCoinPricesStreamStarting(
        NetworkDaoCoinsChangesStreamParam(
          [
            'bitcoin',
            'ethereum',
            'ripple',
            'tether',
            'solana',
            'binancecoin',
            'usd-coin',
            'dogecoin',
            'cardano',
            'staked-ether',
          ],
        ),
      ),
    );
  }

  Future<void> onSignOutSelection(
    AppSignOutSelection p,
    AppState s,
    SourceVerifier ifFrom,
  ) async {
    ...
    // Stop the price stream on sign-out.
    s.networkDao.process(NetworkDaoCoinPricesStreamCanceling());
  }

  void onCoinPricesChanging(
    AppCoinPricesChanging p,
    AppState s,
    SourceVerifier ifFrom,
  ) {
    // Forward the new prices to CoinsListing.
    s.coinsListing.process(CoinsListingPriceChanges(p.prices));
  }
}
...
class AppState extends UnitWidgetState<AppPaper, App> {
  ...
  PaperListener<NetworkDaoPaper, AppPaper> get networkDaoListener => reporter(
        (r) => r
            ...
            ?.on<NetworkDaoCoinPricesChanging>(
              (p) => AppCoinPricesChanging(p.prices),
            ),
      );
}
...
class CoinsListingPriceChanges extends CoinsListingPaper {
  final MapModel<String, double> prices;
  CoinsListingPriceChanges(this.prices);
}
...
class CoinsListingScript extends Script<CoinsListingPaper, CoinsListingState> {
  void map() =>
      ...
      ?.on<CoinsListingPriceChanges>(onPriceChanges);
  ...
  void onPriceChanges(
    CoinsListingPriceChanges p,
    CoinsListingState s,
    SourceVerifier ifFrom,
  ) {
    final coins = <CoinOverview>[];
    s.coins?.forEach((e) {
      final price = p.prices[e.key];
      // Coins without a streamed price are left out of the new list.
      if (price == null) return;
      coins.add(e.copyWith(currentPrice: Figure(price)));
    });
    s.coins = coins;
    s.render();
  }
}
...
import '../../model/figure.dart';
...
Run:
Breakdown - Streaming
NetworkDao
- The stream is set up in listenToPriceChange. Try stepping through the body of the function in a debugger to understand the implementation. Note that the function merely simulates streaming from a WebSocket.
- As mentioned earlier, streaming data is quite similar to fetching data: an imagined UI is displayed to the "back-end" so that it can submit the data. The difference is that after a fetch the imagined UI is dismissed, popped, or destroyed, whereas during streaming it stays active for as long as the data keeps changing.
- Because of this, there has to be a way to "dismiss" the imagined UI. That is the job of NetworkDaoCoinPricesStreamCanceling:
void onCoinPricesStreamCanceling(
  NetworkDaoCoinPricesStreamCanceling p,
  NetworkDaoState s,
  SourceVerifier ifFrom,
) {
  s.coinPricesTimer?.cancel();
  s.coinPricesTimer = null;
}
- Whenever the prices are simulated to update, NetworkDao reports NetworkDaoCoinPricesChanging to carry the price changes to its parent, App, until the stream is closed by NetworkDaoCoinPricesStreamCanceling.
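To see the start/cancel lifecycle described above outside the framework, here is a minimal plain-Dart sketch. PriceTicker and nextPrices are hypothetical names that are not part of the tutorial's code base or of the library; the sketch only mirrors the timer arithmetic of listenToPriceChange and the cancel logic of onCoinPricesStreamCanceling.

```dart
import 'dart:async';
import 'dart:math';

/// Returns a copy of [prices] with every entry moved by a random step of
/// 0% to 9%, up or down (the same arithmetic as the simulated stream).
Map<String, double> nextPrices(Map<String, double> prices, Random random) {
  return prices.map((id, price) {
    final change = random.nextInt(10) / 100; // 0.00, 0.01, ..., 0.09
    final direction = random.nextBool() ? 1 : -1;
    return MapEntry(id, price * (1 + change * direction));
  });
}

/// Hypothetical helper (an assumption for illustration) that owns the timer,
/// so that starting twice never leaves an orphaned timer running.
class PriceTicker {
  PriceTicker(this._prices, this.onTick, {Random? random})
      : _random = random ?? Random();

  Map<String, double> _prices;
  final void Function(Map<String, double> prices) onTick;
  final Random _random;
  Timer? _timer;

  bool get isRunning => _timer?.isActive ?? false;

  void start({Duration interval = const Duration(milliseconds: 500)}) {
    cancel(); // Restarting replaces the previous timer instead of leaking it.
    _timer = Timer.periodic(interval, (_) {
      _prices = nextPrices(_prices, _random);
      onTick(Map.unmodifiable(_prices));
    });
  }

  void cancel() {
    _timer?.cancel();
    _timer = null;
  }
}
```

In this sketch, start() plays the role of NetworkDaoCoinPricesStreamStarting, cancel() the role of NetworkDaoCoinPricesStreamCanceling, and onTick stands in for the report(...) call.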
App
- After AppLoginSelection successfully authenticates the user, App commands NetworkDao to start streaming the coins' price changes.
- Whenever App receives NetworkDaoCoinPricesChanging from NetworkDao, it processes AppCoinPricesChanging, in which it commands CoinsListing to update the prices via CoinsListingPriceChanges:
void onCoinPricesChanging(
  AppCoinPricesChanging p,
  AppState s,
  SourceVerifier ifFrom,
) {
  s.coinsListing.process(CoinsListingPriceChanges(p.prices));
}
Note (continued)
With the current code base, there is now a bug. To reproduce it:
- Restart the app → Tap the profile icon in CoinsListing → Enter authentication info and log in.
The app authorizes and brings you back to CoinsListing, as expected after implementing Note.
- Tap the profile icon → Log out.
The app just logs you out and does nothing else, which does not meet the requirement stated in the Note section.
This happens because a Unit disposes its Notes whenever it processes a Paper. In our case, that is every time App processes AppCoinPricesChanging, which arrives with each simulated price update.
To keep the value, construct AppState.tabAfterLoginNote with a keepWhen declaration:
...
class AppState extends UnitWidgetState<AppPaper, App> {
  ...
  // Keep the note's value while App processes AppCoinPricesChanging.
  late final tabAfterLoginNote = Note<int>(this, keepWhen: (p) {
    return p is AppCoinPricesChanging;
  });
  ...
}
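The effect of keepWhen can be pictured with a toy model. This is only a sketch of the behaviour described above, not the library's actual Note implementation: ToyNote, onPaperProcessed, and the two paper classes are hypothetical stand-ins introduced for illustration.

```dart
/// Toy stand-in for a note; NOT the library's Note class.
class ToyNote<T> {
  ToyNote({this.keepWhen});

  /// Returns true for papers that must not clear the stored value.
  final bool Function(Object paper)? keepWhen;

  T? value;

  /// Assumed hook: called by the owning unit each time it processes a paper.
  void onPaperProcessed(Object paper) {
    final keep = keepWhen?.call(paper) ?? false;
    if (!keep) value = null; // Without keepWhen, every paper clears the value.
  }
}

/// Stands in for AppCoinPricesChanging.
class PricesChanging {}

/// Stands in for any paper that keepWhen does not cover.
class SignOutSelection {}
```

In this model, a note built without keepWhen loses its value as soon as a PricesChanging paper is processed, while a note built with keepWhen: (p) => p is PricesChanging keeps its value across price updates and is cleared only by other papers.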