Make profile loading concurrent across non-dependent requests

This commit is contained in:
Aidan 2024-10-23 12:20:17 -07:00
parent f25f27348b
commit d15dcb3725

View File

@ -23,6 +23,7 @@ import com.futo.polycentric.core.getClaimIfValid
import com.futo.polycentric.core.getValidClaims import com.futo.polycentric.core.getValidClaims
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.Deferred import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async import kotlinx.coroutines.async
@ -56,83 +57,102 @@ class PolycentricCache {
private val _taskGetProfile = BatchedTaskHandler<PublicKey, CachedPolycentricProfile>(_scope, private val _taskGetProfile = BatchedTaskHandler<PublicKey, CachedPolycentricProfile>(_scope,
{ system -> { system ->
val signedEventsList = ApiMethods.getQueryLatest( coroutineScope {
SERVER, // Launch concurrent requests for getQueryLatest and getQueryIndex
system.toProto(), val signedEventsDeferred = async {
listOf( ApiMethods.getQueryLatest(
ContentType.BANNER.value, SERVER,
ContentType.AVATAR.value, system.toProto(),
ContentType.USERNAME.value, listOf(
ContentType.DESCRIPTION.value, ContentType.BANNER.value,
ContentType.STORE.value, ContentType.AVATAR.value,
ContentType.SERVER.value, ContentType.USERNAME.value,
ContentType.STORE_DATA.value, ContentType.DESCRIPTION.value,
ContentType.PROMOTION_BANNER.value, ContentType.STORE.value,
ContentType.PROMOTION.value, ContentType.SERVER.value,
ContentType.MEMBERSHIP_URLS.value, ContentType.STORE_DATA.value,
ContentType.DONATION_DESTINATIONS.value ContentType.PROMOTION_BANNER.value,
) ContentType.PROMOTION.value,
).eventsList.map { e -> SignedEvent.fromProto(e) }; ContentType.MEMBERSHIP_URLS.value,
ContentType.DONATION_DESTINATIONS.value
val signedProfileEvents = signedEventsList.groupBy { e -> e.event.contentType } )
.map { (_, events) -> events.maxBy { it.event.unixMilliseconds ?: 0 } }; ).eventsList.map { e -> SignedEvent.fromProto(e) }
val storageSystemState = StorageTypeSystemState.create()
for (signedEvent in signedProfileEvents) {
storageSystemState.update(signedEvent.event)
}
val signedClaimEvents = ApiMethods.getQueryIndex(
SERVER,
system.toProto(),
ContentType.CLAIM.value,
limit = 200
).eventsList.map { e -> SignedEvent.fromProto(e) };
val ownedClaims: ArrayList<OwnedClaim> = arrayListOf()
for (signedEvent in signedClaimEvents) {
if (signedEvent.event.contentType != ContentType.CLAIM.value) {
continue;
} }
val response = ApiMethods.getQueryReferences( val signedClaimEventsDeferred = async {
SERVER, ApiMethods.getQueryIndex(
Protocol.Reference.newBuilder() SERVER,
.setReference(signedEvent.toPointer().toProto().toByteString()) system.toProto(),
.setReferenceType(2) ContentType.CLAIM.value,
.build(), limit = 200
null, ).eventsList.map { e -> SignedEvent.fromProto(e) }
Protocol.QueryReferencesRequestEvents.newBuilder()
.setFromType(ContentType.VOUCH.value)
.build()
);
val ownedClaim = response.itemsList.map { SignedEvent.fromProto(it.event) }.getClaimIfValid(signedEvent);
if (ownedClaim != null) {
ownedClaims.add(ownedClaim);
} }
}
Logger.i(TAG, "Retrieved profile (ownedClaims = $ownedClaims)"); // Await both requests concurrently
val systemState = SystemState.fromStorageTypeSystemState(storageSystemState); val signedEventsList = signedEventsDeferred.await()
return@BatchedTaskHandler CachedPolycentricProfile(PolycentricProfile(system, systemState, ownedClaims)); val signedClaimEvents = signedClaimEventsDeferred.await()
// Process profile events
val signedProfileEvents = signedEventsList
.groupBy { it.event.contentType }
.map { (_, events) -> events.maxByOrNull { it.event.unixMilliseconds ?: 0 } }
.filterNotNull()
val storageSystemState = StorageTypeSystemState.create().apply {
signedProfileEvents.forEach { update(it.event) }
}
// Launch concurrent requests for getQueryReferences
val ownedClaimsDeferred = signedClaimEvents.mapNotNull { signedEvent ->
if (signedEvent.event.contentType != ContentType.CLAIM.value) {
null
} else {
async {
try {
val response = ApiMethods.getQueryReferences(
SERVER,
Protocol.Reference.newBuilder()
.setReference(signedEvent.toPointer().toProto().toByteString())
.setReferenceType(2)
.build(),
null,
Protocol.QueryReferencesRequestEvents.newBuilder()
.setFromType(ContentType.VOUCH.value)
.build()
)
response.itemsList
.map { SignedEvent.fromProto(it.event) }
.getClaimIfValid(signedEvent)
} catch (e: Exception) {
Logger.e(TAG, "Failed to get query references for ${signedEvent.toPointer()}", e)
null
}
}
}
}.filterNotNull()
val ownedClaims = ownedClaimsDeferred.mapNotNull { it.await() }.toCollection(ArrayList())
Logger.i(TAG, "Retrieved profile (ownedClaims = $ownedClaims)")
val systemState = SystemState.fromStorageTypeSystemState(storageSystemState)
CachedPolycentricProfile(PolycentricProfile(system, systemState, ownedClaims))
}
}, },
{ system -> return@BatchedTaskHandler getCachedProfile(system); }, { system -> return@BatchedTaskHandler getCachedProfile(system); },
{ system, result -> { system, result ->
synchronized(_cache) { synchronized(_cache) {
_profileCache[system] = result; _profileCache[system] = result
if (result.profile != null) { result.profile?.ownedClaims?.forEach { claim ->
for (claim in result.profile.ownedClaims) { claim.claim.resolveChannelUrls().forEach { url ->
val urls = claim.claim.resolveChannelUrls(); _profileUrlCache.map[url] = result
for (url in urls)
_profileUrlCache.map[url] = result;
} }
} }
_profileUrlCache.save(); _profileUrlCache.save()
} }
}); })
private val _batchTaskGetClaims = BatchedTaskHandler<PlatformID, CachedOwnedClaims>(_scope, private val _batchTaskGetClaims = BatchedTaskHandler<PlatformID, CachedOwnedClaims>(_scope,
{ id -> { id ->