History sync support

This commit is contained in:
Kelvin 2024-10-01 14:30:33 +02:00
parent f5d9b2ba41
commit 790331e798
9 changed files with 171 additions and 9 deletions

View File

@ -4,6 +4,6 @@ import kotlinx.serialization.json.Json
class Serializer { class Serializer {
companion object { companion object {
val json = Json { ignoreUnknownKeys = true; encodeDefaults = true; }; val json = Json { ignoreUnknownKeys = true; encodeDefaults = true; coerceInputValues = true };
} }
} }

View File

@ -19,9 +19,9 @@ open class SerializedPlatformVideo(
override val thumbnails: Thumbnails, override val thumbnails: Thumbnails,
override val author: PlatformAuthorLink, override val author: PlatformAuthorLink,
@kotlinx.serialization.Serializable(with = OffsetDateTimeNullableSerializer::class) @kotlinx.serialization.Serializable(with = OffsetDateTimeNullableSerializer::class)
override val datetime: OffsetDateTime?, override val datetime: OffsetDateTime? = null,
override val url: String, override val url: String,
override val shareUrl: String, override val shareUrl: String = "",
override val duration: Long, override val duration: Long,
override val viewCount: Long, override val viewCount: Long,

View File

@ -1477,7 +1477,7 @@ class VideoDetailView : ConstraintLayout {
val historyItem = getHistoryIndex(videoDetail) ?: return@launch; val historyItem = getHistoryIndex(videoDetail) ?: return@launch;
withContext(Dispatchers.Main) { withContext(Dispatchers.Main) {
_historicalPosition = StateHistory.instance.updateHistoryPosition(video, historyItem,false, (toResume.toFloat() / 1000.0f).toLong()); _historicalPosition = StateHistory.instance.updateHistoryPosition(video, historyItem,false, (toResume.toFloat() / 1000.0f).toLong(), null, true);
Logger.i(TAG, "Historical position: $_historicalPosition, last position: $lastPositionMilliseconds"); Logger.i(TAG, "Historical position: $_historicalPosition, last position: $lastPositionMilliseconds");
if (_historicalPosition > 60 && video.duration - _historicalPosition > 5 && Math.abs(_historicalPosition - lastPositionMilliseconds / 1000) > 5.0) { if (_historicalPosition > 60 && video.duration - _historicalPosition > 5 && Math.abs(_historicalPosition - lastPositionMilliseconds / 1000) > 5.0) {
_layoutResume.visibility = View.VISIBLE; _layoutResume.visibility = View.VISIBLE;
@ -2497,7 +2497,7 @@ class VideoDetailView : ConstraintLayout {
if (v !is TutorialFragment.TutorialVideo) { if (v !is TutorialFragment.TutorialVideo) {
fragment.lifecycleScope.launch(Dispatchers.IO) { fragment.lifecycleScope.launch(Dispatchers.IO) {
val history = getHistoryIndex(v) ?: return@launch; val history = getHistoryIndex(v) ?: return@launch;
StateHistory.instance.updateHistoryPosition(v, history, true, (positionMilliseconds.toFloat() / 1000.0f).toLong()); StateHistory.instance.updateHistoryPosition(v, history, true, (positionMilliseconds.toFloat() / 1000.0f).toLong(), null, true);
} }
} }
_lastPositionSaveTime = currentTime; _lastPositionSaveTime = currentTime;

View File

@ -12,9 +12,13 @@ import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.db.ManagedDBStore import com.futo.platformplayer.stores.db.ManagedDBStore
import com.futo.platformplayer.stores.db.types.DBHistory import com.futo.platformplayer.stores.db.types.DBHistory
import com.futo.platformplayer.stores.v2.ReconstructStore import com.futo.platformplayer.stores.v2.ReconstructStore
import com.futo.platformplayer.sync.internal.GJSyncOpcodes
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import java.time.OffsetDateTime import java.time.OffsetDateTime
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap import java.util.concurrent.ConcurrentMap
import kotlin.math.min
class StateHistory { class StateHistory {
//Legacy //Legacy
@ -56,7 +60,7 @@ class StateHistory {
} }
fun updateHistoryPosition(liveObj: IPlatformVideo, index: DBHistory.Index, updateExisting: Boolean, position: Long = -1L): Long { fun updateHistoryPosition(liveObj: IPlatformVideo, index: DBHistory.Index, updateExisting: Boolean, position: Long = -1L, date: OffsetDateTime? = null, isUserAction: Boolean = false): Long {
val pos = if(position < 0) 0 else position; val pos = if(position < 0) 0 else position;
val historyVideo = index.obj; val historyVideo = index.obj;
@ -76,16 +80,49 @@ class StateHistory {
historyVideo.video = SerializedPlatformVideo.fromVideo(liveObj); historyVideo.video = SerializedPlatformVideo.fromVideo(liveObj);
historyVideo.position = pos; historyVideo.position = pos;
historyVideo.date = OffsetDateTime.now(); historyVideo.date = date ?: OffsetDateTime.now();
_historyDBStore.update(index.id!!, historyVideo); _historyDBStore.update(index.id!!, historyVideo);
onHistoricVideoChanged.emit(liveObj, pos); onHistoricVideoChanged.emit(liveObj, pos);
} }
if(isUserAction) {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) {
Logger.i(TAG, "SyncHistory playback broadcasted (${liveObj.name}: ${position})");
StateSync.instance.broadcastJson(
GJSyncOpcodes.syncHistory,
listOf(historyVideo)
);
}
};
}
return positionBefore;
}
return positionBefore; return positionBefore;
} }
return positionBefore; fun getRecentHistory(minDate: OffsetDateTime, max: Int = 1000): List<HistoryVideo> {
val pager = getHistoryPager();
val videos = pager.getResults().filter { it.date > minDate }.toMutableList();
while(pager.hasMorePages() && videos.size < max) {
pager.nextPage();
val newResults = pager.getResults();
var foundEnd = false;
for(item in newResults) {
if(item.date < minDate) {
foundEnd = true;
break;
} }
else
videos.add(item);
}
if(foundEnd)
break;
}
return videos;
}
fun getHistoryPager(): IPager<HistoryVideo> { fun getHistoryPager(): IPager<HistoryVideo> {
return _historyDBStore.getObjectPager(); return _historyDBStore.getObjectPager();
} }

View File

@ -20,6 +20,8 @@ import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.StringStringMapStorage import com.futo.platformplayer.stores.StringStringMapStorage
import com.futo.platformplayer.stores.StringArrayStorage import com.futo.platformplayer.stores.StringArrayStorage
import com.futo.platformplayer.stores.StringStorage import com.futo.platformplayer.stores.StringStorage
import com.futo.platformplayer.stores.StringTMapStorage
import com.futo.platformplayer.sync.SyncSessionData
import com.futo.platformplayer.sync.internal.GJSyncOpcodes import com.futo.platformplayer.sync.internal.GJSyncOpcodes
import com.futo.platformplayer.sync.internal.SyncDeviceInfo import com.futo.platformplayer.sync.internal.SyncDeviceInfo
import com.futo.platformplayer.sync.internal.SyncKeyPair import com.futo.platformplayer.sync.internal.SyncKeyPair
@ -44,6 +46,7 @@ class StateSync {
private val _authorizedDevices = FragmentedStorage.get<StringArrayStorage>("authorized_devices") private val _authorizedDevices = FragmentedStorage.get<StringArrayStorage>("authorized_devices")
private val _syncKeyPair = FragmentedStorage.get<StringStorage>("sync_key_pair") private val _syncKeyPair = FragmentedStorage.get<StringStorage>("sync_key_pair")
private val _lastAddressStorage = FragmentedStorage.get<StringStringMapStorage>("sync_last_address_storage") private val _lastAddressStorage = FragmentedStorage.get<StringStringMapStorage>("sync_last_address_storage")
private val _syncSessionData = FragmentedStorage.get<StringTMapStorage<SyncSessionData>>("syncSessionData")
private var _serverSocket: ServerSocket? = null private var _serverSocket: ServerSocket? = null
private var _thread: Thread? = null private var _thread: Thread? = null
@ -190,6 +193,16 @@ class StateSync {
}; };
} }
fun getSyncSessionData(key: String): SyncSessionData {
return _syncSessionData.get(key) ?: SyncSessionData(key);
}
fun getSyncSessionDataString(key: String): String {
return Json.encodeToString(getSyncSessionData(key));
}
fun saveSyncSessionData(data: SyncSessionData){
_syncSessionData.setAndSave(data.publicKey, data);
}
private fun handleServiceUpdated(services: List<DnsService>) { private fun handleServiceUpdated(services: List<DnsService>) {
if (!Settings.instance.synchronization.connectDiscovered) { if (!Settings.instance.synchronization.connectDiscovered) {
return return
@ -343,6 +356,9 @@ class StateSync {
}) })
} }
inline fun <reified T> broadcastJson(opcode: UByte, data: T) {
broadcast(opcode, Json.encodeToString(data));
}
fun broadcast(opcode: UByte, data: String) { fun broadcast(opcode: UByte, data: String) {
broadcast(opcode, data.toByteArray(Charsets.UTF_8)); broadcast(opcode, data.toByteArray(Charsets.UTF_8));
} }
@ -363,7 +379,7 @@ class StateSync {
val time = measureTimeMillis { val time = measureTimeMillis {
//val export = StateBackup.export(); //val export = StateBackup.export();
//session.send(GJSyncOpcodes.syncExport, export.asZip()); //session.send(GJSyncOpcodes.syncExport, export.asZip());
session.send(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString()); session.send(GJSyncOpcodes.syncStateExchange, getSyncSessionDataString(session.remotePublicKey));
} }
Logger.i(TAG, "Generated and sent sync export in ${time}ms"); Logger.i(TAG, "Generated and sent sync export in ${time}ms");
} }
@ -398,6 +414,11 @@ class StateSync {
return _authorizedDevices.values.isNotEmpty() return _authorizedDevices.values.isNotEmpty()
} }
} }
fun hasAtLeastOneOnlineDevice(): Boolean {
synchronized(_sessions) {
return _sessions.any{ it.value.connected && it.value.isAuthorized };
}
}
fun getAll(): List<String> { fun getAll(): List<String> {
synchronized(_authorizedDevices) { synchronized(_authorizedDevices) {

View File

@ -0,0 +1,29 @@
package com.futo.platformplayer.stores
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
@kotlinx.serialization.Serializable
class StringTMapStorage<T> : FragmentedStorageFileJson() {
var map: HashMap<String, T> = hashMapOf()
override fun encode(): String {
return Json.encodeToString(this)
}
fun get(key: String): T? {
return map[key]
}
fun setAndSave(key: String, value: T): T {
map[key] = value
save()
return value
}
fun setAndSaveBlocking(key: String, value: T): T {
map[key] = value
saveBlocking()
return value
}
}

View File

@ -4,8 +4,12 @@ class GJSyncOpcodes {
companion object { companion object {
val sendToDevices: UByte = 101.toUByte(); val sendToDevices: UByte = 101.toUByte();
val syncStateExchange: UByte = 150.toUByte();
val syncExport: UByte = 201.toUByte(); val syncExport: UByte = 201.toUByte();
val syncSubscriptions: UByte = 202.toUByte(); val syncSubscriptions: UByte = 202.toUByte();
val syncHistory: UByte = 203.toUByte(); val syncHistory: UByte = 203.toUByte();
} }
} }

View File

@ -0,0 +1,13 @@
package com.futo.platformplayer.sync
import com.futo.platformplayer.serializers.OffsetDateTimeSerializer
import kotlinx.serialization.Serializable
import java.time.OffsetDateTime
@Serializable
class SyncSessionData(var publicKey: String) {
@kotlinx.serialization.Serializable(with = OffsetDateTimeSerializer::class)
var lastHistory: OffsetDateTime = OffsetDateTime.MIN;
@kotlinx.serialization.Serializable(with = OffsetDateTimeSerializer::class)
var lastSubscription: OffsetDateTime = OffsetDateTime.MIN;
}

View File

@ -4,19 +4,25 @@ import com.futo.platformplayer.UIDialogs
import com.futo.platformplayer.activities.MainActivity import com.futo.platformplayer.activities.MainActivity
import com.futo.platformplayer.api.media.Serializer import com.futo.platformplayer.api.media.Serializer
import com.futo.platformplayer.logging.Logger import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.models.HistoryVideo
import com.futo.platformplayer.models.Subscription import com.futo.platformplayer.models.Subscription
import com.futo.platformplayer.states.StateApp import com.futo.platformplayer.states.StateApp
import com.futo.platformplayer.states.StateBackup import com.futo.platformplayer.states.StateBackup
import com.futo.platformplayer.states.StateHistory
import com.futo.platformplayer.states.StatePlayer import com.futo.platformplayer.states.StatePlayer
import com.futo.platformplayer.states.StateSubscriptions import com.futo.platformplayer.states.StateSubscriptions
import com.futo.platformplayer.states.StateSync
import com.futo.platformplayer.sync.SyncSessionData
import com.futo.platformplayer.sync.internal.SyncSocketSession.Opcode import com.futo.platformplayer.sync.internal.SyncSocketSession.Opcode
import com.futo.platformplayer.sync.models.SendToDevicePackage import com.futo.platformplayer.sync.models.SendToDevicePackage
import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.time.OffsetDateTime
interface IAuthorizable { interface IAuthorizable {
val isAuthorized: Boolean val isAuthorized: Boolean
@ -142,6 +148,22 @@ class SyncSession : IAuthorizable {
}; };
} }
GJSyncOpcodes.syncStateExchange -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val syncSessionData = Serializer.json.decodeFromString<SyncSessionData>(json);
Logger.i(TAG, "Received SyncSessionData from " + remotePublicKey);
send(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
if(recentHistory.size > 0)
sendJson(GJSyncOpcodes.syncHistory, recentHistory);
}
GJSyncOpcodes.syncExport -> { GJSyncOpcodes.syncExport -> {
val dataBody = ByteArray(data.remaining()); val dataBody = ByteArray(data.remaining());
val bytesStr = ByteArrayInputStream(data.array(), data.position(), data.remaining()); val bytesStr = ByteArrayInputStream(data.array(), data.position(), data.remaining());
@ -173,6 +195,39 @@ class SyncSession : IAuthorizable {
val json = String(dataBody, Charsets.UTF_8); val json = String(dataBody, Charsets.UTF_8);
val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json); val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json);
handleSyncSubscriptionPackage(this, subPackage); handleSyncSubscriptionPackage(this, subPackage);
val newestSub = subPackage.subscriptions.maxOf { it.creationTime };
val sesData = StateSync.instance.getSyncSessionData(remotePublicKey);
if(newestSub > sesData.lastSubscription) {
sesData.lastSubscription = newestSub;
StateSync.instance.saveSyncSessionData(sesData);
}
}
GJSyncOpcodes.syncHistory -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val history = Serializer.json.decodeFromString<List<HistoryVideo>>(json);
Logger.i(TAG, "SyncHistory received ${history.size} videos from ${remotePublicKey}");
var lastHistory = OffsetDateTime.MIN;
for(video in history){
val hist = StateHistory.instance.getHistoryByVideo(video.video, true, video.date);
if(hist != null)
StateHistory.instance.updateHistoryPosition(video.video, hist, true, video.position, video.date)
if(lastHistory < video.date)
lastHistory = video.date;
}
if(lastHistory != OffsetDateTime.MIN && history.size > 1) {
val sesData = StateSync.instance.getSyncSessionData(remotePublicKey);
if (lastHistory > sesData.lastHistory) {
sesData.lastHistory = lastHistory;
StateSync.instance.saveSyncSessionData(sesData);
}
}
} }
} }
} }
@ -214,6 +269,9 @@ class SyncSession : IAuthorizable {
} }
inline fun <reified T> sendJson(opcode: UByte, data: T) {
send(opcode, Json.encodeToString<T>(data));
}
fun send(opcode: UByte, data: String) { fun send(opcode: UByte, data: String) {
send(opcode, data.toByteArray(Charsets.UTF_8)); send(opcode, data.toByteArray(Charsets.UTF_8));
} }