Minor fixes to sync protocol

pull/699/head
Alinson S. Xavier 5 years ago
parent 1787c0e74e
commit 9c0951ae58

@ -19,6 +19,7 @@
package org.isoron.uhabits.sync package org.isoron.uhabits.sync
import androidx.test.filters.*
import com.fasterxml.jackson.databind.* import com.fasterxml.jackson.databind.*
import io.ktor.client.* import io.ktor.client.*
import io.ktor.client.engine.mock.* import io.ktor.client.engine.mock.*
@ -29,6 +30,7 @@ import junit.framework.Assert.*
import kotlinx.coroutines.* import kotlinx.coroutines.*
import org.junit.* import org.junit.*
@MediumTest
class RemoteSyncServerTest { class RemoteSyncServerTest {
private val mapper = ObjectMapper() private val mapper = ObjectMapper()
@ -53,7 +55,7 @@ class RemoteSyncServerTest {
@Test @Test
fun when_get_data_version_succeeds_should_return_version() = runBlocking { fun when_get_data_version_succeeds_should_return_version() = runBlocking {
server("/ABC/version") { server("/db/ABC/version") {
respondWithJson(GetDataVersionResponse(5)) respondWithJson(GetDataVersionResponse(5))
}.apply { }.apply {
assertEquals(5, getDataVersion("ABC")) assertEquals(5, getDataVersion("ABC"))
@ -63,7 +65,7 @@ class RemoteSyncServerTest {
@Test(expected = ServiceUnavailable::class) @Test(expected = ServiceUnavailable::class)
fun when_get_data_version_with_server_error_should_raise_exception() = runBlocking { fun when_get_data_version_with_server_error_should_raise_exception() = runBlocking {
server("/ABC/version") { server("/db/ABC/version") {
respondError(HttpStatusCode.InternalServerError) respondError(HttpStatusCode.InternalServerError)
}.apply { }.apply {
getDataVersion("ABC") getDataVersion("ABC")
@ -73,7 +75,7 @@ class RemoteSyncServerTest {
@Test(expected = KeyNotFoundException::class) @Test(expected = KeyNotFoundException::class)
fun when_get_data_version_with_invalid_key_should_raise_exception() = runBlocking { fun when_get_data_version_with_invalid_key_should_raise_exception() = runBlocking {
server("/ABC/version") { server("/db/ABC/version") {
respondError(HttpStatusCode.NotFound) respondError(HttpStatusCode.NotFound)
}.apply { }.apply {
getDataVersion("ABC") getDataVersion("ABC")
@ -83,7 +85,7 @@ class RemoteSyncServerTest {
@Test @Test
fun when_get_data_succeeds_should_return_data() = runBlocking { fun when_get_data_succeeds_should_return_data() = runBlocking {
server("/ABC") { server("/db/ABC") {
respondWithJson(data) respondWithJson(data)
}.apply { }.apply {
assertEquals(data, getData("ABC")) assertEquals(data, getData("ABC"))
@ -93,7 +95,7 @@ class RemoteSyncServerTest {
@Test(expected = KeyNotFoundException::class) @Test(expected = KeyNotFoundException::class)
fun when_get_data_with_invalid_key_should_raise_exception() = runBlocking { fun when_get_data_with_invalid_key_should_raise_exception() = runBlocking {
server("/ABC") { server("/db/ABC") {
respondError(HttpStatusCode.NotFound) respondError(HttpStatusCode.NotFound)
}.apply { }.apply {
getData("ABC") getData("ABC")
@ -103,7 +105,7 @@ class RemoteSyncServerTest {
@Test @Test
fun when_put_succeeds_should_not_raise_exceptions() = runBlocking { fun when_put_succeeds_should_not_raise_exceptions() = runBlocking {
server("/ABC") { server("/db/ABC") {
respondOk() respondOk()
}.apply { }.apply {
put("ABC", data) put("ABC", data)

@ -19,6 +19,7 @@
package org.isoron.uhabits.sync package org.isoron.uhabits.sync
import android.util.*
import io.ktor.client.* import io.ktor.client.*
import io.ktor.client.engine.android.* import io.ktor.client.engine.android.*
import io.ktor.client.features.* import io.ktor.client.features.*
@ -26,9 +27,6 @@ import io.ktor.client.features.json.*
import io.ktor.client.request.* import io.ktor.client.request.*
import kotlinx.coroutines.* import kotlinx.coroutines.*
data class RegisterReponse(val key: String)
data class GetDataVersionResponse(val version: Long)
class RemoteSyncServer( class RemoteSyncServer(
private val baseURL: String = "https://sync.loophabits.org", private val baseURL: String = "https://sync.loophabits.org",
private val httpClient: HttpClient = HttpClient(Android) { private val httpClient: HttpClient = HttpClient(Android) {
@ -54,7 +52,10 @@ class RemoteSyncServer(
} catch (e: ServerResponseException) { } catch (e: ServerResponseException) {
throw ServiceUnavailable() throw ServiceUnavailable()
} catch (e: ClientRequestException) { } catch (e: ClientRequestException) {
throw KeyNotFoundException() Log.w("RemoteSyncServer", "ClientRequestException", e)
if(e.message!!.contains("409")) throw EditConflictException()
if(e.message!!.contains("404")) throw KeyNotFoundException()
throw e
} }
} }
@ -65,6 +66,7 @@ class RemoteSyncServer(
} catch (e: ServerResponseException) { } catch (e: ServerResponseException) {
throw ServiceUnavailable() throw ServiceUnavailable()
} catch (e: ClientRequestException) { } catch (e: ClientRequestException) {
Log.w("RemoteSyncServer", "ClientRequestException", e)
throw KeyNotFoundException() throw KeyNotFoundException()
} }
} }
@ -76,6 +78,7 @@ class RemoteSyncServer(
} catch(e: ServerResponseException) { } catch(e: ServerResponseException) {
throw ServiceUnavailable() throw ServiceUnavailable()
} catch (e: ClientRequestException) { } catch (e: ClientRequestException) {
Log.w("RemoteSyncServer", "ClientRequestException", e)
throw KeyNotFoundException() throw KeyNotFoundException()
} }
} }

@ -23,3 +23,7 @@ data class SyncData(
val version: Long, val version: Long,
val content: String val content: String
) )
data class RegisterReponse(val key: String)
data class GetDataVersionResponse(val version: Long)

@ -30,6 +30,7 @@ import org.isoron.uhabits.core.tasks.*
import org.isoron.uhabits.tasks.* import org.isoron.uhabits.tasks.*
import org.isoron.uhabits.utils.* import org.isoron.uhabits.utils.*
import java.io.* import java.io.*
import java.lang.RuntimeException
import javax.inject.* import javax.inject.*
@AppScope @AppScope
@ -43,7 +44,7 @@ class SyncManager @Inject constructor(
private val server = RemoteSyncServer() private val server = RemoteSyncServer()
private val tmpFile = File.createTempFile("import", "", context.externalCacheDir) private val tmpFile = File.createTempFile("import", "", context.externalCacheDir)
private var currVersion = 0L private var currVersion = 1L
private var dirty = true private var dirty = true
private lateinit var encryptionKey: EncryptionKey private lateinit var encryptionKey: EncryptionKey
@ -72,35 +73,53 @@ class SyncManager @Inject constructor(
} }
} }
private suspend fun push() { private suspend fun push(depth: Int = 0) {
if (!dirty) { if(depth >= 5) {
Log.i("SyncManager", "Database not dirty. Skipping upload.") throw RuntimeException()
return }
if (dirty) {
Log.i("SyncManager", "Encrypting local database...")
val db = DatabaseUtils.getDatabaseFile(context)
val encryptedDB = db.encryptToString(encryptionKey)
val size = encryptedDB.length / 1024
Log.i("SyncManager", "Pushing local database (version $currVersion, $size KB)")
try {
server.put(preferences.syncKey, SyncData(currVersion, encryptedDB))
dirty = false
} catch (e: EditConflictException) {
Log.i("SyncManager", "Sync conflict detected while pushing.")
setCurrentVersion(0)
pull()
push(depth = depth + 1)
}
} else {
Log.i("SyncManager", "Local database not modified. Skipping push.")
} }
Log.i("SyncManager", "Encrypting database...")
val db = DatabaseUtils.getDatabaseFile(context)
val encryptedDB = db.encryptToString(encryptionKey)
Log.i("SyncManager", "Uploading database (version ${currVersion}, ${encryptedDB.length / 1024} KB)")
server.put(preferences.syncKey, SyncData(currVersion, encryptedDB))
dirty = false
} }
private suspend fun pull() { private suspend fun pull() {
Log.i("SyncManager", "Fetching database from server...") Log.i("SyncManager", "Querying remote database version...")
val data = server.getData(preferences.syncKey) val remoteVersion = server.getDataVersion(syncKey)
Log.i("SyncManager", "Fetched database (version ${data.version}, ${data.content.length / 1024} KB)") Log.i("SyncManager", "Remote database has version $remoteVersion")
if (data.version == 0L) {
Log.i("SyncManager", "Initial upload detected. Marking db as dirty.") if (remoteVersion <= currVersion) {
dirty = true Log.i("SyncManager", "Local database is up-to-date. Skipping merge.")
}
if (data.version <= currVersion) {
Log.i("SyncManager", "Local version is up-to-date. Skipping merge.")
} else { } else {
Log.i("SyncManager", "Decrypting and merging with local changes...") Log.i("SyncManager", "Pulling remote database...")
val data = server.getData(syncKey)
val size = data.content.length / 1024
Log.i("SyncManager", "Pulled remote database (version ${data.version}, $size KB)")
Log.i("SyncManager", "Decrypting remote database and merging with local changes...")
data.content.decryptToFile(encryptionKey, tmpFile) data.content.decryptToFile(encryptionKey, tmpFile)
taskRunner.execute(importDataTaskFactory.create(tmpFile) { tmpFile.delete() }) taskRunner.execute(importDataTaskFactory.create(tmpFile) { tmpFile.delete() })
dirty = true
setCurrentVersion(data.version + 1)
} }
currVersion = data.version + 1 }
private fun setCurrentVersion(v: Long) {
currVersion = v
Log.i("SyncManager", "Setting local database version to $currVersion")
} }
suspend fun onResume() { suspend fun onResume() {
@ -118,6 +137,9 @@ class SyncManager @Inject constructor(
} }
override fun onCommandExecuted(command: Command?, refreshKey: Long?) { override fun onCommandExecuted(command: Command?, refreshKey: Long?) {
if (!dirty) {
setCurrentVersion(currVersion + 1)
}
dirty = true dirty = true
} }
} }

@ -24,11 +24,10 @@ interface AbstractSyncServer {
* Generates and returns a new sync key, which can be used to store and retrive * Generates and returns a new sync key, which can be used to store and retrive
* data. * data.
* *
* @throws RegistrationUnavailableException If key cannot be generated at this * @throws ServiceUnavailable If key cannot be generated at this time, for example,
* time, for example, due to insufficient server resources or temporary * due to insufficient server resources, temporary server maintenance or network problems.
* maintenance.
*/ */
fun register(): String suspend fun register(): String
/** /**
* Replaces data for a given sync key. * Replaces data for a given sync key.
@ -36,13 +35,26 @@ interface AbstractSyncServer {
* @throws KeyNotFoundException If key is not found * @throws KeyNotFoundException If key is not found
* @throws EditConflictException If the version of the data provided is not * @throws EditConflictException If the version of the data provided is not
* exactly the current data version plus one. * exactly the current data version plus one.
* @throws ServiceUnavailable If data cannot be put at this time, for example, due
* to insufficient server resources or network problems.
*/ */
fun put(key: String, newData: SyncData) suspend fun put(key: String, newData: SyncData)
/** /**
* Returns data for a given sync key. * Returns data for a given sync key.
* *
* @throws KeyNotFoundException If key is not found * @throws KeyNotFoundException If key is not found
* @throws ServiceUnavailable If data cannot be retrieved at this time, for example, due
* to insufficient server resources or network problems.
*/ */
fun get(key: String): SyncData suspend fun getData(key: String): SyncData
/**
* Returns the current data version for the given key
*
* @throws KeyNotFoundException If key is not found
* @throws ServiceUnavailable If data cannot be retrieved at this time, for example, due
* to insufficient server resources or network problems.
*/
suspend fun getDataVersion(key: String): Long
} }

@ -28,7 +28,7 @@ import kotlin.streams.*
class MemorySyncServer : AbstractSyncServer { class MemorySyncServer : AbstractSyncServer {
private val db = mutableMapOf<String, SyncData>() private val db = mutableMapOf<String, SyncData>()
override fun register(): String { override suspend fun register(): String {
synchronized(db) { synchronized(db) {
val key = generateKey() val key = generateKey()
db[key] = SyncData(0, "") db[key] = SyncData(0, "")
@ -36,7 +36,7 @@ class MemorySyncServer : AbstractSyncServer {
} }
} }
override fun put(key: String, newData: SyncData) { override suspend fun put(key: String, newData: SyncData) {
synchronized(db) { synchronized(db) {
if (!db.containsKey(key)) { if (!db.containsKey(key)) {
throw KeyNotFoundException() throw KeyNotFoundException()
@ -49,7 +49,7 @@ class MemorySyncServer : AbstractSyncServer {
} }
} }
override fun get(key: String): SyncData { override suspend fun getData(key: String): SyncData {
synchronized(db) { synchronized(db) {
if (!db.containsKey(key)) { if (!db.containsKey(key)) {
throw KeyNotFoundException() throw KeyNotFoundException()
@ -58,6 +58,15 @@ class MemorySyncServer : AbstractSyncServer {
} }
} }
override suspend fun getDataVersion(key: String): Long {
synchronized(db) {
if (!db.containsKey(key)) {
throw KeyNotFoundException()
}
return db.getValue(key).version
}
}
private fun generateKey(): String { private fun generateKey(): String {
val chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" val chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
while (true) { while (true) {

@ -22,9 +22,14 @@ package org.isoron.uhabits.sync
import com.fasterxml.jackson.databind.* import com.fasterxml.jackson.databind.*
data class SyncData( data class SyncData(
val version: Int, val version: Long,
val content: String, val content: String
) )
data class RegisterReponse(val key: String)
data class GetDataVersionResponse(val version: Long)
val defaultMapper = ObjectMapper() val defaultMapper = ObjectMapper()
fun SyncData.toJson(): String = defaultMapper.writeValueAsString(this) fun SyncData.toJson(): String = defaultMapper.writeValueAsString(this)
fun GetDataVersionResponse.toJson(): String = defaultMapper.writeValueAsString(this)

@ -19,13 +19,10 @@
package org.isoron.uhabits.sync package org.isoron.uhabits.sync
/**
* Generic class for all exceptions thrown by SyncServer.
*/
open class SyncException: RuntimeException() open class SyncException: RuntimeException()
class KeyNotFoundException: SyncException() class KeyNotFoundException: SyncException()
class RegistrationUnavailableException: SyncException() class ServiceUnavailable: SyncException()
class EditConflictException: SyncException() class EditConflictException: SyncException()

@ -29,8 +29,8 @@ fun Routing.registration(app: SyncApplication) {
post("/register") { post("/register") {
try { try {
val key = app.server.register() val key = app.server.register()
call.respond(HttpStatusCode.OK, mapOf("key" to key)) call.respond(HttpStatusCode.OK, RegisterReponse(key))
} catch (e: RegistrationUnavailableException) { } catch (e: ServiceUnavailable) {
call.respond(HttpStatusCode.ServiceUnavailable) call.respond(HttpStatusCode.ServiceUnavailable)
} }
} }

@ -31,7 +31,7 @@ fun Routing.storage(app: SyncApplication) {
get { get {
val key = call.parameters["key"]!! val key = call.parameters["key"]!!
try { try {
val data = app.server.get(key) val data = app.server.getData(key)
call.respond(HttpStatusCode.OK, data) call.respond(HttpStatusCode.OK, data)
} catch(e: KeyNotFoundException) { } catch(e: KeyNotFoundException) {
call.respond(HttpStatusCode.NotFound) call.respond(HttpStatusCode.NotFound)
@ -46,15 +46,14 @@ fun Routing.storage(app: SyncApplication) {
} catch (e: KeyNotFoundException) { } catch (e: KeyNotFoundException) {
call.respond(HttpStatusCode.NotFound) call.respond(HttpStatusCode.NotFound)
} catch (e: EditConflictException) { } catch (e: EditConflictException) {
val currData = app.server.get(key) call.respond(HttpStatusCode.Conflict)
call.respond(HttpStatusCode.Conflict, currData)
} }
} }
get("version") { get("version") {
val key = call.parameters["key"]!! val key = call.parameters["key"]!!
try { try {
val data = app.server.get(key) val version = app.server.getDataVersion(key)
call.respond(HttpStatusCode.OK, data.version) call.respond(HttpStatusCode.OK, GetDataVersionResponse(version))
} catch(e: KeyNotFoundException) { } catch(e: KeyNotFoundException) {
call.respond(HttpStatusCode.NotFound) call.respond(HttpStatusCode.NotFound)
} }

@ -19,33 +19,34 @@
package org.isoron.uhabits.sync package org.isoron.uhabits.sync
import kotlinx.coroutines.*
import org.junit.Test import org.junit.Test
import kotlin.test.* import kotlin.test.*
class MemorySyncServerTest { class MemorySyncServerTest {
private val server = MemorySyncServer() private val server = MemorySyncServer()
private val key = server.register() private val key = runBlocking { server.register() }
@Test @Test
fun testUsage() { fun testUsage(): Unit = runBlocking {
val data0 = SyncData(0, "") val data0 = SyncData(0, "")
assertEquals(server.get(key), data0) assertEquals(server.getData(key), data0)
val data1 = SyncData(1, "Hello world") val data1 = SyncData(1, "Hello world")
server.put(key, data1) server.put(key, data1)
assertEquals(server.get(key), data1) assertEquals(server.getData(key), data1)
val data2 = SyncData(2, "Hello new world") val data2 = SyncData(2, "Hello new world")
server.put(key, data2) server.put(key, data2)
assertEquals(server.get(key), data2) assertEquals(server.getData(key), data2)
assertFailsWith<EditConflictException> { assertFailsWith<EditConflictException> {
server.put(key, data2) server.put(key, data2)
} }
assertFailsWith<KeyNotFoundException> { assertFailsWith<KeyNotFoundException> {
server.get("INVALID") server.getData("INVALID")
} }
assertFailsWith<KeyNotFoundException> { assertFailsWith<KeyNotFoundException> {

@ -21,6 +21,7 @@ package org.isoron.uhabits.sync.app
import io.ktor.http.* import io.ktor.http.*
import io.ktor.server.testing.* import io.ktor.server.testing.*
import kotlinx.coroutines.*
import org.isoron.uhabits.sync.* import org.isoron.uhabits.sync.*
import org.junit.Test import org.junit.Test
import org.mockito.* import org.mockito.*
@ -29,7 +30,7 @@ import kotlin.test.*
class RegistrationModuleTest : BaseApplicationTest() { class RegistrationModuleTest : BaseApplicationTest() {
@Test @Test
fun `when register succeeds should return generated key`() { fun `when register succeeds should return generated key`():Unit = runBlocking {
`when`(server.register()).thenReturn("ABCDEF") `when`(server.register()).thenReturn("ABCDEF")
withTestApplication(app()) { withTestApplication(app()) {
val call = handleRequest(HttpMethod.Post, "/register") val call = handleRequest(HttpMethod.Post, "/register")
@ -39,8 +40,8 @@ class RegistrationModuleTest : BaseApplicationTest() {
} }
@Test @Test
fun `when registration is unavailable should return 503`() { fun `when registration is unavailable should return 503`():Unit = runBlocking {
`when`(server.register()).thenThrow(RegistrationUnavailableException()) `when`(server.register()).thenThrow(ServiceUnavailable())
withTestApplication(app()) { withTestApplication(app()) {
val call = handleRequest(HttpMethod.Post, "/register") val call = handleRequest(HttpMethod.Post, "/register")
assertEquals(HttpStatusCode.ServiceUnavailable, call.response.status()) assertEquals(HttpStatusCode.ServiceUnavailable, call.response.status())

@ -21,6 +21,7 @@ package org.isoron.uhabits.sync.app
import io.ktor.http.* import io.ktor.http.*
import io.ktor.server.testing.* import io.ktor.server.testing.*
import kotlinx.coroutines.*
import org.isoron.uhabits.sync.* import org.isoron.uhabits.sync.*
import org.junit.Test import org.junit.Test
import org.mockito.Mockito.* import org.mockito.Mockito.*
@ -31,8 +32,8 @@ class StorageModuleTest : BaseApplicationTest() {
private val data2 = SyncData(2, "Hello new world") private val data2 = SyncData(2, "Hello new world")
@Test @Test
fun `when get succeeds should return data`() { fun `when get succeeds should return data`(): Unit = runBlocking {
`when`(server.get("k1")).thenReturn(data1) `when`(server.getData("k1")).thenReturn(data1)
withTestApplication(app()) { withTestApplication(app()) {
handleGet("/db/k1").apply { handleGet("/db/k1").apply {
assertEquals(HttpStatusCode.OK, response.status()) assertEquals(HttpStatusCode.OK, response.status())
@ -42,19 +43,19 @@ class StorageModuleTest : BaseApplicationTest() {
} }
@Test @Test
fun `when get version succeeds should return version`() { fun `when get version succeeds should return version`(): Unit = runBlocking {
`when`(server.get("k1")).thenReturn(data1) `when`(server.getDataVersion("k1")).thenReturn(30)
withTestApplication(app()) { withTestApplication(app()) {
handleGet("/db/k1/version").apply { handleGet("/db/k1/version").apply {
assertEquals(HttpStatusCode.OK, response.status()) assertEquals(HttpStatusCode.OK, response.status())
assertEquals("1", response.content) assertEquals(GetDataVersionResponse(30).toJson(), response.content)
} }
} }
} }
@Test @Test
fun `when get with invalid key should return 404`() { fun `when get with invalid key should return 404`(): Unit = runBlocking {
`when`(server.get("k1")).thenThrow(KeyNotFoundException()) `when`(server.getData("k1")).thenThrow(KeyNotFoundException())
withTestApplication(app()) { withTestApplication(app()) {
handleGet("/db/k1").apply { handleGet("/db/k1").apply {
assertEquals(HttpStatusCode.NotFound, response.status()) assertEquals(HttpStatusCode.NotFound, response.status())
@ -64,17 +65,19 @@ class StorageModuleTest : BaseApplicationTest() {
@Test @Test
fun `when put succeeds should return OK`() { fun `when put succeeds should return OK`(): Unit = runBlocking {
withTestApplication(app()) { withTestApplication(app()) {
handlePut("/db/k1", data1).apply { handlePut("/db/k1", data1).apply {
assertEquals(HttpStatusCode.OK, response.status()) runBlocking {
verify(server).put("k1", data1) assertEquals(HttpStatusCode.OK, response.status())
verify(server).put("k1", data1)
}
} }
} }
} }
@Test @Test
fun `when put with invalid key should return 404`() { fun `when put with invalid key should return 404`(): Unit = runBlocking {
`when`(server.put("k1", data1)).thenThrow(KeyNotFoundException()) `when`(server.put("k1", data1)).thenThrow(KeyNotFoundException())
withTestApplication(app()) { withTestApplication(app()) {
handlePut("/db/k1", data1).apply { handlePut("/db/k1", data1).apply {
@ -84,13 +87,12 @@ class StorageModuleTest : BaseApplicationTest() {
} }
@Test @Test
fun `when put with invalid version should return 409 and current data`() { fun `when put with invalid version should return 409 and current data`(): Unit = runBlocking {
`when`(server.put("k1", data1)).thenThrow(EditConflictException()) `when`(server.put("k1", data1)).thenThrow(EditConflictException())
`when`(server.get("k1")).thenReturn(data2) `when`(server.getData("k1")).thenReturn(data2)
withTestApplication(app()) { withTestApplication(app()) {
handlePut("/db/k1", data1).apply { handlePut("/db/k1", data1).apply {
assertEquals(HttpStatusCode.Conflict, response.status()) assertEquals(HttpStatusCode.Conflict, response.status())
assertEquals(data2.toJson(), response.content)
} }
} }
} }

Loading…
Cancel
Save