Fixes to sync.

This commit is contained in:
Koen J 2025-01-06 15:56:31 +01:00
parent 7ce437d50a
commit 561d5ec7ab
6 changed files with 110 additions and 61 deletions

View File

@ -649,17 +649,8 @@ class VideoDetailView : ConstraintLayout {
}; };
var hadDevice = false; var hadDevice = false;
StateSync.instance.deviceUpdatedOrAdded.subscribe(this) { id, session -> val devicesChanged = { id: String ->
val hasDevice = StateSync.instance.hasAtLeastOneOnlineDevice(); val hasDevice = StateSync.instance.hasAuthorizedDevice();
if(hasDevice != hadDevice) {
hadDevice = hasDevice;
fragment.lifecycleScope.launch(Dispatchers.Main) {
updateMoreButtons();
}
}
};
StateSync.instance.deviceRemoved.subscribe(this) { id ->
val hasDevice = StateSync.instance.hasAtLeastOneOnlineDevice();
if (hasDevice != hadDevice) { if (hasDevice != hadDevice) {
hadDevice = hasDevice; hadDevice = hasDevice;
fragment.lifecycleScope.launch(Dispatchers.Main) { fragment.lifecycleScope.launch(Dispatchers.Main) {
@ -668,6 +659,9 @@ class VideoDetailView : ConstraintLayout {
} }
} }
StateSync.instance.deviceUpdatedOrAdded.subscribe(this) { id, _ -> devicesChanged(id) };
StateSync.instance.deviceRemoved.subscribe(this) { id -> devicesChanged(id) };
MediaControlReceiver.onLowerVolumeReceived.subscribe(this) { handleLowerVolume() }; MediaControlReceiver.onLowerVolumeReceived.subscribe(this) { handleLowerVolume() };
MediaControlReceiver.onPlayReceived.subscribe(this) { handlePlay() }; MediaControlReceiver.onPlayReceived.subscribe(this) { handlePlay() };
MediaControlReceiver.onPauseReceived.subscribe(this) { handlePause() }; MediaControlReceiver.onPauseReceived.subscribe(this) { handlePause() };
@ -922,18 +916,25 @@ class VideoDetailView : ConstraintLayout {
}; };
_slideUpOverlay?.hide(); _slideUpOverlay?.hide();
}, },
if(StateSync.instance.hasAtLeastOneOnlineDevice()) { if (StateSync.instance.hasAuthorizedDevice()) {
RoundButton(context, R.drawable.ic_device, context.getString(R.string.send_to_device), TAG_SEND_TO_DEVICE) { RoundButton(context, R.drawable.ic_device, context.getString(R.string.send_to_device), TAG_SEND_TO_DEVICE) {
val devices = StateSync.instance.getSessions(); val devices = StateSync.instance.getAuthorizedSessions();
val videoToSend = video ?: return@RoundButton; val videoToSend = video ?: return@RoundButton;
if(devices.size > 1) { if(devices.size > 1) {
//not implemented //not implemented
} } else if(devices.size == 1){
else if(devices.size == 1){
val device = devices.first(); val device = devices.first();
Logger.i(TAG, "Send to device? (public key: ${device.remotePublicKey}): " + videoToSend.url)
UIDialogs.showConfirmationDialog(context, "Would you like to open\n[${videoToSend.name}]\non ${device.remotePublicKey}" , { UIDialogs.showConfirmationDialog(context, "Would you like to open\n[${videoToSend.name}]\non ${device.remotePublicKey}" , {
Logger.i(TAG, "Send to device confirmed (public key: ${device.remotePublicKey}): " + videoToSend.url)
fragment.lifecycleScope.launch(Dispatchers.IO) { fragment.lifecycleScope.launch(Dispatchers.IO) {
device.sendJsonData(GJSyncOpcodes.sendToDevices, SendToDevicePackage(videoToSend.url, (lastPositionMilliseconds/1000).toInt())); try {
device.sendJsonData(GJSyncOpcodes.sendToDevices, SendToDevicePackage(videoToSend.url, (lastPositionMilliseconds / 1000).toInt()))
Logger.i(TAG, "Send to device packet sent (public key: ${device.remotePublicKey}): " + videoToSend.url)
} catch (e: Throwable) {
Logger.e(TAG, "Send to device packet failed to send", e)
}
} }
}) })
} }

View File

@ -8,6 +8,7 @@ import com.futo.platformplayer.constructs.Event2
import com.futo.platformplayer.logging.Logger import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.models.HistoryVideo import com.futo.platformplayer.models.HistoryVideo
import com.futo.platformplayer.models.ImportCache import com.futo.platformplayer.models.ImportCache
import com.futo.platformplayer.states.StatePlaylists.Companion
import com.futo.platformplayer.stores.FragmentedStorage import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.db.ManagedDBStore import com.futo.platformplayer.stores.db.ManagedDBStore
import com.futo.platformplayer.stores.db.types.DBHistory import com.futo.platformplayer.stores.db.types.DBHistory
@ -89,12 +90,14 @@ class StateHistory {
if(isUserAction && _lastHistoryBroadcast != historyBroadcastSig) { if(isUserAction && _lastHistoryBroadcast != historyBroadcastSig) {
_lastHistoryBroadcast = historyBroadcastSig; _lastHistoryBroadcast = historyBroadcastSig;
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) { try {
Logger.i(TAG, "SyncHistory playback broadcasted (${liveObj.name}: ${position})"); Logger.i(TAG, "SyncHistory playback broadcasted (${liveObj.name}: ${position})");
StateSync.instance.broadcastJsonData( StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncHistory, GJSyncOpcodes.syncHistory,
listOf(historyVideo) listOf(historyVideo)
); );
} catch (e: Throwable) {
Logger.e(StatePlaylists.TAG, "Failed to broadcast sync history", e)
} }
}; };
} }

View File

@ -227,31 +227,50 @@ class StatePlaylists {
private fun broadcastWatchLater(orderOnly: Boolean = false) { private fun broadcastWatchLater(orderOnly: Boolean = false) {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
StateSync.instance.broadcastJsonData(GJSyncOpcodes.syncWatchLater, SyncWatchLaterPackage( try {
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncWatchLater, SyncWatchLaterPackage(
if (orderOnly) listOf() else getWatchLater(), if (orderOnly) listOf() else getWatchLater(),
if (orderOnly) mapOf() else _watchLaterAdds.all(), if (orderOnly) mapOf() else _watchLaterAdds.all(),
if (orderOnly) mapOf() else _watchLaterRemovals.all(), if (orderOnly) mapOf() else _watchLaterRemovals.all(),
getWatchLaterLastReorderTime().toEpochSecond(), getWatchLaterLastReorderTime().toEpochSecond(),
_watchlistOrderStore.values.toList())); _watchlistOrderStore.values.toList()
)
);
} catch (e: Throwable) {
Logger.w(TAG, "Failed to broadcast watch later", e)
}
}; };
} }
private fun broadcastWatchLaterAddition(video: SerializedPlatformVideo, time: OffsetDateTime) { private fun broadcastWatchLaterAddition(video: SerializedPlatformVideo, time: OffsetDateTime) {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
StateSync.instance.broadcastJsonData(GJSyncOpcodes.syncWatchLater, SyncWatchLaterPackage( try {
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncWatchLater, SyncWatchLaterPackage(
listOf(video), listOf(video),
mapOf(Pair(video.url, time.toEpochSecond())), mapOf(Pair(video.url, time.toEpochSecond())),
mapOf(), mapOf(),
)) )
)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to broadcast watch later addition", e)
}
}; };
} }
private fun broadcastWatchLaterRemoval(url: String, time: OffsetDateTime) { private fun broadcastWatchLaterRemoval(url: String, time: OffsetDateTime) {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
StateSync.instance.broadcastJsonData(GJSyncOpcodes.syncWatchLater, SyncWatchLaterPackage( try {
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncWatchLater, SyncWatchLaterPackage(
listOf(), listOf(),
mapOf(), mapOf(),
mapOf(Pair(url, time.toEpochSecond())) mapOf(Pair(url, time.toEpochSecond()))
)) )
)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to broadcast watch later removal", e)
}
}; };
} }
@ -300,12 +319,14 @@ class StatePlaylists {
private fun broadcastSyncPlaylist(playlist: Playlist){ private fun broadcastSyncPlaylist(playlist: Playlist){
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) { try {
Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})"); Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})");
StateSync.instance.broadcastJsonData( StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncPlaylists, GJSyncOpcodes.syncPlaylists,
SyncPlaylistsPackage(listOf(playlist), mapOf()) SyncPlaylistsPackage(listOf(playlist), mapOf())
); );
} catch (e: Throwable) {
Logger.e(TAG, "Failed to broadcast sync playlist", e)
} }
}; };
} }
@ -319,12 +340,14 @@ class StatePlaylists {
_playlistRemoved.setAndSave(playlist.id, OffsetDateTime.now()); _playlistRemoved.setAndSave(playlist.id, OffsetDateTime.now());
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) { try {
Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})"); Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})");
StateSync.instance.broadcastJsonData( StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncPlaylists, GJSyncOpcodes.syncPlaylists,
SyncPlaylistsPackage(listOf(), mapOf(Pair(playlist.id, OffsetDateTime.now().toEpochSecond()))) SyncPlaylistsPackage(listOf(), mapOf(Pair(playlist.id, OffsetDateTime.now().toEpochSecond())))
); );
} catch (e: Throwable) {
Logger.e(TAG, "Failed to broadcast sync playlists", e)
} }
}; };
} }

View File

@ -79,12 +79,14 @@ class StateSubscriptionGroups {
onGroupsChanged.emit(); onGroupsChanged.emit();
if(!preventSync) { if(!preventSync) {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) { try {
Logger.i(TAG, "SyncSubscriptionGroup (${subGroup.name})"); Logger.i(TAG, "SyncSubscriptionGroup (${subGroup.name})");
StateSync.instance.broadcastJsonData( StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncSubscriptionGroups, GJSyncOpcodes.syncSubscriptionGroups,
SyncSubscriptionGroupsPackage(listOf(subGroup), mapOf()) SyncSubscriptionGroupsPackage(listOf(subGroup), mapOf())
); );
} catch (e: Throwable) {
Logger.e(TAG, "Failed to broadcast update subscription group", e)
} }
}; };
} }
@ -98,12 +100,14 @@ class StateSubscriptionGroups {
if(isUserInteraction) { if(isUserInteraction) {
_groupsRemoved.setAndSave(id, OffsetDateTime.now()); _groupsRemoved.setAndSave(id, OffsetDateTime.now());
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) { try {
Logger.i(TAG, "SyncSubscriptionGroup delete (${group.name})"); Logger.i(TAG, "SyncSubscriptionGroup delete (${group.name})");
StateSync.instance.broadcastJsonData( StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncSubscriptionGroups, GJSyncOpcodes.syncSubscriptionGroups,
SyncSubscriptionGroupsPackage(listOf(), mapOf(Pair(id, OffsetDateTime.now().toEpochSecond()))) SyncSubscriptionGroupsPackage(listOf(), mapOf(Pair(id, OffsetDateTime.now().toEpochSecond())))
); );
} catch (e: Throwable) {
Logger.e(TAG, "Failed to delete subscription group", e)
} }
}; };
} }

View File

@ -65,6 +65,12 @@ class StateSync {
val deviceRemoved: Event1<String> = Event1() val deviceRemoved: Event1<String> = Event1()
val deviceUpdatedOrAdded: Event2<String, SyncSession> = Event2() val deviceUpdatedOrAdded: Event2<String, SyncSession> = Event2()
fun hasAuthorizedDevice(): Boolean {
synchronized(_sessions) {
return _sessions.any{ it.value.connected && it.value.isAuthorized };
}
}
fun start() { fun start() {
if (_started) { if (_started) {
Logger.i(TAG, "Already started.") Logger.i(TAG, "Already started.")
@ -216,6 +222,11 @@ class StateSync {
return _sessions.values.toList() return _sessions.values.toList()
}; };
} }
fun getAuthorizedSessions(): List<SyncSession> {
return synchronized(_sessions) {
return _sessions.values.filter { it.isAuthorized }.toList()
};
}
fun getSyncSessionData(key: String): SyncSessionData { fun getSyncSessionData(key: String): SyncSessionData {
return _syncSessionData.get(key) ?: SyncSessionData(key); return _syncSessionData.get(key) ?: SyncSessionData(key);
@ -349,8 +360,12 @@ class StateSync {
scope.launch(Dispatchers.Main) { scope.launch(Dispatchers.Main) {
UIDialogs.showConfirmationDialog(activity, "Allow connection from ${remotePublicKey}?", action = { UIDialogs.showConfirmationDialog(activity, "Allow connection from ${remotePublicKey}?", action = {
scope.launch(Dispatchers.IO) { scope.launch(Dispatchers.IO) {
try {
session!!.authorize(s) session!!.authorize(s)
Logger.i(TAG, "Connection authorized for ${remotePublicKey} by confirmation") Logger.i(TAG, "Connection authorized for $remotePublicKey by confirmation")
} catch (e: Throwable) {
Logger.e(TAG, "Failed to send authorize", e)
}
} }
}, cancelAction = { }, cancelAction = {
scope.launch(Dispatchers.IO) { scope.launch(Dispatchers.IO) {
@ -404,12 +419,10 @@ class StateSync {
broadcast(opcode, subOpcode, data.toByteArray(Charsets.UTF_8)); broadcast(opcode, subOpcode, data.toByteArray(Charsets.UTF_8));
} }
fun broadcast(opcode: UByte, subOpcode: UByte, data: ByteArray) { fun broadcast(opcode: UByte, subOpcode: UByte, data: ByteArray) {
for(session in getSessions()) { for(session in getAuthorizedSessions()) {
try { try {
if (session.isAuthorized && session.connected) {
session.send(opcode, subOpcode, data); session.send(opcode, subOpcode, data);
} }
}
catch(ex: Exception) { catch(ex: Exception) {
Logger.w(TAG, "Failed to broadcast (opcode = ${opcode}, subOpcode = ${subOpcode}) to ${session.remotePublicKey}: ${ex.message}}", ex); Logger.w(TAG, "Failed to broadcast (opcode = ${opcode}, subOpcode = ${subOpcode}) to ${session.remotePublicKey}: ${ex.message}}", ex);
} }
@ -450,17 +463,6 @@ class StateSync {
return session return session
} }
fun hasAtLeastOneDevice(): Boolean {
synchronized(_authorizedDevices) {
return _authorizedDevices.values.isNotEmpty()
}
}
fun hasAtLeastOneOnlineDevice(): Boolean {
synchronized(_sessions) {
return _sessions.any{ it.value.connected && it.value.isAuthorized };
}
}
fun getAll(): List<String> { fun getAll(): List<String> {
synchronized(_authorizedDevices) { synchronized(_authorizedDevices) {
return _authorizedDevices.values.toList() return _authorizedDevices.values.toList()

View File

@ -398,7 +398,6 @@ class SyncSession : IAuthorizable {
} }
} }
inline fun <reified T> sendJsonData(subOpcode: UByte, data: T) { inline fun <reified T> sendJsonData(subOpcode: UByte, data: T) {
send(Opcode.DATA.value, subOpcode, Json.encodeToString<T>(data)); send(Opcode.DATA.value, subOpcode, Json.encodeToString<T>(data));
} }
@ -409,12 +408,29 @@ class SyncSession : IAuthorizable {
send(opcode, subOpcode, data.toByteArray(Charsets.UTF_8)); send(opcode, subOpcode, data.toByteArray(Charsets.UTF_8));
} }
fun send(opcode: UByte, subOpcode: UByte, data: ByteArray) { fun send(opcode: UByte, subOpcode: UByte, data: ByteArray) {
val sock = _socketSessions.firstOrNull(); val socketSessions = synchronized(_socketSessions) {
if(sock != null){ _socketSessions.toList()
sock.send(opcode, subOpcode, ByteBuffer.wrap(data)); }
if (socketSessions.isEmpty()) {
Logger.v(TAG, "Packet was not sent (opcode = ${opcode}, subOpcode = ${subOpcode}) due to no connected sockets")
return
}
var sent = false
for (socketSession in socketSessions) {
try {
socketSession.send(opcode, subOpcode, ByteBuffer.wrap(data))
sent = true
break
} catch (e: Throwable) {
Logger.w(TAG, "Packet failed to send (opcode = ${opcode}, subOpcode = ${subOpcode})", e)
}
}
if (!sent) {
throw Exception("Packet was not sent (opcode = ${opcode}, subOpcode = ${subOpcode}) due to send errors and no remaining candidates")
} }
else
throw IllegalStateException("Session has no active sockets");
} }
private companion object { private companion object {