Implemented remote sync.

This commit is contained in:
Koen 2025-04-11 14:31:47 +00:00
parent dccdf72c73
commit 05230971b3
12 changed files with 2353 additions and 664 deletions

View File

@ -0,0 +1,266 @@
package com.futo.platformplayer
import com.futo.platformplayer.noise.protocol.Noise
import com.futo.platformplayer.sync.internal.*
import kotlinx.coroutines.*
import org.junit.Assert.*
import org.junit.Test
import java.net.Socket
import java.nio.ByteBuffer
import kotlin.random.Random
import kotlin.time.Duration.Companion.milliseconds
class SyncServerTests {
//private val relayHost = "relay.grayjay.app"
//private val relayKey = "xGbHRzDOvE6plRbQaFgSen82eijF+gxS0yeUaeEErkw="
private val relayKey = "XlUaSpIlRaCg0TGzZ7JYmPupgUHDqTZXUUBco2K7ejw="
private val relayHost = "192.168.1.175"
private val relayPort = 9000
/** Creates a client connected to the live relay server. */
private suspend fun createClient(
onHandshakeComplete: ((SyncSocketSession) -> Unit)? = null,
onData: ((SyncSocketSession, UByte, UByte, ByteBuffer) -> Unit)? = null,
onNewChannel: ((SyncSocketSession, ChannelRelayed) -> Unit)? = null,
isHandshakeAllowed: ((SyncSocketSession, String, String?) -> Boolean)? = null
): SyncSocketSession = withContext(Dispatchers.IO) {
val p = Noise.createDH("25519")
p.generateKeyPair()
val socket = Socket(relayHost, relayPort)
val inputStream = LittleEndianDataInputStream(socket.getInputStream())
val outputStream = LittleEndianDataOutputStream(socket.getOutputStream())
val tcs = CompletableDeferred<Boolean>()
val socketSession = SyncSocketSession(
relayHost,
p,
inputStream,
outputStream,
onClose = { socket.close() },
onHandshakeComplete = { s ->
onHandshakeComplete?.invoke(s)
tcs.complete(true)
},
onData = onData ?: { _, _, _, _ -> },
onNewChannel = onNewChannel ?: { _, _ -> },
isHandshakeAllowed = isHandshakeAllowed ?: { _, _, _ -> true }
)
socketSession.authorizable = AlwaysAuthorized()
socketSession.startAsInitiator(relayKey)
withTimeout(5000.milliseconds) { tcs.await() }
return@withContext socketSession
}
@Test
fun multipleClientsHandshake_Success() = runBlocking {
val client1 = createClient()
val client2 = createClient()
assertNotNull(client1.remotePublicKey, "Client 1 handshake failed")
assertNotNull(client2.remotePublicKey, "Client 2 handshake failed")
client1.stop()
client2.stop()
}
@Test
fun publishAndRequestConnectionInfo_Authorized_Success() = runBlocking {
val clientA = createClient()
val clientB = createClient()
val clientC = createClient()
clientA.publishConnectionInformation(arrayOf(clientB.localPublicKey), 12345, true, true, true, true)
delay(100.milliseconds)
val infoB = clientB.requestConnectionInfo(clientA.localPublicKey)
val infoC = clientC.requestConnectionInfo(clientA.localPublicKey)
assertNotNull("Client B should receive connection info", infoB)
assertEquals(12345.toUShort(), infoB!!.port)
assertNull("Client C should not receive connection info (unauthorized)", infoC)
clientA.stop()
clientB.stop()
clientC.stop()
}
@Test
fun relayedTransport_Bidirectional_Success() = runBlocking {
val tcsA = CompletableDeferred<ChannelRelayed>()
val tcsB = CompletableDeferred<ChannelRelayed>()
val clientA = createClient(onNewChannel = { _, c -> tcsA.complete(c) })
val clientB = createClient(onNewChannel = { _, c -> tcsB.complete(c) })
val channelTask = async { clientA.startRelayedChannel(clientB.localPublicKey) }
val channelA = withTimeout(5000.milliseconds) { tcsA.await() }
channelA.authorizable = AlwaysAuthorized()
val channelB = withTimeout(5000.milliseconds) { tcsB.await() }
channelB.authorizable = AlwaysAuthorized()
channelTask.await()
val tcsDataB = CompletableDeferred<ByteArray>()
channelB.setDataHandler { _, _, o, so, d ->
val b = ByteArray(d.remaining())
d.get(b)
if (o == Opcode.DATA.value && so == 0u.toUByte()) tcsDataB.complete(b)
}
channelA.send(Opcode.DATA.value, 0u, ByteBuffer.wrap(byteArrayOf(1, 2, 3)))
val tcsDataA = CompletableDeferred<ByteArray>()
channelA.setDataHandler { _, _, o, so, d ->
val b = ByteArray(d.remaining())
d.get(b)
if (o == Opcode.DATA.value && so == 0u.toUByte()) tcsDataA.complete(b)
}
channelB.send(Opcode.DATA.value, 0u, ByteBuffer.wrap(byteArrayOf(4, 5, 6)))
val receivedB = withTimeout(5000.milliseconds) { tcsDataB.await() }
val receivedA = withTimeout(5000.milliseconds) { tcsDataA.await() }
assertArrayEquals(byteArrayOf(1, 2, 3), receivedB)
assertArrayEquals(byteArrayOf(4, 5, 6), receivedA)
clientA.stop()
clientB.stop()
}
@Test
fun relayedTransport_MaximumMessageSize_Success() = runBlocking {
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - SyncSocketSession.HEADER_SIZE - 8 - 16 - 16
val maxSizeData = ByteArray(MAX_DATA_PER_PACKET).apply { Random.nextBytes(this) }
val tcsA = CompletableDeferred<ChannelRelayed>()
val tcsB = CompletableDeferred<ChannelRelayed>()
val clientA = createClient(onNewChannel = { _, c -> tcsA.complete(c) })
val clientB = createClient(onNewChannel = { _, c -> tcsB.complete(c) })
val channelTask = async { clientA.startRelayedChannel(clientB.localPublicKey) }
val channelA = withTimeout(5000.milliseconds) { tcsA.await() }
channelA.authorizable = AlwaysAuthorized()
val channelB = withTimeout(5000.milliseconds) { tcsB.await() }
channelB.authorizable = AlwaysAuthorized()
channelTask.await()
val tcsDataB = CompletableDeferred<ByteArray>()
channelB.setDataHandler { _, _, o, so, d ->
val b = ByteArray(d.remaining())
d.get(b)
if (o == Opcode.DATA.value && so == 0u.toUByte()) tcsDataB.complete(b)
}
channelA.send(Opcode.DATA.value, 0u, ByteBuffer.wrap(maxSizeData))
val receivedData = withTimeout(5000.milliseconds) { tcsDataB.await() }
assertArrayEquals(maxSizeData, receivedData)
clientA.stop()
clientB.stop()
}
@Test
fun publishAndGetRecord_Success() = runBlocking {
val clientA = createClient()
val clientB = createClient()
val clientC = createClient()
val data = byteArrayOf(1, 2, 3)
val success = clientA.publishRecords(listOf(clientB.localPublicKey), "testKey", data)
val recordB = clientB.getRecord(clientA.localPublicKey, "testKey")
val recordC = clientC.getRecord(clientA.localPublicKey, "testKey")
assertTrue(success)
assertNotNull(recordB)
assertArrayEquals(data, recordB!!.first)
assertNull("Unauthorized client should not access record", recordC)
clientA.stop()
clientB.stop()
clientC.stop()
}
@Test
fun getNonExistentRecord_ReturnsNull() = runBlocking {
val clientA = createClient()
val clientB = createClient()
val record = clientB.getRecord(clientA.localPublicKey, "nonExistentKey")
assertNull("Getting non-existent record should return null", record)
clientA.stop()
clientB.stop()
}
@Test
fun updateRecord_TimestampUpdated() = runBlocking {
val clientA = createClient()
val clientB = createClient()
val key = "updateKey"
val data1 = byteArrayOf(1)
val data2 = byteArrayOf(2)
clientA.publishRecords(listOf(clientB.localPublicKey), key, data1)
val record1 = clientB.getRecord(clientA.localPublicKey, key)
delay(1000.milliseconds)
clientA.publishRecords(listOf(clientB.localPublicKey), key, data2)
val record2 = clientB.getRecord(clientA.localPublicKey, key)
assertNotNull(record1)
assertNotNull(record2)
assertTrue(record2!!.second > record1!!.second)
assertArrayEquals(data2, record2.first)
clientA.stop()
clientB.stop()
}
@Test
fun deleteRecord_Success() = runBlocking {
val clientA = createClient()
val clientB = createClient()
val data = byteArrayOf(1, 2, 3)
clientA.publishRecords(listOf(clientB.localPublicKey), "toDelete", data)
val success = clientB.deleteRecords(clientA.localPublicKey, clientB.localPublicKey, listOf("toDelete"))
val record = clientB.getRecord(clientA.localPublicKey, "toDelete")
assertTrue(success)
assertNull(record)
clientA.stop()
clientB.stop()
}
@Test
fun listRecordKeys_Success() = runBlocking {
val clientA = createClient()
val clientB = createClient()
val keys = arrayOf("key1", "key2", "key3")
keys.forEach { key ->
clientA.publishRecords(listOf(clientB.localPublicKey), key, byteArrayOf(1))
}
val listedKeys = clientB.listRecordKeys(clientA.localPublicKey, clientB.localPublicKey)
assertArrayEquals(keys, listedKeys.map { it.first }.toTypedArray())
clientA.stop()
clientB.stop()
}
@Test
fun singleLargeMessageViaRelayedChannel_Success() = runBlocking {
val largeData = ByteArray(100000).apply { Random.nextBytes(this) }
val tcsA = CompletableDeferred<ChannelRelayed>()
val tcsB = CompletableDeferred<ChannelRelayed>()
val clientA = createClient(onNewChannel = { _, c -> tcsA.complete(c) })
val clientB = createClient(onNewChannel = { _, c -> tcsB.complete(c) })
val channelTask = async { clientA.startRelayedChannel(clientB.localPublicKey) }
val channelA = withTimeout(5000.milliseconds) { tcsA.await() }
channelA.authorizable = AlwaysAuthorized()
val channelB = withTimeout(5000.milliseconds) { tcsB.await() }
channelB.authorizable = AlwaysAuthorized()
channelTask.await()
val tcsDataB = CompletableDeferred<ByteArray>()
channelB.setDataHandler { _, _, o, so, d ->
val b = ByteArray(d.remaining())
d.get(b)
if (o == Opcode.DATA.value && so == 0u.toUByte()) tcsDataB.complete(b)
}
channelA.send(Opcode.DATA.value, 0u, ByteBuffer.wrap(largeData))
val receivedData = withTimeout(10000.milliseconds) { tcsDataB.await() }
assertArrayEquals(largeData, receivedData)
clientA.stop()
clientB.stop()
}
@Test
fun publishAndGetLargeRecord_Success() = runBlocking {
val largeData = ByteArray(1000000).apply { Random.nextBytes(this) }
val clientA = createClient()
val clientB = createClient()
val success = clientA.publishRecords(listOf(clientB.localPublicKey), "largeRecord", largeData)
val record = clientB.getRecord(clientA.localPublicKey, "largeRecord")
assertTrue(success)
assertNotNull(record)
assertArrayEquals(largeData, record!!.first)
clientA.stop()
clientB.stop()
}
}
class AlwaysAuthorized : IAuthorizable {
override val isAuthorized: Boolean get() = true
}

View File

@ -28,12 +28,11 @@ import com.futo.platformplayer.models.PlatformVideoWithTime
import com.futo.platformplayer.others.PlatformLinkMovementMethod import com.futo.platformplayer.others.PlatformLinkMovementMethod
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.File
import java.io.IOException import java.io.IOException
import java.io.InputStream import java.io.InputStream
import java.io.OutputStream import java.io.OutputStream
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.ByteOrder import java.security.SecureRandom
import java.time.OffsetDateTime import java.time.OffsetDateTime
import java.util.* import java.util.*
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
@ -284,6 +283,18 @@ fun ByteBuffer.toUtf8String(): String {
return String(remainingBytes, Charsets.UTF_8) return String(remainingBytes, Charsets.UTF_8)
} }
fun generateReadablePassword(length: Int): String {
val validChars = "ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjkmnpqrstuvwxyz23456789"
val secureRandom = SecureRandom()
val randomBytes = ByteArray(length)
secureRandom.nextBytes(randomBytes)
val sb = StringBuilder(length)
for (byte in randomBytes) {
val index = (byte.toInt() and 0xFF) % validChars.length
sb.append(validChars[index])
}
return sb.toString()
}
fun ByteArray.toGzip(): ByteArray { fun ByteArray.toGzip(): ByteArray {
if (this == null || this.isEmpty()) return ByteArray(0) if (this == null || this.isEmpty()) return ByteArray(0)

View File

@ -100,7 +100,8 @@ class SyncHomeActivity : AppCompatActivity() {
private fun updateDeviceView(syncDeviceView: SyncDeviceView, publicKey: String, session: SyncSession?): SyncDeviceView { private fun updateDeviceView(syncDeviceView: SyncDeviceView, publicKey: String, session: SyncSession?): SyncDeviceView {
val connected = session?.connected ?: false val connected = session?.connected ?: false
syncDeviceView.setLinkType(if (connected) LinkType.Local else LinkType.None)
syncDeviceView.setLinkType(session?.linkType ?: LinkType.None)
.setName(session?.displayName ?: StateSync.instance.getCachedName(publicKey) ?: publicKey) .setName(session?.displayName ?: StateSync.instance.getCachedName(publicKey) ?: publicKey)
//TODO: also display public key? //TODO: also display public key?
.setStatus(if (connected) "Connected" else "Disconnected") .setStatus(if (connected) "Connected" else "Disconnected")

View File

@ -109,9 +109,9 @@ class SyncPairActivity : AppCompatActivity() {
lifecycleScope.launch(Dispatchers.IO) { lifecycleScope.launch(Dispatchers.IO) {
try { try {
StateSync.instance.connect(deviceInfo) { session, complete, message -> StateSync.instance.connect(deviceInfo) { complete, message ->
lifecycleScope.launch(Dispatchers.Main) { lifecycleScope.launch(Dispatchers.Main) {
if (complete) { if (complete != null && complete) {
_layoutPairingSuccess.visibility = View.VISIBLE _layoutPairingSuccess.visibility = View.VISIBLE
_layoutPairing.visibility = View.GONE _layoutPairing.visibility = View.GONE
} else { } else {

View File

@ -67,7 +67,7 @@ class SyncShowPairingCodeActivity : AppCompatActivity() {
} }
val ips = getIPs() val ips = getIPs()
val selfDeviceInfo = SyncDeviceInfo(StateSync.instance.publicKey!!, ips.toTypedArray(), StateSync.PORT) val selfDeviceInfo = SyncDeviceInfo(StateSync.instance.publicKey!!, ips.toTypedArray(), StateSync.PORT, StateSync.instance.pairingCode)
val json = Json.encodeToString(selfDeviceInfo) val json = Json.encodeToString(selfDeviceInfo)
val base64 = Base64.encodeToString(json.toByteArray(), Base64.URL_SAFE or Base64.NO_PADDING or Base64.NO_WRAP) val base64 = Base64.encodeToString(json.toByteArray(), Base64.URL_SAFE or Base64.NO_PADDING or Base64.NO_WRAP)
val url = "grayjay://sync/${base64}" val url = "grayjay://sync/${base64}"

View File

@ -6,38 +6,62 @@ import com.futo.platformplayer.LittleEndianDataInputStream
import com.futo.platformplayer.LittleEndianDataOutputStream import com.futo.platformplayer.LittleEndianDataOutputStream
import com.futo.platformplayer.Settings import com.futo.platformplayer.Settings
import com.futo.platformplayer.UIDialogs import com.futo.platformplayer.UIDialogs
import com.futo.platformplayer.activities.MainActivity
import com.futo.platformplayer.activities.SyncShowPairingCodeActivity import com.futo.platformplayer.activities.SyncShowPairingCodeActivity
import com.futo.platformplayer.api.media.Serializer
import com.futo.platformplayer.constructs.Event1 import com.futo.platformplayer.constructs.Event1
import com.futo.platformplayer.constructs.Event2 import com.futo.platformplayer.constructs.Event2
import com.futo.platformplayer.encryption.GEncryptionProvider import com.futo.platformplayer.encryption.GEncryptionProvider
import com.futo.platformplayer.generateReadablePassword
import com.futo.platformplayer.getConnectedSocket import com.futo.platformplayer.getConnectedSocket
import com.futo.platformplayer.logging.Logger import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.mdns.DnsService import com.futo.platformplayer.mdns.DnsService
import com.futo.platformplayer.mdns.ServiceDiscoverer import com.futo.platformplayer.mdns.ServiceDiscoverer
import com.futo.platformplayer.models.HistoryVideo
import com.futo.platformplayer.models.Subscription
import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.Noise import com.futo.platformplayer.noise.protocol.Noise
import com.futo.platformplayer.smartMerge
import com.futo.platformplayer.stores.FragmentedStorage import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.StringStringMapStorage import com.futo.platformplayer.stores.StringStringMapStorage
import com.futo.platformplayer.stores.StringArrayStorage import com.futo.platformplayer.stores.StringArrayStorage
import com.futo.platformplayer.stores.StringStorage import com.futo.platformplayer.stores.StringStorage
import com.futo.platformplayer.stores.StringTMapStorage import com.futo.platformplayer.stores.StringTMapStorage
import com.futo.platformplayer.sync.SyncSessionData import com.futo.platformplayer.sync.SyncSessionData
import com.futo.platformplayer.sync.internal.ChannelSocket
import com.futo.platformplayer.sync.internal.GJSyncOpcodes import com.futo.platformplayer.sync.internal.GJSyncOpcodes
import com.futo.platformplayer.sync.internal.IAuthorizable
import com.futo.platformplayer.sync.internal.IChannel
import com.futo.platformplayer.sync.internal.Opcode
import com.futo.platformplayer.sync.internal.SyncDeviceInfo import com.futo.platformplayer.sync.internal.SyncDeviceInfo
import com.futo.platformplayer.sync.internal.SyncKeyPair import com.futo.platformplayer.sync.internal.SyncKeyPair
import com.futo.platformplayer.sync.internal.SyncSession import com.futo.platformplayer.sync.internal.SyncSession
import com.futo.platformplayer.sync.internal.SyncSession.Companion
import com.futo.platformplayer.sync.internal.SyncSocketSession import com.futo.platformplayer.sync.internal.SyncSocketSession
import com.futo.platformplayer.sync.models.SendToDevicePackage
import com.futo.platformplayer.sync.models.SyncPlaylistsPackage
import com.futo.platformplayer.sync.models.SyncSubscriptionGroupsPackage
import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage
import com.futo.platformplayer.sync.models.SyncWatchLaterPackage
import com.futo.polycentric.core.base64ToByteArray import com.futo.polycentric.core.base64ToByteArray
import com.futo.polycentric.core.toBase64 import com.futo.polycentric.core.toBase64
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import java.io.ByteArrayInputStream
import java.lang.Thread.sleep
import java.net.InetAddress import java.net.InetAddress
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.ServerSocket import java.net.ServerSocket
import java.net.Socket import java.net.Socket
import java.nio.ByteBuffer
import java.nio.channels.Channel
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.Base64 import java.util.Base64
import java.util.Locale import java.util.Locale
import kotlin.system.measureTimeMillis import kotlin.system.measureTimeMillis
@ -59,13 +83,19 @@ class StateSync {
//TODO: Should sync mdns and casting mdns be merged? //TODO: Should sync mdns and casting mdns be merged?
//TODO: Decrease interval that devices are updated //TODO: Decrease interval that devices are updated
//TODO: Send less data //TODO: Send less data
val _serviceDiscoverer = ServiceDiscoverer(arrayOf("_gsync._tcp.local")) { handleServiceUpdated(it) } private val _serviceDiscoverer = ServiceDiscoverer(arrayOf("_gsync._tcp.local")) { handleServiceUpdated(it) }
private val _pairingCode: String? = generateReadablePassword(8)
val pairingCode: String? get() = _pairingCode
private var _relaySession: SyncSocketSession? = null
private var _threadRelay: Thread? = null
var keyPair: DHState? = null var keyPair: DHState? = null
var publicKey: String? = null var publicKey: String? = null
val deviceRemoved: Event1<String> = Event1() val deviceRemoved: Event1<String> = Event1()
val deviceUpdatedOrAdded: Event2<String, SyncSession> = Event2() val deviceUpdatedOrAdded: Event2<String, SyncSession> = Event2()
//TODO: Should authorize acknowledge be implemented?
fun hasAuthorizedDevice(): Boolean { fun hasAuthorizedDevice(): Boolean {
synchronized(_sessions) { synchronized(_sessions) {
return _sessions.any{ it.value.connected && it.value.isAuthorized }; return _sessions.any{ it.value.connected && it.value.isAuthorized };
@ -127,7 +157,7 @@ class StateSync {
while (_started) { while (_started) {
val socket = serverSocket.accept() val socket = serverSocket.accept()
val session = createSocketSession(socket, true) { session, socketSession -> val session = createSocketSession(socket, true) { session ->
} }
@ -164,7 +194,7 @@ class StateSync {
for (connectPair in addressesToConnect) { for (connectPair in addressesToConnect) {
try { try {
val syncDeviceInfo = SyncDeviceInfo(connectPair.first, arrayOf(connectPair.second), PORT) val syncDeviceInfo = SyncDeviceInfo(connectPair.first, arrayOf(connectPair.second), PORT, null)
val now = System.currentTimeMillis() val now = System.currentTimeMillis()
val lastConnectTime = synchronized(_lastConnectTimesIp) { val lastConnectTime = synchronized(_lastConnectTimesIp) {
@ -188,6 +218,138 @@ class StateSync {
} }
}.apply { start() } }.apply { start() }
} }
_threadRelay = Thread {
while (_started) {
try {
Log.i(TAG, "Starting relay session...")
var socketClosed = false;
val socket = Socket(RELAY_SERVER, 9000)
_relaySession = SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
isHandshakeAllowed = { _, pk, pairingCode ->
Log.v(TAG, "Check if handshake allowed from '$pk'.")
if (pk == RELAY_PUBLIC_KEY)
return@SyncSocketSession true
synchronized(_authorizedDevices) {
if (_authorizedDevices.values.contains(pk))
return@SyncSocketSession true
}
Log.v(TAG, "Check if handshake allowed with pairing code '$pairingCode' with active pairing code '$_pairingCode'.")
if (_pairingCode == null || pairingCode.isNullOrEmpty())
return@SyncSocketSession false
_pairingCode == pairingCode
},
onNewChannel = { _, c ->
val remotePublicKey = c.remotePublicKey
if (remotePublicKey == null) {
Log.e(TAG, "Remote public key should never be null in onNewChannel.")
return@SyncSocketSession
}
Log.i(TAG, "New channel established from relay (pk: '$remotePublicKey').")
var session: SyncSession?
synchronized(_sessions) {
session = _sessions[remotePublicKey]
if (session == null) {
val remoteDeviceName = synchronized(_nameStorage) {
_nameStorage.get(remotePublicKey)
}
session = createNewSyncSession(remotePublicKey, remoteDeviceName) { }
_sessions[remotePublicKey] = session!!
}
session!!.addChannel(c)
}
c.setDataHandler { _, channel, opcode, subOpcode, data ->
session?.handlePacket(opcode, subOpcode, data)
}
c.setCloseHandler { channel ->
session?.removeChannel(channel)
}
},
onChannelEstablished = { _, channel, isResponder ->
handleAuthorization(channel, isResponder)
},
onClose = { socketClosed = true },
onHandshakeComplete = { relaySession ->
Thread {
try {
while (_started && !socketClosed) {
val unconnectedAuthorizedDevices = synchronized(_authorizedDevices) {
_authorizedDevices.values.filter { !isConnected(it) }.toTypedArray()
}
relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, true, false, false, true)
val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) }
for ((targetKey, connectionInfo) in connectionInfos) {
val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses)
.filter { it != connectionInfo.remoteIp }
if (connectionInfo.allowLocalDirect) {
Thread {
try {
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
connect(potentialLocalAddresses.map { it }.toTypedArray(), PORT, targetKey, null)
} catch (e: Throwable) {
Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e)
}
}.start()
}
if (connectionInfo.allowRemoteDirect) {
// TODO: Implement direct remote connection if needed
}
if (connectionInfo.allowRemoteHolePunched) {
// TODO: Implement hole punching if needed
}
if (connectionInfo.allowRemoteProxied) {
try {
Log.v(TAG, "Attempting relayed connection with '$targetKey'.")
runBlocking { relaySession.startRelayedChannel(targetKey, null) }
} catch (e: Throwable) {
Log.e(TAG, "Failed to start relayed channel with $targetKey.", e)
}
}
}
Thread.sleep(15000)
}
} catch (e: Throwable) {
Log.e(TAG, "Unhandled exception in relay session.", e)
relaySession.stop()
}
}.start()
}
)
_relaySession!!.authorizable = object : IAuthorizable {
override val isAuthorized: Boolean get() = true
}
_relaySession!!.startAsInitiator(RELAY_PUBLIC_KEY, null)
Log.i(TAG, "Started relay session.")
} catch (e: Throwable) {
Log.e(TAG, "Relay session failed.", e)
Thread.sleep(5000)
} finally {
_relaySession?.stop()
_relaySession = null
}
}
}.apply { start() }
} }
private fun getDeviceName(): String { private fun getDeviceName(): String {
@ -219,14 +381,14 @@ class StateSync {
} }
} }
fun getSessions(): List<SyncSession> { fun getSessions(): List<SyncSession> {
return synchronized(_sessions) { synchronized(_sessions) {
return _sessions.values.toList() return _sessions.values.toList()
}; }
} }
fun getAuthorizedSessions(): List<SyncSession> { fun getAuthorizedSessions(): List<SyncSession> {
return synchronized(_sessions) { synchronized(_sessions) {
return _sessions.values.filter { it.isAuthorized }.toList() return _sessions.values.filter { it.isAuthorized }.toList()
}; }
} }
fun getSyncSessionData(key: String): SyncSessionData { fun getSyncSessionData(key: String): SyncSessionData {
@ -253,7 +415,7 @@ class StateSync {
val urlSafePkey = s.texts.firstOrNull { it.startsWith("pk=") }?.substring("pk=".length) ?: continue val urlSafePkey = s.texts.firstOrNull { it.startsWith("pk=") }?.substring("pk=".length) ?: continue
val pkey = Base64.getEncoder().encodeToString(Base64.getDecoder().decode(urlSafePkey.replace('-', '+').replace('_', '/'))) val pkey = Base64.getEncoder().encodeToString(Base64.getDecoder().decode(urlSafePkey.replace('-', '+').replace('_', '/')))
val syncDeviceInfo = SyncDeviceInfo(pkey, addresses, port) val syncDeviceInfo = SyncDeviceInfo(pkey, addresses, port, null)
val authorized = isAuthorized(pkey) val authorized = isAuthorized(pkey)
if (authorized && !isConnected(pkey)) { if (authorized && !isConnected(pkey)) {
@ -288,11 +450,313 @@ class StateSync {
deviceRemoved.emit(remotePublicKey) deviceRemoved.emit(remotePublicKey)
} }
private fun createSocketSession(socket: Socket, isResponder: Boolean, onAuthorized: (session: SyncSession, socketSession: SyncSocketSession) -> Unit): SyncSocketSession {
private fun handleSyncSubscriptionPackage(origin: SyncSession, pack: SyncSubscriptionsPackage) {
val added = mutableListOf<Subscription>()
for(sub in pack.subscriptions) {
if(!StateSubscriptions.instance.isSubscribed(sub.channel)) {
val removalTime = StateSubscriptions.instance.getSubscriptionRemovalTime(sub.channel.url);
if(sub.creationTime > removalTime) {
val newSub = StateSubscriptions.instance.addSubscription(sub.channel, sub.creationTime);
added.add(newSub);
}
}
}
if(added.size > 3)
UIDialogs.appToast("${added.size} Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}");
else if(added.size > 0)
UIDialogs.appToast("Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}:\n" +
added.map { it.channel.name }.joinToString("\n"));
if(pack.subscriptions.isNotEmpty()) {
for (subRemoved in pack.subscriptionRemovals) {
val removed = StateSubscriptions.instance.applySubscriptionRemovals(pack.subscriptionRemovals);
if(removed.size > 3) {
UIDialogs.appToast("Removed ${removed.size} Subscriptions from ${origin.remotePublicKey.substring(0, 8.coerceAtMost(origin.remotePublicKey.length))}");
} else if(removed.isNotEmpty()) {
UIDialogs.appToast("Subscriptions removed from ${origin.remotePublicKey.substring(0, 8.coerceAtMost(origin.remotePublicKey.length))}:\n" + removed.map { it.channel.name }.joinToString("\n"));
}
}
}
}
private fun handleData(session: SyncSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
val remotePublicKey = session.remotePublicKey
when (subOpcode) {
GJSyncOpcodes.sendToDevices -> {
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) {
val context = StateApp.instance.contextOrNull;
if (context != null && context is MainActivity) {
val dataBody = ByteArray(data.remaining());
val remainder = data.remaining();
data.get(dataBody, 0, remainder);
val json = String(dataBody, Charsets.UTF_8);
val obj = Json.decodeFromString<SendToDevicePackage>(json);
UIDialogs.appToast("Received url from device [${session.remotePublicKey}]:\n{${obj.url}");
context.handleUrl(obj.url, obj.position);
}
};
}
GJSyncOpcodes.syncStateExchange -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val syncSessionData = Serializer.json.decodeFromString<SyncSessionData>(json);
Logger.i(TAG, "Received SyncSessionData from $remotePublicKey");
session.sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
session.sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
session.sendData(GJSyncOpcodes.syncWatchLater, Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false)));
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
if(recentHistory.isNotEmpty())
session.sendJsonData(GJSyncOpcodes.syncHistory, recentHistory);
}
GJSyncOpcodes.syncExport -> {
val dataBody = ByteArray(data.remaining());
val bytesStr = ByteArrayInputStream(data.array(), data.position(), data.remaining());
bytesStr.use { bytesStrBytes ->
val exportStruct = StateBackup.ExportStructure.fromZipBytes(bytesStrBytes);
for (store in exportStruct.stores) {
if (store.key.equals("subscriptions", true)) {
val subStore =
StateSubscriptions.instance.getUnderlyingSubscriptionsStore();
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
val pack = SyncSubscriptionsPackage(
store.value.map {
subStore.fromReconstruction(it, exportStruct.cache)
},
StateSubscriptions.instance.getSubscriptionRemovals()
);
handleSyncSubscriptionPackage(session, pack);
}
}
}
}
}
GJSyncOpcodes.syncSubscriptions -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json);
handleSyncSubscriptionPackage(session, subPackage);
val newestSub = subPackage.subscriptions.maxOf { it.creationTime };
val sesData = getSyncSessionData(remotePublicKey);
if(newestSub > sesData.lastSubscription) {
sesData.lastSubscription = newestSub;
saveSyncSessionData(sesData);
}
}
GJSyncOpcodes.syncSubscriptionGroups -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val pack = Serializer.json.decodeFromString<SyncSubscriptionGroupsPackage>(json);
var lastSubgroupChange = OffsetDateTime.MIN;
for(group in pack.groups){
if(group.lastChange > lastSubgroupChange)
lastSubgroupChange = group.lastChange;
val existing = StateSubscriptionGroups.instance.getSubscriptionGroup(group.id);
if(existing == null)
StateSubscriptionGroups.instance.updateSubscriptionGroup(group, false, true);
else if(existing.lastChange < group.lastChange) {
existing.name = group.name;
existing.urls = group.urls;
existing.image = group.image;
existing.priority = group.priority;
existing.lastChange = group.lastChange;
StateSubscriptionGroups.instance.updateSubscriptionGroup(existing, false, true);
}
}
for(removal in pack.groupRemovals) {
val creation = StateSubscriptionGroups.instance.getSubscriptionGroup(removal.key);
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
if(creation != null && creation.creationTime < removalTime)
StateSubscriptionGroups.instance.deleteSubscriptionGroup(removal.key, false);
}
}
GJSyncOpcodes.syncPlaylists -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val pack = Serializer.json.decodeFromString<SyncPlaylistsPackage>(json);
for(playlist in pack.playlists) {
val existing = StatePlaylists.instance.getPlaylist(playlist.id);
if(existing == null)
StatePlaylists.instance.createOrUpdatePlaylist(playlist, false);
else if(existing.dateUpdate.toLocalDateTime() < playlist.dateUpdate.toLocalDateTime()) {
existing.dateUpdate = playlist.dateUpdate;
existing.name = playlist.name;
existing.videos = playlist.videos;
existing.dateCreation = playlist.dateCreation;
existing.datePlayed = playlist.datePlayed;
StatePlaylists.instance.createOrUpdatePlaylist(existing, false);
}
}
for(removal in pack.playlistRemovals) {
val creation = StatePlaylists.instance.getPlaylist(removal.key);
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
if(creation != null && creation.dateCreation < removalTime)
StatePlaylists.instance.removePlaylist(creation, false);
}
}
GJSyncOpcodes.syncWatchLater -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val pack = Serializer.json.decodeFromString<SyncWatchLaterPackage>(json);
Logger.i(TAG, "SyncWatchLater received ${pack.videos.size} (${pack.videoAdds?.size}, ${pack.videoRemovals?.size})");
val allExisting = StatePlaylists.instance.getWatchLater();
for(video in pack.videos) {
val existing = allExisting.firstOrNull { it.url == video.url };
val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.videoAdds[video.url] ?: 0), ZoneOffset.UTC) else OffsetDateTime.MIN;
if(existing == null) {
StatePlaylists.instance.addToWatchLater(video, false);
if(time > OffsetDateTime.MIN)
StatePlaylists.instance.setWatchLaterAddTime(video.url, time);
}
}
for(removal in pack.videoRemovals) {
val watchLater = allExisting.firstOrNull { it.url == removal.key } ?: continue;
val creation = StatePlaylists.instance.getWatchLaterRemovalTime(watchLater.url) ?: OffsetDateTime.MIN;
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value), ZoneOffset.UTC);
if(creation < removalTime)
StatePlaylists.instance.removeFromWatchLater(watchLater, false, removalTime);
}
val packReorderTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.reorderTime), ZoneOffset.UTC);
val localReorderTime = StatePlaylists.instance.getWatchLaterLastReorderTime();
if(localReorderTime < packReorderTime && pack.ordering != null) {
StatePlaylists.instance.updateWatchLaterOrdering(smartMerge(pack.ordering!!, StatePlaylists.instance.getWatchLaterOrdering()), true);
}
}
GJSyncOpcodes.syncHistory -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val history = Serializer.json.decodeFromString<List<HistoryVideo>>(json);
Logger.i(TAG, "SyncHistory received ${history.size} videos from ${remotePublicKey}");
var lastHistory = OffsetDateTime.MIN;
for(video in history){
val hist = StateHistory.instance.getHistoryByVideo(video.video, true, video.date);
if(hist != null)
StateHistory.instance.updateHistoryPosition(video.video, hist, true, video.position, video.date)
if(lastHistory < video.date)
lastHistory = video.date;
}
if(lastHistory != OffsetDateTime.MIN && history.size > 1) {
val sesData = getSyncSessionData(remotePublicKey);
if (lastHistory > sesData.lastHistory) {
sesData.lastHistory = lastHistory;
saveSyncSessionData(sesData);
}
}
}
}
}
private fun createNewSyncSession(remotePublicKey: String, remoteDeviceName: String?, onAuthorized: ((SyncSession) -> Unit)?): SyncSession {
return SyncSession(
remotePublicKey,
onAuthorized = { it, isNewlyAuthorized, isNewSession ->
if (!isNewSession) {
return@SyncSession
}
it.remoteDeviceName?.let { remoteDeviceName ->
synchronized(_nameStorage) {
_nameStorage.setAndSave(remotePublicKey, remoteDeviceName)
}
}
Logger.i(TAG, "${remotePublicKey} authorized (name: ${it.displayName})")
onAuthorized?.invoke(it)
_authorizedDevices.addDistinct(remotePublicKey)
_authorizedDevices.save()
deviceUpdatedOrAdded.emit(it.remotePublicKey, it)
checkForSync(it);
},
onUnauthorized = {
unauthorize(remotePublicKey)
synchronized(_sessions) {
it.close()
_sessions.remove(remotePublicKey)
}
},
onConnectedChanged = { it, connected ->
Logger.i(TAG, "$remotePublicKey connected: " + connected)
deviceUpdatedOrAdded.emit(it.remotePublicKey, it)
},
onClose = {
Logger.i(TAG, "$remotePublicKey closed")
synchronized(_sessions)
{
_sessions.remove(it.remotePublicKey)
}
deviceRemoved.emit(it.remotePublicKey)
},
dataHandler = { it, opcode, subOpcode, data ->
handleData(it, opcode, subOpcode, data)
},
remoteDeviceName
)
}
private fun createSocketSession(socket: Socket, isResponder: Boolean, onAuthorized: (session: SyncSession) -> Unit): SyncSocketSession {
var session: SyncSession? = null var session: SyncSession? = null
return SyncSocketSession((socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!, keyPair!!, LittleEndianDataInputStream(socket.getInputStream()), LittleEndianDataOutputStream(socket.getOutputStream()), var channelSocket: ChannelSocket? = null
return SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
onClose = { s -> onClose = { s ->
session?.removeSocketSession(s) if (channelSocket != null)
session?.removeChannel(channelSocket!!)
},
isHandshakeAllowed = { _, pk, pairingCode ->
Logger.v(TAG, "Check if handshake allowed from '${pk}'.")
synchronized (_authorizedDevices)
{
if (_authorizedDevices.values.contains(pk))
return@SyncSocketSession true
}
Logger.v(TAG, "Check if handshake allowed with pairing code '${pairingCode}' with active pairing code '${_pairingCode}'.");
if (_pairingCode == null || pairingCode.isNullOrEmpty())
return@SyncSocketSession false
return@SyncSocketSession _pairingCode == pairingCode
}, },
onHandshakeComplete = { s -> onHandshakeComplete = { s ->
val remotePublicKey = s.remotePublicKey val remotePublicKey = s.remotePublicKey
@ -303,6 +767,8 @@ class StateSync {
Logger.i(TAG, "Handshake complete with (LocalPublicKey = ${s.localPublicKey}, RemotePublicKey = ${s.remotePublicKey})") Logger.i(TAG, "Handshake complete with (LocalPublicKey = ${s.localPublicKey}, RemotePublicKey = ${s.remotePublicKey})")
channelSocket = ChannelSocket(s)
synchronized(_sessions) { synchronized(_sessions) {
session = _sessions[s.remotePublicKey] session = _sessions[s.remotePublicKey]
if (session == null) { if (session == null) {
@ -310,126 +776,99 @@ class StateSync {
_nameStorage.get(remotePublicKey) _nameStorage.get(remotePublicKey)
} }
session = SyncSession(remotePublicKey, onAuthorized = { it, isNewlyAuthorized, isNewSession -> synchronized(_lastAddressStorage) {
if (!isNewSession) { _lastAddressStorage.setAndSave(remotePublicKey, s.remoteAddress)
return@SyncSession }
}
it.remoteDeviceName?.let { remoteDeviceName -> session = createNewSyncSession(remotePublicKey, remoteDeviceName, onAuthorized)
synchronized(_nameStorage) {
_nameStorage.setAndSave(remotePublicKey, remoteDeviceName)
}
}
Logger.i(TAG, "${s.remotePublicKey} authorized (name: ${it.displayName})")
synchronized(_lastAddressStorage) {
_lastAddressStorage.setAndSave(remotePublicKey, s.remoteAddress)
}
onAuthorized(it, s)
_authorizedDevices.addDistinct(remotePublicKey)
_authorizedDevices.save()
deviceUpdatedOrAdded.emit(it.remotePublicKey, session!!)
checkForSync(it);
}, onUnauthorized = {
unauthorize(remotePublicKey)
synchronized(_sessions) {
session?.close()
_sessions.remove(remotePublicKey)
}
}, onConnectedChanged = { it, connected ->
Logger.i(TAG, "${s.remotePublicKey} connected: " + connected)
deviceUpdatedOrAdded.emit(it.remotePublicKey, session!!)
}, onClose = {
Logger.i(TAG, "${s.remotePublicKey} closed")
synchronized(_sessions)
{
_sessions.remove(it.remotePublicKey)
}
deviceRemoved.emit(it.remotePublicKey)
}, remoteDeviceName)
_sessions[remotePublicKey] = session!! _sessions[remotePublicKey] = session!!
} }
session!!.addSocketSession(s) session!!.addChannel(channelSocket!!)
} }
if (isResponder) { handleAuthorization(channelSocket!!, isResponder)
val isAuthorized = synchronized(_authorizedDevices) {
_authorizedDevices.values.contains(remotePublicKey)
}
if (!isAuthorized) {
val scope = StateApp.instance.scopeOrNull
val activity = SyncShowPairingCodeActivity.activity
if (scope != null && activity != null) {
scope.launch(Dispatchers.Main) {
UIDialogs.showConfirmationDialog(activity, "Allow connection from ${remotePublicKey}?", action = {
scope.launch(Dispatchers.IO) {
try {
session!!.authorize(s)
Logger.i(TAG, "Connection authorized for $remotePublicKey by confirmation")
} catch (e: Throwable) {
Logger.e(TAG, "Failed to send authorize", e)
}
}
}, cancelAction = {
scope.launch(Dispatchers.IO) {
try {
unauthorize(remotePublicKey)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to send unauthorize", e)
}
synchronized(_sessions) {
session?.close()
_sessions.remove(remotePublicKey)
}
}
})
}
} else {
val publicKey = session!!.remotePublicKey
session!!.unauthorize(s)
session!!.close()
synchronized(_sessions) {
_sessions.remove(publicKey)
}
Logger.i(TAG, "Connection unauthorized for ${remotePublicKey} because not authorized and not on pairing activity to ask")
}
} else {
//Responder does not need to check because already approved
session!!.authorize(s)
Logger.i(TAG, "Connection authorized for ${remotePublicKey} because already authorized")
}
} else {
//Initiator does not need to check because the manual action of scanning the QR counts as approval
session!!.authorize(s)
Logger.i(TAG, "Connection authorized for ${remotePublicKey} because initiator")
}
}, },
onData = { s, opcode, subOpcode, data -> onData = { s, opcode, subOpcode, data ->
session?.handlePacket(s, opcode, subOpcode, data) session?.handlePacket(opcode, subOpcode, data)
}) }
)
}
private fun handleAuthorization(channel: IChannel, isResponder: Boolean) {
val syncSession = channel.syncSession!!
val remotePublicKey = channel.remotePublicKey!!
if (isResponder) {
val isAuthorized = synchronized(_authorizedDevices) {
_authorizedDevices.values.contains(remotePublicKey)
}
if (!isAuthorized) {
val scope = StateApp.instance.scopeOrNull
val activity = SyncShowPairingCodeActivity.activity
if (scope != null && activity != null) {
scope.launch(Dispatchers.Main) {
UIDialogs.showConfirmationDialog(activity, "Allow connection from ${remotePublicKey}?",
action = {
scope.launch(Dispatchers.IO) {
try {
syncSession.authorize()
Logger.i(TAG, "Connection authorized for $remotePublicKey by confirmation")
} catch (e: Throwable) {
Logger.e(TAG, "Failed to send authorize", e)
}
}
},
cancelAction = {
scope.launch(Dispatchers.IO) {
try {
unauthorize(remotePublicKey)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to send unauthorize", e)
}
syncSession.close()
synchronized(_sessions) {
_sessions.remove(remotePublicKey)
}
}
}
)
}
} else {
val publicKey = syncSession.remotePublicKey
syncSession.unauthorize()
syncSession.close()
synchronized(_sessions) {
_sessions.remove(publicKey)
}
Logger.i(TAG, "Connection unauthorized for $remotePublicKey because not authorized and not on pairing activity to ask")
}
} else {
//Responder does not need to check because already approved
syncSession.authorize()
Logger.i(TAG, "Connection authorized for $remotePublicKey because already authorized")
}
} else {
//Initiator does not need to check because the manual action of scanning the QR counts as approval
syncSession.authorize()
Logger.i(TAG, "Connection authorized for $remotePublicKey because initiator")
}
} }
inline fun <reified T> broadcastJsonData(subOpcode: UByte, data: T) { inline fun <reified T> broadcastJsonData(subOpcode: UByte, data: T) {
broadcast(SyncSocketSession.Opcode.DATA.value, subOpcode, Json.encodeToString(data)); broadcast(Opcode.DATA.value, subOpcode, Json.encodeToString(data));
} }
fun broadcastData(subOpcode: UByte, data: String) { fun broadcastData(subOpcode: UByte, data: String) {
broadcast(SyncSocketSession.Opcode.DATA.value, subOpcode, data.toByteArray(Charsets.UTF_8)); broadcast(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)));
} }
fun broadcast(opcode: UByte, subOpcode: UByte, data: String) { fun broadcast(opcode: UByte, subOpcode: UByte, data: String) {
broadcast(opcode, subOpcode, data.toByteArray(Charsets.UTF_8)); broadcast(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)));
} }
fun broadcast(opcode: UByte, subOpcode: UByte, data: ByteArray) { fun broadcast(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
for(session in getAuthorizedSessions()) { for(session in getAuthorizedSessions()) {
try { try {
session.send(opcode, subOpcode, data); session.send(opcode, subOpcode, data);
@ -456,21 +895,46 @@ class StateSync {
_serverSocket?.close() _serverSocket?.close()
_serverSocket = null _serverSocket = null
//_thread?.join() _thread?.interrupt()
_thread = null _thread = null
_connectThread?.interrupt()
_connectThread = null _connectThread = null
_threadRelay?.interrupt()
_threadRelay = null
_relaySession?.stop()
_relaySession = null
} }
fun connect(deviceInfo: SyncDeviceInfo, onStatusUpdate: ((session: SyncSocketSession?, complete: Boolean, message: String) -> Unit)? = null): SyncSocketSession { fun connect(deviceInfo: SyncDeviceInfo, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null) {
onStatusUpdate?.invoke(null, false, "Connecting...") try {
val socket = getConnectedSocket(deviceInfo.addresses.map { InetAddress.getByName(it) }, deviceInfo.port) ?: throw Exception("Failed to connect") connect(deviceInfo.addresses, deviceInfo.port, deviceInfo.publicKey, deviceInfo.pairingCode, onStatusUpdate)
onStatusUpdate?.invoke(null, false, "Handshaking...") } catch (e: Throwable) {
Logger.e(TAG, "Failed to connect directly", e)
val relaySession = _relaySession
if (relaySession != null) {
onStatusUpdate?.invoke(null, "Connecting via relay...")
val session = createSocketSession(socket, false) { _, ss -> runBlocking {
onStatusUpdate?.invoke(ss, true, "Handshake complete") relaySession.startRelayedChannel(deviceInfo.publicKey, deviceInfo.pairingCode)
onStatusUpdate?.invoke(true, "Connected")
}
} else {
throw Exception("Failed to connect.")
}
}
}
fun connect(addresses: Array<String>, port: Int, publicKey: String, pairingCode: String?, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null): SyncSocketSession {
onStatusUpdate?.invoke(null, "Connecting directly...")
val socket = getConnectedSocket(addresses.map { InetAddress.getByName(it) }, port) ?: throw Exception("Failed to connect")
onStatusUpdate?.invoke(null, "Handshaking...")
val session = createSocketSession(socket, false) { s ->
onStatusUpdate?.invoke(true, "Authorized")
} }
session.startAsInitiator(deviceInfo.publicKey) session.startAsInitiator(publicKey, pairingCode)
return session return session
} }
@ -526,6 +990,8 @@ class StateSync {
val hash = "BLAKE2b" val hash = "BLAKE2b"
var protocolName = "Noise_${pattern}_${dh}_${cipher}_${hash}" var protocolName = "Noise_${pattern}_${dh}_${cipher}_${hash}"
val version = 1 val version = 1
val RELAY_SERVER = "relay.grayjay.app"
val RELAY_PUBLIC_KEY = "xGbHRzDOvE6plRbQaFgSen82eijF+gxS0yeUaeEErkw="
private const val TAG = "StateSync" private const val TAG = "StateSync"
const val PORT = 12315 const val PORT = 12315

View File

@ -0,0 +1,332 @@
package com.futo.platformplayer.sync.internal
import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.noise.protocol.CipherStatePair
import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.HandshakeState
import com.futo.platformplayer.states.StateSync
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.Base64
interface IChannel : AutoCloseable {
val remotePublicKey: String?
val remoteVersion: Int?
var authorizable: IAuthorizable?
var syncSession: SyncSession?
fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?)
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null)
fun setCloseHandler(onClose: ((IChannel) -> Unit)?)
}
class ChannelSocket(private val session: SyncSocketSession) : IChannel {
override val remotePublicKey: String? get() = session.remotePublicKey
override val remoteVersion: Int? get() = session.remoteVersion
private var onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)? = null
private var onClose: ((IChannel) -> Unit)? = null
override var authorizable: IAuthorizable?
get() = session.authorizable
set(value) { session.authorizable = value }
override var syncSession: SyncSession? = null
override fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) {
this.onData = onData
}
override fun setCloseHandler(onClose: ((IChannel) -> Unit)?) {
this.onClose = onClose
}
override fun close() {
session.stop()
onClose?.invoke(this)
}
fun invokeDataHandler(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
onData?.invoke(session, this, opcode, subOpcode, data)
}
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
if (data != null) {
session.send(opcode, subOpcode, data)
} else {
session.send(opcode, subOpcode)
}
}
}
class ChannelRelayed(
private val session: SyncSocketSession,
private val localKeyPair: DHState,
private val publicKey: String,
private val initiator: Boolean
) : IChannel {
private val sendLock = Object()
private val decryptLock = Object()
private var handshakeState: HandshakeState? = if (initiator) {
HandshakeState(StateSync.protocolName, HandshakeState.INITIATOR).apply {
localKeyPair.copyFrom(this@ChannelRelayed.localKeyPair)
remotePublicKey.setPublicKey(Base64.getDecoder().decode(publicKey), 0)
}
} else {
HandshakeState(StateSync.protocolName, HandshakeState.RESPONDER).apply {
localKeyPair.copyFrom(this@ChannelRelayed.localKeyPair)
}
}
private var transport: CipherStatePair? = null
override var authorizable: IAuthorizable? = null
val isAuthorized: Boolean get() = authorizable?.isAuthorized ?: false
var connectionId: Long = 0L
override var remotePublicKey: String? = publicKey
private set
override var remoteVersion: Int? = null
private set
override var syncSession: SyncSession? = null
private var onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)? = null
private var onClose: ((IChannel) -> Unit)? = null
private var disposed = false
init {
handshakeState?.start()
}
override fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) {
this.onData = onData
}
override fun setCloseHandler(onClose: ((IChannel) -> Unit)?) {
this.onClose = onClose
}
override fun close() {
disposed = true
if (connectionId != 0L) {
Thread {
try {
session.sendRelayError(connectionId, SyncErrorCode.ConnectionClosed)
} catch (e: Exception) {
Logger.e("ChannelRelayed", "Exception while sending relay error", e)
}
}.start()
}
transport?.sender?.destroy()
transport?.receiver?.destroy()
transport = null
handshakeState?.destroy()
handshakeState = null
onClose?.invoke(this)
}
private fun throwIfDisposed() {
if (disposed) throw IllegalStateException("ChannelRelayed is disposed")
}
fun invokeDataHandler(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
onData?.invoke(session, this, opcode, subOpcode, data)
}
private fun completeHandshake(remoteVersion: Int, transport: CipherStatePair) {
throwIfDisposed()
this.remoteVersion = remoteVersion
val remoteKeyBytes = ByteArray(handshakeState!!.remotePublicKey.publicKeyLength)
handshakeState!!.remotePublicKey.getPublicKey(remoteKeyBytes, 0)
this.remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes)
handshakeState?.destroy()
handshakeState = null
this.transport = transport
Logger.i("ChannelRelayed", "Completed handshake for connectionId $connectionId")
}
private fun sendPacket(packet: ByteArray) {
throwIfDisposed()
synchronized(sendLock) {
val encryptedPayload = ByteArray(packet.size + 16)
val encryptedLength = transport!!.sender.encryptWithAd(null, packet, 0, encryptedPayload, 0, packet.size)
val relayedPacket = ByteArray(8 + encryptedLength)
ByteBuffer.wrap(relayedPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
putLong(connectionId)
put(encryptedPayload, 0, encryptedLength)
}
session.send(Opcode.RELAY.value, RelayOpcode.DATA.value, ByteBuffer.wrap(relayedPacket).order(ByteOrder.LITTLE_ENDIAN))
}
}
fun sendError(errorCode: SyncErrorCode) {
throwIfDisposed()
synchronized(sendLock) {
val packet = ByteArray(4)
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).putInt(errorCode.value)
val encryptedPayload = ByteArray(4 + 16)
val encryptedLength = transport!!.sender.encryptWithAd(null, packet, 0, encryptedPayload, 0, packet.size)
val relayedPacket = ByteArray(8 + encryptedLength)
ByteBuffer.wrap(relayedPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
putLong(connectionId)
put(encryptedPayload, 0, encryptedLength)
}
session.send(Opcode.RELAY.value, RelayOpcode.ERROR.value, ByteBuffer.wrap(relayedPacket))
}
}
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
throwIfDisposed()
val actualCount = data?.remaining() ?: 0
val ENCRYPTION_OVERHEAD = 16
val CONNECTION_ID_SIZE = 8
val HEADER_SIZE = 6
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16
if (actualCount > MAX_DATA_PER_PACKET && data != null) {
val streamId = session.generateStreamId()
val totalSize = actualCount
var sendOffset = 0
while (sendOffset < totalSize) {
val bytesRemaining = totalSize - sendOffset
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - 2, bytesRemaining)
val streamData: ByteArray
val streamOpcode: StreamOpcode
if (sendOffset == 0) {
streamOpcode = StreamOpcode.START
streamData = ByteArray(4 + 4 + 1 + 1 + bytesToSend)
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamId)
putInt(totalSize)
put(opcode.toByte())
put(subOpcode.toByte())
put(data.array(), data.position() + sendOffset, bytesToSend)
}
} else {
streamData = ByteArray(4 + 4 + bytesToSend)
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamId)
putInt(sendOffset)
put(data.array(), data.position() + sendOffset, bytesToSend)
}
streamOpcode = if (bytesToSend < bytesRemaining) StreamOpcode.DATA else StreamOpcode.END
}
val fullPacket = ByteArray(HEADER_SIZE + streamData.size)
ByteBuffer.wrap(fullPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamData.size + 2)
put(Opcode.STREAM.value.toByte())
put(streamOpcode.value.toByte())
put(streamData)
}
sendPacket(fullPacket)
sendOffset += bytesToSend
}
} else {
val packet = ByteArray(HEADER_SIZE + actualCount)
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(actualCount + 2)
put(opcode.toByte())
put(subOpcode.toByte())
if (actualCount > 0 && data != null) put(data.array(), data.position(), actualCount)
}
sendPacket(packet)
}
}
fun sendRequestTransport(requestId: Int, publicKey: String, pairingCode: String? = null) {
throwIfDisposed()
synchronized(sendLock) {
val channelMessage = ByteArray(1024)
val channelBytesWritten = handshakeState!!.writeMessage(channelMessage, 0, null, 0, 0)
val publicKeyBytes = Base64.getDecoder().decode(publicKey)
if (publicKeyBytes.size != 32) throw IllegalArgumentException("Public key must be 32 bytes")
val (pairingMessageLength, pairingMessage) = if (pairingCode != null) {
val pairingHandshake = HandshakeState(SyncSocketSession.nProtocolName, HandshakeState.INITIATOR).apply {
remotePublicKey.setPublicKey(publicKeyBytes, 0)
start()
}
val pairingCodeBytes = pairingCode.toByteArray(Charsets.UTF_8)
if (pairingCodeBytes.size > 32) throw IllegalArgumentException("Pairing code must not exceed 32 bytes")
val pairingMessageBuffer = ByteArray(1024)
val bytesWritten = pairingHandshake.writeMessage(pairingMessageBuffer, 0, pairingCodeBytes, 0, pairingCodeBytes.size)
bytesWritten to pairingMessageBuffer.copyOf(bytesWritten)
} else {
0 to ByteArray(0)
}
val packetSize = 4 + 32 + 4 + pairingMessageLength + 4 + channelBytesWritten
val packet = ByteArray(packetSize)
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(requestId)
put(publicKeyBytes)
putInt(pairingMessageLength)
if (pairingMessageLength > 0) put(pairingMessage)
putInt(channelBytesWritten)
put(channelMessage, 0, channelBytesWritten)
}
session.send(Opcode.REQUEST.value, RequestOpcode.TRANSPORT.value, ByteBuffer.wrap(packet))
}
}
fun sendResponseTransport(remoteVersion: Int, requestId: Int, handshakeMessage: ByteArray) {
throwIfDisposed()
synchronized(sendLock) {
val message = ByteArray(1024)
val plaintext = ByteArray(1024)
handshakeState!!.readMessage(handshakeMessage, 0, handshakeMessage.size, plaintext, 0)
val bytesWritten = handshakeState!!.writeMessage(message, 0, null, 0, 0)
val transport = handshakeState!!.split()
val responsePacket = ByteArray(20 + bytesWritten)
ByteBuffer.wrap(responsePacket).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(0) // Status code
putLong(connectionId)
putInt(requestId)
putInt(bytesWritten)
put(message, 0, bytesWritten)
}
completeHandshake(remoteVersion, transport)
session.send(Opcode.RESPONSE.value, ResponseOpcode.TRANSPORT.value, ByteBuffer.wrap(responsePacket))
}
}
fun decrypt(encryptedPayload: ByteBuffer): ByteBuffer {
throwIfDisposed()
synchronized(decryptLock) {
val encryptedBytes = ByteArray(encryptedPayload.remaining()).also { encryptedPayload.get(it) }
val decryptedPayload = ByteArray(encryptedBytes.size - 16)
val plen = transport!!.receiver.decryptWithAd(null, encryptedBytes, 0, decryptedPayload, 0, encryptedBytes.size)
if (plen != decryptedPayload.size) throw IllegalStateException("Expected decrypted payload length to be $plen")
return ByteBuffer.wrap(decryptedPayload).order(ByteOrder.LITTLE_ENDIAN)
}
}
fun handleTransportRelayed(remoteVersion: Int, connectionId: Long, handshakeMessage: ByteArray) {
throwIfDisposed()
synchronized(decryptLock) {
this.connectionId = connectionId
val plaintext = ByteArray(1024)
val plen = handshakeState!!.readMessage(handshakeMessage, 0, handshakeMessage.size, plaintext, 0)
val transport = handshakeState!!.split()
completeHandshake(remoteVersion, transport)
}
}
}

View File

@ -0,0 +1,60 @@
package com.futo.platformplayer.sync.internal
enum class Opcode(val value: UByte) {
PING(0u),
PONG(1u),
NOTIFY(2u),
STREAM(3u),
DATA(4u),
REQUEST(5u),
RESPONSE(6u),
RELAY(7u)
}
enum class NotifyOpcode(val value: UByte) {
AUTHORIZED(0u),
UNAUTHORIZED(1u),
CONNECTION_INFO(2u)
}
enum class StreamOpcode(val value: UByte) {
START(0u),
DATA(1u),
END(2u)
}
enum class RequestOpcode(val value: UByte) {
CONNECTION_INFO(0u),
TRANSPORT(1u),
TRANSPORT_RELAYED(2u),
PUBLISH_RECORD(3u),
DELETE_RECORD(4u),
LIST_RECORD_KEYS(5u),
GET_RECORD(6u),
BULK_PUBLISH_RECORD(7u),
BULK_GET_RECORD(8u),
BULK_CONNECTION_INFO(9u),
BULK_DELETE_RECORD(10u)
}
enum class ResponseOpcode(val value: UByte) {
CONNECTION_INFO(0u),
TRANSPORT(1u),
TRANSPORT_RELAYED(2u), //TODO: Server errors also included in this one, disentangle?
PUBLISH_RECORD(3u),
DELETE_RECORD(4u),
LIST_RECORD_KEYS(5u),
GET_RECORD(6u),
BULK_PUBLISH_RECORD(7u),
BULK_GET_RECORD(8u),
BULK_CONNECTION_INFO(9u),
BULK_DELETE_RECORD(10u)
}
enum class RelayOpcode(val value: UByte) {
DATA(0u),
RELAYED_DATA(1u),
ERROR(2u),
RELAYED_ERROR(3u),
RELAY_ERROR(4u)
}

View File

@ -5,10 +5,12 @@ class SyncDeviceInfo {
var publicKey: String var publicKey: String
var addresses: Array<String> var addresses: Array<String>
var port: Int var port: Int
var pairingCode: String?
constructor(publicKey: String, addresses: Array<String>, port: Int) { constructor(publicKey: String, addresses: Array<String>, port: Int, pairingCode: String?) {
this.publicKey = publicKey this.publicKey = publicKey
this.addresses = addresses this.addresses = addresses
this.port = port this.port = port
this.pairingCode = pairingCode
} }
} }

View File

@ -0,0 +1,6 @@
package com.futo.platformplayer.sync.internal
enum class SyncErrorCode(val value: Int) {
ConnectionClosed(1),
NotFound(2)
}

View File

@ -1,37 +1,13 @@
package com.futo.platformplayer.sync.internal package com.futo.platformplayer.sync.internal
import com.futo.platformplayer.UIDialogs import com.futo.platformplayer.UIDialogs
import com.futo.platformplayer.activities.MainActivity
import com.futo.platformplayer.api.media.Serializer
import com.futo.platformplayer.logging.Logger import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.models.HistoryVideo
import com.futo.platformplayer.models.Subscription import com.futo.platformplayer.models.Subscription
import com.futo.platformplayer.smartMerge
import com.futo.platformplayer.states.StateApp
import com.futo.platformplayer.states.StateBackup
import com.futo.platformplayer.states.StateHistory
import com.futo.platformplayer.states.StatePlaylists
import com.futo.platformplayer.states.StateSubscriptionGroups
import com.futo.platformplayer.states.StateSubscriptions import com.futo.platformplayer.states.StateSubscriptions
import com.futo.platformplayer.states.StateSync
import com.futo.platformplayer.sync.SyncSessionData
import com.futo.platformplayer.sync.internal.SyncSocketSession.Opcode
import com.futo.platformplayer.sync.models.SendToDevicePackage
import com.futo.platformplayer.sync.models.SyncPlaylistsPackage
import com.futo.platformplayer.sync.models.SyncSubscriptionGroupsPackage
import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage
import com.futo.platformplayer.sync.models.SyncWatchLaterPackage
import com.futo.platformplayer.toUtf8String
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import java.io.ByteArrayInputStream
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.UUID import java.util.UUID
interface IAuthorizable { interface IAuthorizable {
@ -39,13 +15,14 @@ interface IAuthorizable {
} }
class SyncSession : IAuthorizable { class SyncSession : IAuthorizable {
private val _socketSessions: MutableList<SyncSocketSession> = mutableListOf() private val _channels: MutableList<IChannel> = mutableListOf()
private var _authorized: Boolean = false private var _authorized: Boolean = false
private var _remoteAuthorized: Boolean = false private var _remoteAuthorized: Boolean = false
private val _onAuthorized: (session: SyncSession, isNewlyAuthorized: Boolean, isNewSession: Boolean) -> Unit private val _onAuthorized: (session: SyncSession, isNewlyAuthorized: Boolean, isNewSession: Boolean) -> Unit
private val _onUnauthorized: (session: SyncSession) -> Unit private val _onUnauthorized: (session: SyncSession) -> Unit
private val _onClose: (session: SyncSession) -> Unit private val _onClose: (session: SyncSession) -> Unit
private val _onConnectedChanged: (session: SyncSession, connected: Boolean) -> Unit private val _onConnectedChanged: (session: SyncSession, connected: Boolean) -> Unit
private val _dataHandler: (session: SyncSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit
val remotePublicKey: String val remotePublicKey: String
override val isAuthorized get() = _authorized && _remoteAuthorized override val isAuthorized get() = _authorized && _remoteAuthorized
private var _wasAuthorized = false private var _wasAuthorized = false
@ -56,140 +33,151 @@ class SyncSession : IAuthorizable {
private set private set
val displayName: String get() = remoteDeviceName ?: remotePublicKey val displayName: String get() = remoteDeviceName ?: remotePublicKey
var connected: Boolean = false val linkType: LinkType get()
private set(v) { {
if (field != v) { var hasProxied = false
field = v var hasDirect = false
this._onConnectedChanged(this, v) synchronized(_channels)
{
for (channel in _channels)
{
if (channel is ChannelRelayed)
hasProxied = true
if (channel is ChannelSocket)
hasDirect = true
if (hasProxied && hasDirect)
return LinkType.Local
}
} }
if (hasProxied)
return LinkType.Proxied
if (hasDirect)
return LinkType.Local
return LinkType.None
} }
constructor(remotePublicKey: String, onAuthorized: (session: SyncSession, isNewlyAuthorized: Boolean, isNewSession: Boolean) -> Unit, onUnauthorized: (session: SyncSession) -> Unit, onConnectedChanged: (session: SyncSession, connected: Boolean) -> Unit, onClose: (session: SyncSession) -> Unit, remoteDeviceName: String?) { var connected: Boolean = false
private set(v) {
if (field != v) {
field = v
this._onConnectedChanged(this, v)
}
}
constructor(
remotePublicKey: String,
onAuthorized: (session: SyncSession, isNewlyAuthorized: Boolean, isNewSession: Boolean) -> Unit,
onUnauthorized: (session: SyncSession) -> Unit,
onConnectedChanged: (session: SyncSession, connected: Boolean) -> Unit,
onClose: (session: SyncSession) -> Unit,
dataHandler: (session: SyncSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit,
remoteDeviceName: String? = null
) {
this.remotePublicKey = remotePublicKey this.remotePublicKey = remotePublicKey
this.remoteDeviceName = remoteDeviceName
_onAuthorized = onAuthorized _onAuthorized = onAuthorized
_onUnauthorized = onUnauthorized _onUnauthorized = onUnauthorized
_onConnectedChanged = onConnectedChanged _onConnectedChanged = onConnectedChanged
_onClose = onClose _onClose = onClose
_dataHandler = dataHandler
} }
fun addSocketSession(socketSession: SyncSocketSession) { fun addChannel(channel: IChannel) {
if (socketSession.remotePublicKey != remotePublicKey) { if (channel.remotePublicKey != remotePublicKey) {
throw Exception("Public key of session must match public key of socket session") throw Exception("Public key of session must match public key of channel")
} }
synchronized(_socketSessions) { synchronized(_channels) {
_socketSessions.add(socketSession) _channels.add(channel)
connected = _socketSessions.isNotEmpty() connected = _channels.isNotEmpty()
} }
socketSession.authorizable = this channel.authorizable = this
channel.syncSession = this
} }
fun authorize(socketSession: SyncSocketSession) { fun authorize() {
Logger.i(TAG, "Sent AUTHORIZED with session id $_id") Logger.i(TAG, "Sent AUTHORIZED with session id $_id")
val idString = _id.toString()
if (socketSession.remoteVersion >= 3) { val idBytes = idString.toByteArray(Charsets.UTF_8)
val idStringBytes = _id.toString().toByteArray() val name = "${android.os.Build.MANUFACTURER}-${android.os.Build.MODEL}"
val nameBytes = "${android.os.Build.MANUFACTURER}-${android.os.Build.MODEL}".toByteArray() val nameBytes = name.toByteArray(Charsets.UTF_8)
val buffer = ByteArray(1 + idStringBytes.size + 1 + nameBytes.size) val buffer = ByteArray(1 + idBytes.size + 1 + nameBytes.size)
socketSession.send(Opcode.NOTIFY_AUTHORIZED.value, 0u, ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).apply { buffer[0] = idBytes.size.toByte()
put(idStringBytes.size.toByte()) System.arraycopy(idBytes, 0, buffer, 1, idBytes.size)
put(idStringBytes) buffer[1 + idBytes.size] = nameBytes.size.toByte()
put(nameBytes.size.toByte()) System.arraycopy(nameBytes, 0, buffer, 2 + idBytes.size, nameBytes.size)
put(nameBytes) send(Opcode.NOTIFY.value, NotifyOpcode.AUTHORIZED.value, ByteBuffer.wrap(buffer))
}.apply { flip() })
} else {
socketSession.send(Opcode.NOTIFY_AUTHORIZED.value, 0u, ByteBuffer.wrap(_id.toString().toByteArray()))
}
_authorized = true _authorized = true
checkAuthorized() checkAuthorized()
} }
fun unauthorize(socketSession: SyncSocketSession? = null) { fun unauthorize() {
if (socketSession != null) send(Opcode.NOTIFY.value, NotifyOpcode.UNAUTHORIZED.value)
socketSession.send(Opcode.NOTIFY_UNAUTHORIZED.value)
else {
val ss = synchronized(_socketSessions) {
_socketSessions.first()
}
ss.send(Opcode.NOTIFY_UNAUTHORIZED.value)
}
} }
private fun checkAuthorized() { private fun checkAuthorized() {
if (isAuthorized) { if (isAuthorized) {
val isNewlyAuthorized = !_wasAuthorized; val isNewlyAuthorized = !_wasAuthorized
val isNewSession = _lastAuthorizedRemoteId != _remoteId; val isNewSession = _lastAuthorizedRemoteId != _remoteId
Logger.i(TAG, "onAuthorized (isNewlyAuthorized = $isNewlyAuthorized, isNewSession = $isNewSession)"); Logger.i(TAG, "onAuthorized (isNewlyAuthorized = $isNewlyAuthorized, isNewSession = $isNewSession)")
_onAuthorized.invoke(this, !_wasAuthorized, _lastAuthorizedRemoteId != _remoteId) _onAuthorized(this, isNewlyAuthorized, isNewSession)
_wasAuthorized = true _wasAuthorized = true
_lastAuthorizedRemoteId = _remoteId _lastAuthorizedRemoteId = _remoteId
} }
} }
fun removeSocketSession(socketSession: SyncSocketSession) { fun removeChannel(channel: IChannel) {
synchronized(_socketSessions) { synchronized(_channels) {
_socketSessions.remove(socketSession) _channels.remove(channel)
connected = _socketSessions.isNotEmpty() connected = _channels.isNotEmpty()
} }
} }
fun close() { fun close() {
synchronized(_socketSessions) { synchronized(_channels) {
for (socketSession in _socketSessions) { _channels.forEach { it.close() }
socketSession.stop() _channels.clear()
}
_socketSessions.clear()
} }
_onClose(this)
_onClose.invoke(this)
} }
fun handlePacket(socketSession: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) { fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
try { try {
Logger.i(TAG, "Handle packet (opcode: ${opcode}, subOpcode: ${subOpcode}, data.length: ${data.remaining()})") Logger.i(TAG, "Handle packet (opcode: $opcode, subOpcode: $subOpcode, data.length: ${data.remaining()})")
when (opcode) { when (opcode) {
Opcode.NOTIFY_AUTHORIZED.value -> { Opcode.NOTIFY.value -> when (subOpcode) {
if (socketSession.remoteVersion >= 3) { NotifyOpcode.AUTHORIZED.value -> {
val idByteCount = data.get().toInt() val idByteCount = data.get().toInt()
if (idByteCount > 64) if (idByteCount > 64)
throw Exception("Id should always be smaller than 64 bytes") throw Exception("Id should always be smaller than 64 bytes")
val idBytes = ByteArray(idByteCount) val idBytes = ByteArray(idByteCount)
data.get(idBytes) data.get(idBytes)
val nameByteCount = data.get().toInt() val nameByteCount = data.get().toInt()
if (nameByteCount > 64) if (nameByteCount > 64)
throw Exception("Name should always be smaller than 64 bytes") throw Exception("Name should always be smaller than 64 bytes")
val nameBytes = ByteArray(nameByteCount) val nameBytes = ByteArray(nameByteCount)
data.get(nameBytes) data.get(nameBytes)
_remoteId = UUID.fromString(idBytes.toString(Charsets.UTF_8)) _remoteId = UUID.fromString(idBytes.toString(Charsets.UTF_8))
remoteDeviceName = nameBytes.toString(Charsets.UTF_8) remoteDeviceName = nameBytes.toString(Charsets.UTF_8)
} else { _remoteAuthorized = true
val str = data.toUtf8String() Logger.i(TAG, "Received AUTHORIZED with session id $_remoteId (device name: '${remoteDeviceName ?: "not set"}')")
_remoteId = if (data.remaining() >= 0) UUID.fromString(str) else UUID.fromString("00000000-0000-0000-0000-000000000000") checkAuthorized()
remoteDeviceName = null return
}
NotifyOpcode.UNAUTHORIZED.value -> {
_remoteAuthorized = false
_remoteId = null
remoteDeviceName = null
_lastAuthorizedRemoteId = null
_onUnauthorized(this)
return
} }
_remoteAuthorized = true
Logger.i(TAG, "Received AUTHORIZED with session id $_remoteId (device name: '${remoteDeviceName ?: "not set"}')")
checkAuthorized()
return
} }
Opcode.NOTIFY_UNAUTHORIZED.value -> {
_remoteId = null
remoteDeviceName = null
_lastAuthorizedRemoteId = null
_remoteAuthorized = false
_onUnauthorized(this)
return
}
//TODO: Handle any kind of packet (that is not necessarily authorized)
} }
if (!isAuthorized) { if (!isAuthorized) {
@ -197,282 +185,58 @@ class SyncSession : IAuthorizable {
} }
if (opcode != Opcode.DATA.value) { if (opcode != Opcode.DATA.value) {
Logger.w(TAG, "Unknown opcode received: (opcode = ${opcode}, subOpcode = ${subOpcode})}") Logger.w(TAG, "Unknown opcode received: (opcode = $opcode, subOpcode = $subOpcode)")
return return
} }
Logger.i(TAG, "Received (opcode = ${opcode}, subOpcode = ${subOpcode}) (${data.remaining()} bytes)") Logger.i(TAG, "Received (opcode = $opcode, subOpcode = $subOpcode) (${data.remaining()} bytes)")
//TODO: Abstract this out _dataHandler.invoke(this, opcode, subOpcode, data)
when (subOpcode) { } catch (ex: Exception) {
GJSyncOpcodes.sendToDevices -> { Logger.w(TAG, "Failed to handle sync package $opcode: ${ex.message}", ex)
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) {
val context = StateApp.instance.contextOrNull;
if (context != null && context is MainActivity) {
val dataBody = ByteArray(data.remaining());
val remainder = data.remaining();
data.get(dataBody, 0, remainder);
val json = String(dataBody, Charsets.UTF_8);
val obj = Json.decodeFromString<SendToDevicePackage>(json);
UIDialogs.appToast("Received url from device [${socketSession.remotePublicKey}]:\n{${obj.url}");
context.handleUrl(obj.url, obj.position);
}
};
}
GJSyncOpcodes.syncStateExchange -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val syncSessionData = Serializer.json.decodeFromString<SyncSessionData>(json);
Logger.i(TAG, "Received SyncSessionData from " + remotePublicKey);
sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
sendData(GJSyncOpcodes.syncWatchLater, Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false)));
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
if(recentHistory.size > 0)
sendJsonData(GJSyncOpcodes.syncHistory, recentHistory);
}
GJSyncOpcodes.syncExport -> {
val dataBody = ByteArray(data.remaining());
val bytesStr = ByteArrayInputStream(data.array(), data.position(), data.remaining());
try {
val exportStruct = StateBackup.ExportStructure.fromZipBytes(bytesStr);
for (store in exportStruct.stores) {
if (store.key.equals("subscriptions", true)) {
val subStore =
StateSubscriptions.instance.getUnderlyingSubscriptionsStore();
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
val pack = SyncSubscriptionsPackage(
store.value.map {
subStore.fromReconstruction(it, exportStruct.cache)
},
StateSubscriptions.instance.getSubscriptionRemovals()
);
handleSyncSubscriptionPackage(this@SyncSession, pack);
}
}
}
} finally {
bytesStr.close();
}
}
GJSyncOpcodes.syncSubscriptions -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json);
handleSyncSubscriptionPackage(this, subPackage);
val newestSub = subPackage.subscriptions.maxOf { it.creationTime };
val sesData = StateSync.instance.getSyncSessionData(remotePublicKey);
if(newestSub > sesData.lastSubscription) {
sesData.lastSubscription = newestSub;
StateSync.instance.saveSyncSessionData(sesData);
}
}
GJSyncOpcodes.syncSubscriptionGroups -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val pack = Serializer.json.decodeFromString<SyncSubscriptionGroupsPackage>(json);
var lastSubgroupChange = OffsetDateTime.MIN;
for(group in pack.groups){
if(group.lastChange > lastSubgroupChange)
lastSubgroupChange = group.lastChange;
val existing = StateSubscriptionGroups.instance.getSubscriptionGroup(group.id);
if(existing == null)
StateSubscriptionGroups.instance.updateSubscriptionGroup(group, false, true);
else if(existing.lastChange < group.lastChange) {
existing.name = group.name;
existing.urls = group.urls;
existing.image = group.image;
existing.priority = group.priority;
existing.lastChange = group.lastChange;
StateSubscriptionGroups.instance.updateSubscriptionGroup(existing, false, true);
}
}
for(removal in pack.groupRemovals) {
val creation = StateSubscriptionGroups.instance.getSubscriptionGroup(removal.key);
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
if(creation != null && creation.creationTime < removalTime)
StateSubscriptionGroups.instance.deleteSubscriptionGroup(removal.key, false);
}
}
GJSyncOpcodes.syncPlaylists -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val pack = Serializer.json.decodeFromString<SyncPlaylistsPackage>(json);
for(playlist in pack.playlists) {
val existing = StatePlaylists.instance.getPlaylist(playlist.id);
if(existing == null)
StatePlaylists.instance.createOrUpdatePlaylist(playlist, false);
else if(existing.dateUpdate.toLocalDateTime() < playlist.dateUpdate.toLocalDateTime()) {
existing.dateUpdate = playlist.dateUpdate;
existing.name = playlist.name;
existing.videos = playlist.videos;
existing.dateCreation = playlist.dateCreation;
existing.datePlayed = playlist.datePlayed;
StatePlaylists.instance.createOrUpdatePlaylist(existing, false);
}
}
for(removal in pack.playlistRemovals) {
val creation = StatePlaylists.instance.getPlaylist(removal.key);
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
if(creation != null && creation.dateCreation < removalTime)
StatePlaylists.instance.removePlaylist(creation, false);
}
}
GJSyncOpcodes.syncWatchLater -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val pack = Serializer.json.decodeFromString<SyncWatchLaterPackage>(json);
Logger.i(TAG, "SyncWatchLater received ${pack.videos.size} (${pack.videoAdds?.size}, ${pack.videoRemovals?.size})");
val allExisting = StatePlaylists.instance.getWatchLater();
for(video in pack.videos) {
val existing = allExisting.firstOrNull { it.url == video.url };
val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.videoAdds[video.url] ?: 0), ZoneOffset.UTC) else OffsetDateTime.MIN;
if(existing == null) {
StatePlaylists.instance.addToWatchLater(video, false);
if(time > OffsetDateTime.MIN)
StatePlaylists.instance.setWatchLaterAddTime(video.url, time);
}
}
for(removal in pack.videoRemovals) {
val watchLater = allExisting.firstOrNull { it.url == removal.key } ?: continue;
val creation = StatePlaylists.instance.getWatchLaterRemovalTime(watchLater.url) ?: OffsetDateTime.MIN;
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value), ZoneOffset.UTC);
if(creation < removalTime)
StatePlaylists.instance.removeFromWatchLater(watchLater, false, removalTime);
}
val packReorderTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.reorderTime), ZoneOffset.UTC);
val localReorderTime = StatePlaylists.instance.getWatchLaterLastReorderTime();
if(localReorderTime < packReorderTime && pack.ordering != null) {
StatePlaylists.instance.updateWatchLaterOrdering(smartMerge(pack.ordering!!, StatePlaylists.instance.getWatchLaterOrdering()), true);
}
}
GJSyncOpcodes.syncHistory -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val history = Serializer.json.decodeFromString<List<HistoryVideo>>(json);
Logger.i(TAG, "SyncHistory received ${history.size} videos from ${remotePublicKey}");
var lastHistory = OffsetDateTime.MIN;
for(video in history){
val hist = StateHistory.instance.getHistoryByVideo(video.video, true, video.date);
if(hist != null)
StateHistory.instance.updateHistoryPosition(video.video, hist, true, video.position, video.date)
if(lastHistory < video.date)
lastHistory = video.date;
}
if(lastHistory != OffsetDateTime.MIN && history.size > 1) {
val sesData = StateSync.instance.getSyncSessionData(remotePublicKey);
if (lastHistory > sesData.lastHistory) {
sesData.lastHistory = lastHistory;
StateSync.instance.saveSyncSessionData(sesData);
}
}
}
}
} }
catch(ex: Exception) { catch(ex: Exception) {
Logger.w(TAG, "Failed to handle sync package ${opcode}: ${ex.message}", ex); Logger.w(TAG, "Failed to handle sync package ${opcode}: ${ex.message}", ex);
} }
} }
private fun handleSyncSubscriptionPackage(origin: SyncSession, pack: SyncSubscriptionsPackage) {
val added = mutableListOf<Subscription>()
for(sub in pack.subscriptions) {
if(!StateSubscriptions.instance.isSubscribed(sub.channel)) {
val removalTime = StateSubscriptions.instance.getSubscriptionRemovalTime(sub.channel.url);
if(sub.creationTime > removalTime) {
val newSub = StateSubscriptions.instance.addSubscription(sub.channel, sub.creationTime);
added.add(newSub);
}
}
}
if(added.size > 3)
UIDialogs.appToast("${added.size} Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}");
else if(added.size > 0)
UIDialogs.appToast("Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}:\n" +
added.map { it.channel.name }.joinToString("\n"));
if(pack.subscriptions != null && pack.subscriptions.size > 0) {
for (subRemoved in pack.subscriptionRemovals) {
val removed = StateSubscriptions.instance.applySubscriptionRemovals(pack.subscriptionRemovals);
if(removed.size > 3)
UIDialogs.appToast("Removed ${removed.size} Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}");
else if(removed.size > 0)
UIDialogs.appToast("Subscriptions removed from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}:\n" +
removed.map { it.channel.name }.joinToString("\n"));
}
}
}
inline fun <reified T> sendJsonData(subOpcode: UByte, data: T) { inline fun <reified T> sendJsonData(subOpcode: UByte, data: T) {
send(Opcode.DATA.value, subOpcode, Json.encodeToString<T>(data)); send(Opcode.DATA.value, subOpcode, Json.encodeToString(data))
} }
fun sendData(subOpcode: UByte, data: String) {
send(Opcode.DATA.value, subOpcode, data.toByteArray(Charsets.UTF_8));
}
fun send(opcode: UByte, subOpcode: UByte, data: String) {
send(opcode, subOpcode, data.toByteArray(Charsets.UTF_8));
}
fun send(opcode: UByte, subOpcode: UByte, data: ByteArray) {
val socketSessions = synchronized(_socketSessions) {
_socketSessions.toList()
}
if (socketSessions.isEmpty()) { fun sendData(subOpcode: UByte, data: String) {
Logger.v(TAG, "Packet was not sent (opcode = ${opcode}, subOpcode = ${subOpcode}) due to no connected sockets") send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
}
fun send(opcode: UByte, subOpcode: UByte, data: String) {
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
}
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null) {
//TODO: Prioritize local connections
val channels = synchronized(_channels) { _channels.toList() }
if (channels.isEmpty()) {
//TODO: Should this throw?
Logger.v(TAG, "Packet was not sent (opcode = $opcode, subOpcode = $subOpcode) due to no connected sockets")
return return
} }
var sent = false var sent = false
for (socketSession in socketSessions) { for (channel in channels) {
try { try {
socketSession.send(opcode, subOpcode, ByteBuffer.wrap(data)) channel.send(opcode, subOpcode, data)
sent = true sent = true
break break
} catch (e: Throwable) { } catch (e: Throwable) {
Logger.w(TAG, "Packet failed to send (opcode = ${opcode}, subOpcode = ${subOpcode})", e) Logger.w(TAG, "Packet failed to send (opcode = $opcode, subOpcode = $subOpcode)", e)
} }
} }
if (!sent) { if (!sent) {
throw Exception("Packet was not sent (opcode = ${opcode}, subOpcode = ${subOpcode}) due to send errors and no remaining candidates") throw Exception("Packet was not sent (opcode = $opcode, subOpcode = $subOpcode) due to send errors and no remaining candidates")
} }
} }
private companion object { companion object {
const val TAG = "SyncSession" private const val TAG = "SyncSession"
} }
} }