From d15dcb3725e3cb4b832797eb11f34deb9934ee9b Mon Sep 17 00:00:00 2001 From: Aidan Date: Wed, 23 Oct 2024 12:20:17 -0700 Subject: [PATCH] Make profile loading concurrent across non-dependent requests --- .../polycentric/PolycentricCache.kt | 148 ++++++++++-------- 1 file changed, 84 insertions(+), 64 deletions(-) diff --git a/app/src/main/java/com/futo/platformplayer/polycentric/PolycentricCache.kt b/app/src/main/java/com/futo/platformplayer/polycentric/PolycentricCache.kt index abe4ca8e..abd58031 100644 --- a/app/src/main/java/com/futo/platformplayer/polycentric/PolycentricCache.kt +++ b/app/src/main/java/com/futo/platformplayer/polycentric/PolycentricCache.kt @@ -23,6 +23,7 @@ import com.futo.polycentric.core.getClaimIfValid import com.futo.polycentric.core.getValidClaims import com.google.protobuf.ByteString import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async @@ -56,83 +57,102 @@ class PolycentricCache { private val _taskGetProfile = BatchedTaskHandler(_scope, { system -> - val signedEventsList = ApiMethods.getQueryLatest( - SERVER, - system.toProto(), - listOf( - ContentType.BANNER.value, - ContentType.AVATAR.value, - ContentType.USERNAME.value, - ContentType.DESCRIPTION.value, - ContentType.STORE.value, - ContentType.SERVER.value, - ContentType.STORE_DATA.value, - ContentType.PROMOTION_BANNER.value, - ContentType.PROMOTION.value, - ContentType.MEMBERSHIP_URLS.value, - ContentType.DONATION_DESTINATIONS.value - ) - ).eventsList.map { e -> SignedEvent.fromProto(e) }; - - val signedProfileEvents = signedEventsList.groupBy { e -> e.event.contentType } - .map { (_, events) -> events.maxBy { it.event.unixMilliseconds ?: 0 } }; - - val storageSystemState = StorageTypeSystemState.create() - for (signedEvent in signedProfileEvents) { - storageSystemState.update(signedEvent.event) - } - - val signedClaimEvents = ApiMethods.getQueryIndex( - SERVER, - system.toProto(), - ContentType.CLAIM.value, - limit = 200 - ).eventsList.map { e -> SignedEvent.fromProto(e) }; - - val ownedClaims: ArrayList = arrayListOf() - for (signedEvent in signedClaimEvents) { - if (signedEvent.event.contentType != ContentType.CLAIM.value) { - continue; + coroutineScope { + // Launch concurrent requests for getQueryLatest and getQueryIndex + val signedEventsDeferred = async { + ApiMethods.getQueryLatest( + SERVER, + system.toProto(), + listOf( + ContentType.BANNER.value, + ContentType.AVATAR.value, + ContentType.USERNAME.value, + ContentType.DESCRIPTION.value, + ContentType.STORE.value, + ContentType.SERVER.value, + ContentType.STORE_DATA.value, + ContentType.PROMOTION_BANNER.value, + ContentType.PROMOTION.value, + ContentType.MEMBERSHIP_URLS.value, + ContentType.DONATION_DESTINATIONS.value + ) + ).eventsList.map { e -> SignedEvent.fromProto(e) } } - val response = ApiMethods.getQueryReferences( - SERVER, - Protocol.Reference.newBuilder() - .setReference(signedEvent.toPointer().toProto().toByteString()) - .setReferenceType(2) - .build(), - null, - Protocol.QueryReferencesRequestEvents.newBuilder() - .setFromType(ContentType.VOUCH.value) - .build() - ); - - val ownedClaim = response.itemsList.map { SignedEvent.fromProto(it.event) }.getClaimIfValid(signedEvent); - if (ownedClaim != null) { - ownedClaims.add(ownedClaim); + val signedClaimEventsDeferred = async { + ApiMethods.getQueryIndex( + SERVER, + system.toProto(), + ContentType.CLAIM.value, + limit = 200 + ).eventsList.map { e -> SignedEvent.fromProto(e) } } - } - Logger.i(TAG, "Retrieved profile (ownedClaims = $ownedClaims)"); - val systemState = SystemState.fromStorageTypeSystemState(storageSystemState); - return@BatchedTaskHandler CachedPolycentricProfile(PolycentricProfile(system, systemState, ownedClaims)); + // Await both requests concurrently + val signedEventsList = signedEventsDeferred.await() + val signedClaimEvents = signedClaimEventsDeferred.await() + + // Process profile events + val signedProfileEvents = signedEventsList + .groupBy { it.event.contentType } + .map { (_, events) -> events.maxByOrNull { it.event.unixMilliseconds ?: 0 } } + .filterNotNull() + + val storageSystemState = StorageTypeSystemState.create().apply { + signedProfileEvents.forEach { update(it.event) } + } + + // Launch concurrent requests for getQueryReferences + val ownedClaimsDeferred = signedClaimEvents.mapNotNull { signedEvent -> + if (signedEvent.event.contentType != ContentType.CLAIM.value) { + null + } else { + async { + try { + val response = ApiMethods.getQueryReferences( + SERVER, + Protocol.Reference.newBuilder() + .setReference(signedEvent.toPointer().toProto().toByteString()) + .setReferenceType(2) + .build(), + null, + Protocol.QueryReferencesRequestEvents.newBuilder() + .setFromType(ContentType.VOUCH.value) + .build() + ) + response.itemsList + .map { SignedEvent.fromProto(it.event) } + .getClaimIfValid(signedEvent) + } catch (e: Exception) { + Logger.e(TAG, "Failed to get query references for ${signedEvent.toPointer()}", e) + null + } + } + } + }.filterNotNull() + + val ownedClaims = ownedClaimsDeferred.mapNotNull { it.await() }.toCollection(ArrayList()) + + Logger.i(TAG, "Retrieved profile (ownedClaims = $ownedClaims)") + + val systemState = SystemState.fromStorageTypeSystemState(storageSystemState) + CachedPolycentricProfile(PolycentricProfile(system, systemState, ownedClaims)) + } }, { system -> return@BatchedTaskHandler getCachedProfile(system); }, { system, result -> synchronized(_cache) { - _profileCache[system] = result; + _profileCache[system] = result - if (result.profile != null) { - for (claim in result.profile.ownedClaims) { - val urls = claim.claim.resolveChannelUrls(); - for (url in urls) - _profileUrlCache.map[url] = result; + result.profile?.ownedClaims?.forEach { claim -> + claim.claim.resolveChannelUrls().forEach { url -> + _profileUrlCache.map[url] = result } } - _profileUrlCache.save(); + _profileUrlCache.save() } - }); + }) private val _batchTaskGetClaims = BatchedTaskHandler(_scope, { id ->