From b460f9915d4306eecb4ff08c9fe99588dedbe855 Mon Sep 17 00:00:00 2001 From: Koen J Date: Mon, 14 Apr 2025 14:41:47 +0200 Subject: [PATCH] Added settings for enabling/disabling remote sync features. Fixed device pairing success showing too early. --- .../java/com/futo/platformplayer/Settings.kt | 9 + .../futo/platformplayer/states/StateSync.kt | 310 +++++++++--------- .../sync/internal/SyncSocketSession.kt | 18 +- app/src/main/res/values/strings.xml | 6 + 4 files changed, 188 insertions(+), 155 deletions(-) diff --git a/app/src/main/java/com/futo/platformplayer/Settings.kt b/app/src/main/java/com/futo/platformplayer/Settings.kt index 2bd95905..b9b81ec6 100644 --- a/app/src/main/java/com/futo/platformplayer/Settings.kt +++ b/app/src/main/java/com/futo/platformplayer/Settings.kt @@ -936,6 +936,15 @@ class Settings : FragmentedStorageFileJson() { @FormField(R.string.connect_last, FieldForm.TOGGLE, R.string.connect_last_description, 3) var connectLast: Boolean = true; + + @FormField(R.string.discover_through_relay, FieldForm.TOGGLE, R.string.discover_through_relay_description, 3) + var discoverThroughRelay: Boolean = true; + + @FormField(R.string.pair_through_relay, FieldForm.TOGGLE, R.string.pair_through_relay_description, 3) + var pairThroughRelay: Boolean = true; + + @FormField(R.string.connect_through_relay, FieldForm.TOGGLE, R.string.connect_through_relay_description, 3) + var connectThroughRelay: Boolean = true; } @FormField(R.string.info, FieldForm.GROUP, -1, 21) diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt index 57bcde5f..471f9de6 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt @@ -32,11 +32,11 @@ import com.futo.platformplayer.sync.internal.ChannelSocket import com.futo.platformplayer.sync.internal.GJSyncOpcodes import com.futo.platformplayer.sync.internal.IAuthorizable import com.futo.platformplayer.sync.internal.IChannel +import com.futo.platformplayer.sync.internal.LinkType import com.futo.platformplayer.sync.internal.Opcode import com.futo.platformplayer.sync.internal.SyncDeviceInfo import com.futo.platformplayer.sync.internal.SyncKeyPair import com.futo.platformplayer.sync.internal.SyncSession -import com.futo.platformplayer.sync.internal.SyncSession.Companion import com.futo.platformplayer.sync.internal.SyncSocketSession import com.futo.platformplayer.sync.models.SendToDevicePackage import com.futo.platformplayer.sync.models.SyncPlaylistsPackage @@ -52,13 +52,11 @@ import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import java.io.ByteArrayInputStream -import java.lang.Thread.sleep import java.net.InetAddress import java.net.InetSocketAddress import java.net.ServerSocket import java.net.Socket import java.nio.ByteBuffer -import java.nio.channels.Channel import java.time.Instant import java.time.OffsetDateTime import java.time.ZoneOffset @@ -88,6 +86,7 @@ class StateSync { val pairingCode: String? get() = _pairingCode private var _relaySession: SyncSocketSession? = null private var _threadRelay: Thread? = null + private val _remotePendingStatusUpdate = mutableMapOf Unit>() var keyPair: DHState? = null var publicKey: String? = null @@ -157,10 +156,7 @@ class StateSync { while (_started) { val socket = serverSocket.accept() - val session = createSocketSession(socket, true) { session -> - - } - + val session = createSocketSession(socket, true) session.startAsResponder() } } catch (e: Throwable) { @@ -219,137 +215,124 @@ class StateSync { }.apply { start() } } - _threadRelay = Thread { - while (_started) { - try { - Log.i(TAG, "Starting relay session...") + if (Settings.instance.synchronization.discoverThroughRelay) { + _threadRelay = Thread { + while (_started) { + try { + Log.i(TAG, "Starting relay session...") - var socketClosed = false; - val socket = Socket(RELAY_SERVER, 9000) - _relaySession = SyncSocketSession( - (socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!, - keyPair!!, - LittleEndianDataInputStream(socket.getInputStream()), - LittleEndianDataOutputStream(socket.getOutputStream()), - isHandshakeAllowed = { _, pk, pairingCode -> - Log.v(TAG, "Check if handshake allowed from '$pk'.") - if (pk == RELAY_PUBLIC_KEY) - return@SyncSocketSession true - - synchronized(_authorizedDevices) { - if (_authorizedDevices.values.contains(pk)) - return@SyncSocketSession true - } - - Log.v(TAG, "Check if handshake allowed with pairing code '$pairingCode' with active pairing code '$_pairingCode'.") - if (_pairingCode == null || pairingCode.isNullOrEmpty()) - return@SyncSocketSession false - - _pairingCode == pairingCode - }, - onNewChannel = { _, c -> - val remotePublicKey = c.remotePublicKey - if (remotePublicKey == null) { - Log.e(TAG, "Remote public key should never be null in onNewChannel.") - return@SyncSocketSession - } - - Log.i(TAG, "New channel established from relay (pk: '$remotePublicKey').") - - var session: SyncSession? - synchronized(_sessions) { - session = _sessions[remotePublicKey] - if (session == null) { - val remoteDeviceName = synchronized(_nameStorage) { - _nameStorage.get(remotePublicKey) - } - session = createNewSyncSession(remotePublicKey, remoteDeviceName) { } - _sessions[remotePublicKey] = session!! + var socketClosed = false; + val socket = Socket(RELAY_SERVER, 9000) + _relaySession = SyncSocketSession( + (socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!, + keyPair!!, + LittleEndianDataInputStream(socket.getInputStream()), + LittleEndianDataOutputStream(socket.getOutputStream()), + isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode) }, + onNewChannel = { _, c -> + val remotePublicKey = c.remotePublicKey + if (remotePublicKey == null) { + Log.e(TAG, "Remote public key should never be null in onNewChannel.") + return@SyncSocketSession } - session!!.addChannel(c) - } - c.setDataHandler { _, channel, opcode, subOpcode, data -> - session?.handlePacket(opcode, subOpcode, data) - } - c.setCloseHandler { channel -> - session?.removeChannel(channel) - } - }, - onChannelEstablished = { _, channel, isResponder -> - handleAuthorization(channel, isResponder) - }, - onClose = { socketClosed = true }, - onHandshakeComplete = { relaySession -> - Thread { - try { - while (_started && !socketClosed) { - val unconnectedAuthorizedDevices = synchronized(_authorizedDevices) { - _authorizedDevices.values.filter { !isConnected(it) }.toTypedArray() + Log.i(TAG, "New channel established from relay (pk: '$remotePublicKey').") + + var session: SyncSession? + synchronized(_sessions) { + session = _sessions[remotePublicKey] + if (session == null) { + val remoteDeviceName = synchronized(_nameStorage) { + _nameStorage.get(remotePublicKey) } + session = createNewSyncSession(remotePublicKey, remoteDeviceName) + _sessions[remotePublicKey] = session!! + } + session!!.addChannel(c) + } - relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, true, false, false, true) + c.setDataHandler { _, channel, opcode, subOpcode, data -> + session?.handlePacket(opcode, subOpcode, data) + } + c.setCloseHandler { channel -> + session?.removeChannel(channel) + } + }, + onChannelEstablished = { _, channel, isResponder -> + handleAuthorization(channel, isResponder) + }, + onClose = { socketClosed = true }, + onHandshakeComplete = { relaySession -> + Thread { + try { + while (_started && !socketClosed) { + val unconnectedAuthorizedDevices = synchronized(_authorizedDevices) { + _authorizedDevices.values.filter { !isConnected(it) }.toTypedArray() + } - val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) } + relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, Settings.instance.synchronization.discoverThroughRelay, false, false, Settings.instance.synchronization.discoverThroughRelay && Settings.instance.synchronization.connectThroughRelay) - for ((targetKey, connectionInfo) in connectionInfos) { - val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses) - .filter { it != connectionInfo.remoteIp } - if (connectionInfo.allowLocalDirect) { - Thread { + val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) } + + for ((targetKey, connectionInfo) in connectionInfos) { + val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses) + .filter { it != connectionInfo.remoteIp } + if (connectionInfo.allowLocalDirect) { + Thread { + try { + Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.") + connect(potentialLocalAddresses.map { it }.toTypedArray(), PORT, targetKey, null) + } catch (e: Throwable) { + Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e) + } + }.start() + } + + if (connectionInfo.allowRemoteDirect) { + // TODO: Implement direct remote connection if needed + } + + if (connectionInfo.allowRemoteHolePunched) { + // TODO: Implement hole punching if needed + } + + if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) { try { - Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.") - connect(potentialLocalAddresses.map { it }.toTypedArray(), PORT, targetKey, null) + Log.v(TAG, "Attempting relayed connection with '$targetKey'.") + runBlocking { relaySession.startRelayedChannel(targetKey, null) } } catch (e: Throwable) { - Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e) + Log.e(TAG, "Failed to start relayed channel with $targetKey.", e) } - }.start() - } - - if (connectionInfo.allowRemoteDirect) { - // TODO: Implement direct remote connection if needed - } - - if (connectionInfo.allowRemoteHolePunched) { - // TODO: Implement hole punching if needed - } - - if (connectionInfo.allowRemoteProxied) { - try { - Log.v(TAG, "Attempting relayed connection with '$targetKey'.") - runBlocking { relaySession.startRelayedChannel(targetKey, null) } - } catch (e: Throwable) { - Log.e(TAG, "Failed to start relayed channel with $targetKey.", e) } } + + Thread.sleep(15000) } - - Thread.sleep(15000) + } catch (e: Throwable) { + Log.e(TAG, "Unhandled exception in relay session.", e) + relaySession.stop() } - } catch (e: Throwable) { - Log.e(TAG, "Unhandled exception in relay session.", e) - relaySession.stop() - } - }.start() + }.start() + } + ) + + _relaySession!!.authorizable = object : IAuthorizable { + override val isAuthorized: Boolean get() = true } - ) - _relaySession!!.authorizable = object : IAuthorizable { - override val isAuthorized: Boolean get() = true + _relaySession!!.startAsInitiator(RELAY_PUBLIC_KEY, null) + + Log.i(TAG, "Started relay session.") + } catch (e: Throwable) { + Log.e(TAG, "Relay session failed.", e) + Thread.sleep(5000) + } finally { + _relaySession?.stop() + _relaySession = null } - - _relaySession!!.startAsInitiator(RELAY_PUBLIC_KEY, null) - - Log.i(TAG, "Started relay session.") - } catch (e: Throwable) { - Log.e(TAG, "Relay session failed.", e) - Thread.sleep(5000) - } finally { - _relaySession?.stop() - _relaySession = null } - } - }.apply { start() } + }.apply { start() } + } } private fun getDeviceName(): String { @@ -680,7 +663,19 @@ class StateSync { } } - private fun createNewSyncSession(remotePublicKey: String, remoteDeviceName: String?, onAuthorized: ((SyncSession) -> Unit)?): SyncSession { + private fun onAuthorized(remotePublicKey: String) { + synchronized(_remotePendingStatusUpdate) { + _remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized") + } + } + + private fun onUnuthorized(remotePublicKey: String) { + synchronized(_remotePendingStatusUpdate) { + _remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized") + } + } + + private fun createNewSyncSession(remotePublicKey: String, remoteDeviceName: String?): SyncSession { return SyncSession( remotePublicKey, onAuthorized = { it, isNewlyAuthorized, isNewSession -> @@ -694,8 +689,8 @@ class StateSync { } } - Logger.i(TAG, "${remotePublicKey} authorized (name: ${it.displayName})") - onAuthorized?.invoke(it) + Logger.i(TAG, "$remotePublicKey authorized (name: ${it.displayName})") + onAuthorized(remotePublicKey) _authorizedDevices.addDistinct(remotePublicKey) _authorizedDevices.save() deviceUpdatedOrAdded.emit(it.remotePublicKey, it) @@ -705,13 +700,16 @@ class StateSync { onUnauthorized = { unauthorize(remotePublicKey) + Logger.i(TAG, "$remotePublicKey unauthorized (name: ${it.displayName})") + onUnuthorized(remotePublicKey) + synchronized(_sessions) { it.close() _sessions.remove(remotePublicKey) } }, onConnectedChanged = { it, connected -> - Logger.i(TAG, "$remotePublicKey connected: " + connected) + Logger.i(TAG, "$remotePublicKey connected: $connected") deviceUpdatedOrAdded.emit(it.remotePublicKey, it) }, onClose = { @@ -723,6 +721,10 @@ class StateSync { } deviceRemoved.emit(it.remotePublicKey) + + synchronized(_remotePendingStatusUpdate) { + _remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Connection closed") + } }, dataHandler = { it, opcode, subOpcode, data -> handleData(it, opcode, subOpcode, data) @@ -731,7 +733,30 @@ class StateSync { ) } - private fun createSocketSession(socket: Socket, isResponder: Boolean, onAuthorized: (session: SyncSession) -> Unit): SyncSocketSession { + private fun isHandshakeAllowed(linkType: LinkType, syncSocketSession: SyncSocketSession, publicKey: String, pairingCode: String?): Boolean { + Log.v(TAG, "Check if handshake allowed from '$publicKey'.") + if (publicKey == RELAY_PUBLIC_KEY) + return true + + synchronized(_authorizedDevices) { + if (_authorizedDevices.values.contains(publicKey)) { + if (linkType == LinkType.Relayed && !Settings.instance.synchronization.connectThroughRelay) + return false + return true + } + } + + Log.v(TAG, "Check if handshake allowed with pairing code '$pairingCode' with active pairing code '$_pairingCode'.") + if (_pairingCode == null || pairingCode.isNullOrEmpty()) + return false + + if (linkType == LinkType.Relayed && !Settings.instance.synchronization.pairThroughRelay) + return false + + return _pairingCode == pairingCode + } + + private fun createSocketSession(socket: Socket, isResponder: Boolean): SyncSocketSession { var session: SyncSession? = null var channelSocket: ChannelSocket? = null return SyncSocketSession( @@ -743,21 +768,7 @@ class StateSync { if (channelSocket != null) session?.removeChannel(channelSocket!!) }, - isHandshakeAllowed = { _, pk, pairingCode -> - Logger.v(TAG, "Check if handshake allowed from '${pk}'.") - - synchronized (_authorizedDevices) - { - if (_authorizedDevices.values.contains(pk)) - return@SyncSocketSession true - } - - Logger.v(TAG, "Check if handshake allowed with pairing code '${pairingCode}' with active pairing code '${_pairingCode}'."); - if (_pairingCode == null || pairingCode.isNullOrEmpty()) - return@SyncSocketSession false - - return@SyncSocketSession _pairingCode == pairingCode - }, + isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode) }, onHandshakeComplete = { s -> val remotePublicKey = s.remotePublicKey if (remotePublicKey == null) { @@ -780,7 +791,7 @@ class StateSync { _lastAddressStorage.setAndSave(remotePublicKey, s.remoteAddress) } - session = createNewSyncSession(remotePublicKey, remoteDeviceName, onAuthorized) + session = createNewSyncSession(remotePublicKey, remoteDeviceName) _sessions[remotePublicKey] = session!! } session!!.addChannel(channelSocket!!) @@ -912,15 +923,19 @@ class StateSync { } catch (e: Throwable) { Logger.e(TAG, "Failed to connect directly", e) val relaySession = _relaySession - if (relaySession != null) { + if (relaySession != null && Settings.instance.synchronization.pairThroughRelay) { onStatusUpdate?.invoke(null, "Connecting via relay...") runBlocking { + if (onStatusUpdate != null) { + synchronized(_remotePendingStatusUpdate) { + _remotePendingStatusUpdate[deviceInfo.publicKey] = onStatusUpdate + } + } relaySession.startRelayedChannel(deviceInfo.publicKey, deviceInfo.pairingCode) - onStatusUpdate?.invoke(true, "Connected") } } else { - throw Exception("Failed to connect.") + throw e } } } @@ -930,8 +945,11 @@ class StateSync { val socket = getConnectedSocket(addresses.map { InetAddress.getByName(it) }, port) ?: throw Exception("Failed to connect") onStatusUpdate?.invoke(null, "Handshaking...") - val session = createSocketSession(socket, false) { s -> - onStatusUpdate?.invoke(true, "Authorized") + val session = createSocketSession(socket, false) + if (onStatusUpdate != null) { + synchronized(_remotePendingStatusUpdate) { + _remotePendingStatusUpdate[publicKey] = onStatusUpdate + } } session.startAsInitiator(publicKey, pairingCode) diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt index c8f4f683..2b3a7e10 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt @@ -38,7 +38,7 @@ class SyncSocketSession { private val _onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? private val _onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)? private val _onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? - private val _isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? + private val _isHandshakeAllowed: ((linkType: LinkType, session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? private var _cipherStatePair: CipherStatePair? = null private var _remotePublicKey: String? = null val remotePublicKey: String? get() = _remotePublicKey @@ -74,7 +74,7 @@ class SyncSocketSession { val allowLocalDirect: Boolean, val allowRemoteDirect: Boolean, val allowRemoteHolePunched: Boolean, - val allowRemoteProxied: Boolean + val allowRemoteRelayed: Boolean ) constructor( @@ -87,7 +87,7 @@ class SyncSocketSession { onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null, onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)? = null, onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null, - isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? = null + isHandshakeAllowed: ((linkType: LinkType, session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? = null ) { _inputStream = inputStream _outputStream = outputStream @@ -278,7 +278,7 @@ class SyncSocketSession { responder.remotePublicKey.getPublicKey(remoteKeyBytes, 0) _remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes) - return (_remotePublicKey != _localPublicKey && (_isHandshakeAllowed?.invoke(this, _remotePublicKey!!, pairingCode) ?: true)).also { + return (_remotePublicKey != _localPublicKey && (_isHandshakeAllowed?.invoke(LinkType.Direct, this, _remotePublicKey!!, pairingCode) ?: true)).also { if (!it) stop() } } @@ -420,7 +420,7 @@ class SyncSocketSession { val length = pairingProtocol.readMessage(pairingMessage, 0, pairingMessageLength, plaintext, 0) String(plaintext, 0, length, Charsets.UTF_8) } else null - val isAllowed = publicKey != _localPublicKey && (_isHandshakeAllowed?.invoke(this, publicKey, pairingCode) ?: true) + val isAllowed = publicKey != _localPublicKey && (_isHandshakeAllowed?.invoke(LinkType.Relayed, this, publicKey, pairingCode) ?: true) if (!isAllowed) { val rp = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN) rp.putInt(2) // Status code for not allowed @@ -649,8 +649,8 @@ class SyncSocketSession { val allowLocalDirect = info.get() != 0.toByte() val allowRemoteDirect = info.get() != 0.toByte() val allowRemoteHolePunched = info.get() != 0.toByte() - val allowRemoteProxied = info.get() != 0.toByte() - return ConnectionInfo(port, name, remoteIp, ipv4Addresses, ipv6Addresses, allowLocalDirect, allowRemoteDirect, allowRemoteHolePunched, allowRemoteProxied) + val allowRemoteRelayed = info.get() != 0.toByte() + return ConnectionInfo(port, name, remoteIp, ipv4Addresses, ipv6Addresses, allowLocalDirect, allowRemoteDirect, allowRemoteHolePunched, allowRemoteRelayed) } private fun handleNotify(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) { @@ -920,7 +920,7 @@ class SyncSocketSession { allowLocalDirect: Boolean, allowRemoteDirect: Boolean, allowRemoteHolePunched: Boolean, - allowRemoteProxied: Boolean + allowRemoteRelayed: Boolean ) { if (authorizedKeys.size > 255) throw IllegalArgumentException("Number of authorized keys exceeds 255") @@ -960,7 +960,7 @@ class SyncSocketSession { data.put(if (allowLocalDirect) 1 else 0) data.put(if (allowRemoteDirect) 1 else 0) data.put(if (allowRemoteHolePunched) 1 else 0) - data.put(if (allowRemoteProxied) 1 else 0) + data.put(if (allowRemoteRelayed) 1 else 0) val handshakeSize = 48 // Noise handshake size for N pattern diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index e7c101c9..c6546d70 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -372,6 +372,12 @@ Allow device to search for and initiate connection with known paired devices Try connect last Allow device to automatically connect to last known + Discover through relay + Allow paired devices to be discovered and connected to through the relay + Pair through relay + Allow devices to be paired through the relay + Connection through relay + Allow devices to be connected to through the relay Gesture controls Volume slider Enable slide gesture to change volume