Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mobile): use efficient sync #8842

Merged
merged 9 commits into from
May 14, 2024
4 changes: 0 additions & 4 deletions mobile/lib/modules/home/views/home_page.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class HomePage extends HookConsumerWidget {
() {
ref.read(websocketProvider.notifier).connect();
Future(() => ref.read(assetProvider.notifier).getAllAsset());
ref.read(assetProvider.notifier).getPartnerAssets();
ref.read(albumProvider.notifier).getAllAlbums();
ref.read(sharedAlbumProvider.notifier).getAllSharedAlbums();
ref.read(serverInfoProvider.notifier).getServerInfo();
Expand Down Expand Up @@ -85,9 +84,6 @@ class HomePage extends HookConsumerWidget {
Future<void> refreshAssets() async {
final fullRefresh = refreshCount.value > 0;
await ref.read(assetProvider.notifier).getAllAsset(clear: fullRefresh);
if (timelineUsers.length > 1) {
await ref.read(assetProvider.notifier).getPartnerAssets();
}
if (fullRefresh) {
// refresh was forced: user requested another refresh within 2 seconds
refreshCount.value = 0;
Expand Down
5 changes: 2 additions & 3 deletions mobile/lib/modules/partner/views/partner_detail_page.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class PartnerDetailPage extends HookConsumerWidget {

useEffect(
() {
ref.read(assetProvider.notifier).getPartnerAssets(partner);
ref.read(assetProvider.notifier).getAllAsset();
return null;
},
[],
Expand Down Expand Up @@ -78,8 +78,7 @@ class PartnerDetailPage extends HookConsumerWidget {
),
body: MultiselectGrid(
renderListProvider: assetsProvider(partner.isarId),
onRefresh: () =>
ref.read(assetProvider.notifier).getPartnerAssets(partner),
onRefresh: () => ref.read(assetProvider.notifier).getAllAsset(),
deleteEnabled: false,
favoriteEnabled: false,
),
Expand Down
2 changes: 1 addition & 1 deletion mobile/lib/routing/tab_navigation_observer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TabNavigationObserver extends AutoRouterObserver {

if (route.name == 'SharingRoute') {
ref.read(sharedAlbumProvider.notifier).getAllSharedAlbums();
ref.read(assetProvider.notifier).getPartnerAssets();
Future(() => ref.read(assetProvider.notifier).getAllAsset());
}

if (route.name == 'LibraryRoute') {
Expand Down
3 changes: 1 addition & 2 deletions mobile/lib/shared/providers/app_state.provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ class AppStateNotiifer extends StateNotifier<AppStateEnum> {
switch (_ref.read(tabProvider)) {
case TabEnum.home:
_ref.read(assetProvider.notifier).getAllAsset();
_ref.read(assetProvider.notifier).getPartnerAssets();
case TabEnum.search:
// nothing to do
case TabEnum.sharing:
_ref.read(assetProvider.notifier).getPartnerAssets();
_ref.read(assetProvider.notifier).getAllAsset();
_ref.read(sharedAlbumProvider.notifier).getAllSharedAlbums();
case TabEnum.library:
_ref.read(albumProvider.notifier).getAllAlbums();
Expand Down
28 changes: 4 additions & 24 deletions mobile/lib/shared/providers/asset.provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/modules/album/services/album.service.dart';
import 'package:immich_mobile/shared/models/exif_info.dart';
import 'package:immich_mobile/shared/models/store.dart';
import 'package:immich_mobile/shared/models/user.dart';
import 'package:immich_mobile/shared/providers/db.provider.dart';
import 'package:immich_mobile/shared/providers/user.provider.dart';
import 'package:immich_mobile/shared/services/asset.service.dart';
Expand All @@ -26,7 +25,6 @@ class AssetNotifier extends StateNotifier<bool> {
final log = Logger('AssetNotifier');
bool _getAllAssetInProgress = false;
bool _deleteInProgress = false;
bool _getPartnerAssetsInProgress = false;

AssetNotifier(
this._assetService,
Expand All @@ -49,9 +47,12 @@ class AssetNotifier extends StateNotifier<bool> {
await clearAssetsAndAlbums(_db);
log.info("Manual refresh requested, cleared assets and albums from db");
}
final bool changedUsers = await _userService.refreshUsers();
final bool newRemote = await _assetService.refreshRemoteAssets();
final bool newLocal = await _albumService.refreshDeviceAlbums();
debugPrint("newRemote: $newRemote, newLocal: $newLocal");
debugPrint(
"changedUsers: $changedUsers, newRemote: $newRemote, newLocal: $newLocal",
);

log.info("Load assets: ${stopwatch.elapsedMilliseconds}ms");
} finally {
Expand All @@ -60,27 +61,6 @@ class AssetNotifier extends StateNotifier<bool> {
}
}

Future<void> getPartnerAssets([User? partner]) async {
if (_getPartnerAssetsInProgress) return;
try {
final stopwatch = Stopwatch()..start();
_getPartnerAssetsInProgress = true;
if (partner == null) {
await _userService.refreshUsers();
final List<User> partners =
await _db.users.filter().isPartnerSharedWithEqualTo(true).findAll();
for (User u in partners) {
await _assetService.refreshRemoteAssets(u);
}
} else {
await _assetService.refreshRemoteAssets(partner);
}
log.info("Load partner assets: ${stopwatch.elapsedMilliseconds}ms");
} finally {
_getPartnerAssetsInProgress = false;
}
}

Future<void> clearAllAsset() {
return clearAssetsAndAlbums(_db);
}
Expand Down
2 changes: 2 additions & 0 deletions mobile/lib/shared/services/api.service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class ApiService {
late PersonApi personApi;
late AuditApi auditApi;
late SharedLinkApi sharedLinkApi;
late SyncApi syncApi;
late SystemConfigApi systemConfigApi;
late ActivityApi activityApi;
late DownloadApi downloadApi;
Expand Down Expand Up @@ -53,6 +54,7 @@ class ApiService {
personApi = PersonApi(_apiClient);
auditApi = AuditApi(_apiClient);
sharedLinkApi = SharedLinkApi(_apiClient);
syncApi = SyncApi(_apiClient);
systemConfigApi = SystemConfigApi(_apiClient);
activityApi = ActivityApi(_apiClient);
downloadApi = DownloadApi(_apiClient);
Expand Down
80 changes: 43 additions & 37 deletions mobile/lib/shared/services/asset.service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import 'dart:async';
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/shared/models/asset.dart';
import 'package:immich_mobile/shared/models/etag.dart';
import 'package:immich_mobile/shared/models/exif_info.dart';
import 'package:immich_mobile/shared/models/store.dart';
import 'package:immich_mobile/shared/models/user.dart';
import 'package:immich_mobile/shared/providers/api.provider.dart';
import 'package:immich_mobile/shared/providers/db.provider.dart';
import 'package:immich_mobile/shared/services/api.service.dart';
import 'package:immich_mobile/shared/services/sync.service.dart';
import 'package:immich_mobile/shared/services/user.service.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
import 'package:maplibre_gl/maplibre_gl.dart';
Expand All @@ -21,46 +22,57 @@ final assetServiceProvider = Provider(
(ref) => AssetService(
ref.watch(apiServiceProvider),
ref.watch(syncServiceProvider),
ref.watch(userServiceProvider),
ref.watch(dbProvider),
),
);

class AssetService {
final ApiService _apiService;
final SyncService _syncService;
final UserService _userService;
final log = Logger('AssetService');
final Isar _db;

AssetService(
this._apiService,
this._syncService,
this._userService,
this._db,
);

/// Checks the server for updated assets and updates the local database if
/// required. Returns `true` if there were any changes.
Future<bool> refreshRemoteAssets([User? user]) async {
user ??= Store.get<User>(StoreKey.currentUser);
Future<bool> refreshRemoteAssets() async {
final syncedUserIds = await _db.eTags.where().idProperty().findAll();
final List<User> syncedUsers = syncedUserIds.isEmpty
? []
: await _db.users
.where()
.anyOf(syncedUserIds, (q, id) => q.idEqualTo(id))
.findAll();
final Stopwatch sw = Stopwatch()..start();
final bool changes = await _syncService.syncRemoteAssetsToDb(
user,
_getRemoteAssetChanges,
_getRemoteAssets,
users: syncedUsers,
getChangedAssets: _getRemoteAssetChanges,
loadAssets: _getRemoteAssets,
refreshUsers: _userService.getUsersFromServer,
);
debugPrint("refreshRemoteAssets full took ${sw.elapsedMilliseconds}ms");
return changes;
}

/// Returns `(null, null)` if changes are invalid -> requires full sync
Future<(List<Asset>? toUpsert, List<String>? toDelete)>
_getRemoteAssetChanges(User user, DateTime since) async {
final deleted = await _apiService.auditApi
.getAuditDeletes(since, EntityType.ASSET, userId: user.id);
if (deleted == null || deleted.needsFullSync) return (null, null);
final assetDto = await _apiService.assetApi
.getAllAssets(userId: user.id, updatedAfter: since);
if (assetDto == null) return (null, null);
return (assetDto.map(Asset.remote).toList(), deleted.ids);
_getRemoteAssetChanges(List<User> users, DateTime since) async {
final dto = AssetDeltaSyncDto(
updatedAfter: since,
userIds: users.map((e) => e.id).toList(),
);
final changes = await _apiService.syncApi.getDeltaSync(dto);
return changes == null || changes.needsFullSync
? (null, null)
: (changes.upserted.map(Asset.remote).toList(), changes.deleted);
}

/// Returns the list of people of the given asset id.
Expand All @@ -85,38 +97,32 @@ class AssetService {
}

/// Returns `null` if the server state did not change, else list of assets
Future<List<Asset>?> _getRemoteAssets(User user) async {
Future<List<Asset>?> _getRemoteAssets(User user, DateTime until) async {
const int chunkSize = 10000;
try {
final DateTime now = DateTime.now().toUtc();
final List<Asset> allAssets = [];
for (int i = 0;; i += chunkSize) {
final List<AssetResponseDto>? assets =
await _apiService.assetApi.getAllAssets(
DateTime? lastCreationDate;
String? lastId;
// will break on error or once all assets are loaded
while (true) {
final dto = AssetFullSyncDto(
limit: chunkSize,
updatedUntil: until,
lastId: lastId,
lastCreationDate: lastCreationDate,
userId: user.id,
// updatedBefore is important! without it we could
// a) get the same Asset multiple times in different versions (when
// the asset is modified while the chunks are loaded from the server)
// b) miss assets when new assets are inserted in between the calls
updatedBefore: now,
skip: i,
take: chunkSize,
);
if (assets == null) {
return null;
}
final List<AssetResponseDto>? assets =
await _apiService.syncApi.getFullSyncForUser(dto);
if (assets == null) return null;
allAssets.addAll(assets.map(Asset.remote));
if (assets.length < chunkSize) {
break;
}
if (assets.length < chunkSize) break;
lastCreationDate = assets.last.fileCreatedAt;
lastId = assets.last.id;
}
return allAssets;
} catch (error, stack) {
log.severe(
'Error while getting remote assets',
error,
stack,
);
log.severe('Error while getting remote assets', error, stack);
return null;
}
}
Expand Down