Skip to content

Commit

Permalink
Merge pull request #71 from opensrp/subsequent-syncs-have-records-to-…
Browse files Browse the repository at this point in the history
…send-fix

Increment lastUpdated value for each datatype's last received history record after sync completes
  • Loading branch information
Rkareko authored Sep 21, 2023
2 parents 3a81476 + 8c170ae commit 8421c06
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 23 deletions.
2 changes: 1 addition & 1 deletion p2p-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ afterEvaluate {
from(components["release"])
artifactId = "p2p-lib"
groupId = "org.smartregister"
version = "0.6.7-SNAPSHOT"
version = "0.6.8-SNAPSHOT"
pom {
name.set("Peer to Peer Library")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package org.smartregister.p2p.dao

import java.util.TreeSet
import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.search.data.JsonData
import org.smartregister.p2p.sync.DataType

interface SenderTransferDao {
fun getP2PDataTypes(): TreeSet<DataType>

fun getTotalRecordCount(highestRecordIdMap: HashMap<String, Long>): Long
fun getTotalRecordCount(highestRecordIdMap: HashMap<String, Long>): RecordCount

fun getJsonData(
dataType: DataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package org.smartregister.p2p.data_sharing

import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.sync.DataType

/** Created by Ephraim Kigamba - [email protected] on 21-03-2022. */
data class Manifest(
val dataType: DataType,
val recordsSize: Int,
val payloadSize: Int,
val totalRecordCount: Long = 0
val totalRecordCount: Long = 0,
val recordCount: RecordCount
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ constructor(
private lateinit var currentManifest: Manifest
private var totalRecordCount: Long = 0
private var totalReceivedRecordCount: Long = 0
private val receivedResourceCountMap: HashMap<String, Long> = HashMap()

fun processManifest(manifest: Manifest) {
currentManifest = manifest
Expand Down Expand Up @@ -72,6 +73,18 @@ constructor(
totalRecords = totalRecordCount
)

/**
* When all records have been received increment lastUpdatedAt by 1 This ensures that when a new
* transfer is initiated the already transferred items with the highest lastUpdated value are
* not sent again Fixes issue documented here
* https://github.com/opensrp/fhircore/issues/2390#issuecomment-1726305575
*/
if (transferCompletedForCurrentDataType(data)) {
lastUpdatedAt += 1
Timber.i(
"Last updatedAt incremented by 1 to $lastUpdatedAt for ${currentManifest.dataType.name}"
)
} else lastUpdatedAt
addOrUpdateLastRecord(currentManifest.dataType.name, lastUpdatedAt = lastUpdatedAt)

p2PReceiverViewModel.processIncomingManifest()
Expand Down Expand Up @@ -105,4 +118,27 @@ constructor(
private fun getP2pReceivedHistoryDao(): P2pReceivedHistoryDao {
return P2PLibrary.getInstance().getDb().p2pReceivedHistoryDao()
}

/**
* This method determines whether all records of the current data type being processed have been
* transferred
* @param data [JSONArray] contains the batch of records being transferred
* @return Boolean indicating whether all records for a particular data type have been transferred
*/
private fun transferCompletedForCurrentDataType(data: JSONArray): Boolean {
val currentDataTypeTotalRecordCount =
currentManifest.recordCount.dataTypeTotalCountMap[currentManifest.dataType.name]

receivedResourceCountMap[currentManifest.dataType.name] =
if (receivedResourceCountMap[currentManifest.dataType.name] != null)
receivedResourceCountMap[currentManifest.dataType.name]?.plus(data!!.length())!!
else data!!.length().toLong()

Timber.i(
"totalReceivedRecordCount is ${receivedResourceCountMap[currentManifest.dataType.name]} and totalRecordCount is $currentDataTypeTotalRecordCount"
)

return receivedResourceCountMap[currentManifest.dataType.name] ==
currentDataTypeTotalRecordCount
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.util.TreeSet
import kotlinx.coroutines.withContext
import org.smartregister.p2p.P2PLibrary
import org.smartregister.p2p.model.P2PReceivedHistory
import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.payload.BytePayload
import org.smartregister.p2p.payload.PayloadContract
import org.smartregister.p2p.search.ui.P2PSenderViewModel
Expand All @@ -37,12 +38,12 @@ constructor(
private val remainingLastRecordIds = HashMap<String, Long>()
private val batchSize = 25
private var awaitingDataTypeRecordsBatchSize = 0
private var totalRecordCount: Long = 0
private var totalSentRecordCount: Long = 0

private lateinit var awaitingPayload: PayloadContract<out Any>
private var sendingSyncCompleteManifest = false
private var recordsBatchOffset = 0
private var recordCount: RecordCount = RecordCount(0L, hashMapOf())

suspend fun startSyncProcess() {
Timber.i("Start sync process")
Expand All @@ -69,7 +70,7 @@ constructor(
}

fun populateTotalRecordCount() {
totalRecordCount =
recordCount =
P2PLibrary.getInstance().getSenderTransferDao().getTotalRecordCount(remainingLastRecordIds)
}

Expand All @@ -83,7 +84,8 @@ constructor(
Manifest(
dataType = DataType(name, DataType.Filetype.JSON, 0),
recordsSize = 0,
payloadSize = 0
payloadSize = 0,
recordCount = recordCount
)

sendingSyncCompleteManifest = true
Expand Down Expand Up @@ -129,7 +131,8 @@ constructor(
dataType = dataType,
recordsSize = awaitingDataTypeRecordsBatchSize,
payloadSize = recordsJsonString.length,
totalRecordCount = totalRecordCount
totalRecordCount = recordCount.totalRecordCount,
recordCount = recordCount
)

p2PSenderViewModel.sendManifest(manifest = manifest)
Expand All @@ -154,10 +157,12 @@ constructor(

open fun updateTotalSentRecordCount() {
this.totalSentRecordCount = totalSentRecordCount + awaitingDataTypeRecordsBatchSize
Timber.i("Progress update: Updating progress to $totalSentRecordCount out of $totalRecordCount")
Timber.i(
"Progress update: Updating progress to $totalSentRecordCount out of ${recordCount.totalRecordCount}"
)
p2PSenderViewModel.updateTransferProgress(
totalSentRecords = totalSentRecordCount,
totalRecords = totalRecordCount
totalRecords = recordCount.totalRecordCount
)
}
}
21 changes: 21 additions & 0 deletions p2p-lib/src/main/java/org/smartregister/p2p/model/RecordCount.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2022-2023 Ona Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartregister.p2p.model

data class RecordCount(
val totalRecordCount: Long = 0,
val dataTypeTotalCountMap: HashMap<String, Long> = hashMapOf()
)
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.smartregister.p2p.dao.P2pReceivedHistoryDao
import org.smartregister.p2p.dao.ReceiverTransferDao
import org.smartregister.p2p.model.P2PReceivedHistory
import org.smartregister.p2p.model.P2PState
import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.robolectric.RobolectricTest
import org.smartregister.p2p.search.ui.P2PReceiverViewModel
import org.smartregister.p2p.shadows.ShadowAppDatabase
Expand Down Expand Up @@ -84,7 +85,13 @@ class SyncReceiverHandlerTest : RobolectricTest() {
)

dataType = DataType(name = "Group", type = DataType.Filetype.JSON, position = 0)
manifest = Manifest(dataType = dataType, recordsSize = 25, payloadSize = 50)
manifest =
Manifest(
dataType = dataType,
recordsSize = 25,
payloadSize = 50,
recordCount = RecordCount(50L, hashMapOf())
)

syncReceiverHandler =
spyk(
Expand All @@ -99,7 +106,13 @@ class SyncReceiverHandlerTest : RobolectricTest() {
@Test
fun `processManifest() calls p2PReceiverViewModel#handleDataTransferCompleteManifest() when data type name is sync complete`() {
dataType = DataType(name = Constants.SYNC_COMPLETE, type = DataType.Filetype.JSON, position = 0)
val manifest = Manifest(dataType = dataType, recordsSize = 25, payloadSize = 50)
val manifest =
Manifest(
dataType = dataType,
recordsSize = 25,
payloadSize = 50,
recordCount = RecordCount(50L, hashMapOf())
)
every { p2PReceiverViewModel.updateTransferProgress(any(), any()) } just runs
every {
p2PReceiverViewModel.handleDataTransferCompleteManifest(P2PState.TRANSFER_COMPLETE)
Expand All @@ -116,7 +129,13 @@ class SyncReceiverHandlerTest : RobolectricTest() {
fun `processManifest() calls p2PReceiverViewModel#handleDataTransferCompleteManifest() when data type name is data up to date`() {
dataType =
DataType(name = Constants.DATA_UP_TO_DATE, type = DataType.Filetype.JSON, position = 0)
val manifest = Manifest(dataType = dataType, recordsSize = 0, payloadSize = 0)
val manifest =
Manifest(
dataType = dataType,
recordsSize = 0,
payloadSize = 0,
recordCount = RecordCount(0L, hashMapOf())
)
every { p2PReceiverViewModel.updateTransferProgress(any(), any()) } just runs
every { p2PReceiverViewModel.handleDataTransferCompleteManifest(P2PState.DATA_UP_TO_DATE) } just
runs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.smartregister.p2p.CoroutineTestRule
import org.smartregister.p2p.P2PLibrary
import org.smartregister.p2p.dao.SenderTransferDao
import org.smartregister.p2p.model.P2PReceivedHistory
import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.payload.BytePayload
import org.smartregister.p2p.robolectric.RobolectricTest
import org.smartregister.p2p.search.data.JsonData
Expand Down Expand Up @@ -259,7 +260,7 @@ class SyncSenderHandlerTest : RobolectricTest() {
fun `updateTotalSentRecordCount() calls p2PSenderViewModel#updateTransferProgress`() {
ReflectionHelpers.setField(syncSenderHandler, "awaitingDataTypeRecordsBatchSize", 25)
ReflectionHelpers.setField(syncSenderHandler, "totalSentRecordCount", 10)
ReflectionHelpers.setField(syncSenderHandler, "totalRecordCount", 40)
ReflectionHelpers.setField(syncSenderHandler, "recordCount", RecordCount(40L, hashMapOf()))
every { p2PSenderViewModel.updateTransferProgress(any(), any()) } just runs
syncSenderHandler.updateTotalSentRecordCount()

Expand All @@ -268,10 +269,14 @@ class SyncSenderHandlerTest : RobolectricTest() {

@Test
fun `populateTotalRecordCount() returns correct data`() {
every { senderTransferDao.getTotalRecordCount(any()) } returns 23
Assert.assertEquals(0, ReflectionHelpers.getField<Long>(syncSenderHandler, "totalRecordCount"))
every { senderTransferDao.getTotalRecordCount(any()) } returns RecordCount(23L, hashMapOf())
val initialRecordCount =
ReflectionHelpers.getField<RecordCount>(syncSenderHandler, "recordCount")
Assert.assertEquals(0, initialRecordCount.totalRecordCount)
syncSenderHandler.populateTotalRecordCount()
Assert.assertEquals(23, ReflectionHelpers.getField<Long>(syncSenderHandler, "totalRecordCount"))
val updatedRecordCount =
ReflectionHelpers.getField<RecordCount>(syncSenderHandler, "recordCount")
Assert.assertEquals(23, updatedRecordCount.totalRecordCount)
}

fun getDataTypes(): TreeSet<DataType> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import org.junit.Test
import org.robolectric.util.ReflectionHelpers
import org.smartregister.p2p.CoroutineTestRule
import org.smartregister.p2p.WifiP2pBroadcastReceiver
import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.payload.BytePayload
import org.smartregister.p2p.payload.PayloadContract
import org.smartregister.p2p.payload.StringPayload
Expand Down Expand Up @@ -1041,6 +1042,11 @@ class WifiDirectDataSharingStrategyTest : RobolectricTest() {

private fun populateManifest(): Manifest {
val dataType = DataType(name = "Patient", type = DataType.Filetype.JSON, position = 1)
return Manifest(dataType = dataType, recordsSize = 25, payloadSize = 50)
return Manifest(
dataType = dataType,
recordsSize = 25,
payloadSize = 50,
recordCount = RecordCount(50L, hashMapOf())
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.smartregister.p2p.data_sharing.SyncReceiverHandler
import org.smartregister.p2p.data_sharing.WifiDirectDataSharingStrategy
import org.smartregister.p2p.model.P2PReceivedHistory
import org.smartregister.p2p.model.P2PState
import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.model.TransferProgress
import org.smartregister.p2p.payload.BytePayload
import org.smartregister.p2p.payload.PayloadContract
Expand Down Expand Up @@ -104,7 +105,13 @@ class P2PReceiverViewModelTest : RobolectricTest() {
@Test
fun `processIncomingManifest() with manifest calls syncReceiver#processManifest()`() {
val dataType = DataType(name = "Patient", type = DataType.Filetype.JSON, position = 1)
expectedManifest = Manifest(dataType = dataType, recordsSize = 25, payloadSize = 50)
expectedManifest =
Manifest(
dataType = dataType,
recordsSize = 25,
payloadSize = 50,
recordCount = RecordCount(50L, hashMapOf())
)
every { syncReceiverHandler.processManifest(manifest = expectedManifest) } just runs
every { p2PReceiverViewModel.listenForIncomingManifest() } answers { expectedManifest }
p2PReceiverViewModel.processIncomingManifest()
Expand All @@ -115,7 +122,13 @@ class P2PReceiverViewModelTest : RobolectricTest() {
fun `processIncomingManifest() with sync complete manifest value calls p2PReceiverViewModel#handleDataTransferCompleteManifest()`() {
val dataType =
DataType(name = Constants.SYNC_COMPLETE, type = DataType.Filetype.JSON, position = 1)
expectedManifest = Manifest(dataType = dataType, recordsSize = 25, payloadSize = 50)
expectedManifest =
Manifest(
dataType = dataType,
recordsSize = 25,
payloadSize = 50,
recordCount = RecordCount(50L, hashMapOf())
)
every { p2PReceiverViewModel.listenForIncomingManifest() } answers { expectedManifest }
p2PReceiverViewModel.processIncomingManifest()
verify(exactly = 1) {
Expand All @@ -126,7 +139,13 @@ class P2PReceiverViewModelTest : RobolectricTest() {
@Test
fun `listenForIncomingManifest() returns correct manifest`() {
val dataType = DataType(name = "Group", type = DataType.Filetype.JSON, position = 0)
expectedManifest = Manifest(dataType = dataType, recordsSize = 25, payloadSize = 50)
expectedManifest =
Manifest(
dataType = dataType,
recordsSize = 25,
payloadSize = 50,
recordCount = RecordCount(50L, hashMapOf())
)
every { dataSharingStrategy.receiveManifest(device = any(), operationListener = any()) } answers
{
expectedManifest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.smartregister.p2p.data_sharing.DeviceInfo
import org.smartregister.p2p.data_sharing.Manifest
import org.smartregister.p2p.data_sharing.SyncSenderHandler
import org.smartregister.p2p.data_sharing.WifiDirectDataSharingStrategy
import org.smartregister.p2p.model.RecordCount
import org.smartregister.p2p.model.TransferProgress
import org.smartregister.p2p.payload.PayloadContract
import org.smartregister.p2p.payload.StringPayload
Expand Down Expand Up @@ -209,7 +210,13 @@ internal class P2PSenderViewModelTest : RobolectricTest() {
fun `sendManifest() should call dataSharingStrategy#sendManifest()`() {
every { dataSharingStrategy.sendManifest(any(), any(), any()) } just runs

val manifest = Manifest(DataType("Patient", DataType.Filetype.JSON, 0), 250, 50)
val manifest =
Manifest(
DataType("Patient", DataType.Filetype.JSON, 0),
250,
50,
recordCount = RecordCount(250L, hashMapOf())
)

p2PSenderViewModel.sendManifest(manifest)

Expand All @@ -228,7 +235,7 @@ internal class P2PSenderViewModelTest : RobolectricTest() {
val dataTypes = TreeSet<DataType>()
dataTypes.add(DataType("Patient", DataType.Filetype.JSON, 0))
every { p2pSenderTransferDao.getP2PDataTypes() } returns dataTypes
every { p2pSenderTransferDao.getTotalRecordCount(any()) } returns 0
every { p2pSenderTransferDao.getTotalRecordCount(any()) } returns RecordCount(0L, hashMapOf())
coEvery { syncSenderHandler.startSyncProcess() } just runs
every { p2PSenderViewModel.createSyncSenderHandler(any(), any()) } returns syncSenderHandler

Expand All @@ -243,7 +250,7 @@ internal class P2PSenderViewModelTest : RobolectricTest() {
val syncPayload = StringPayload("[]")

every { p2pSenderTransferDao.getP2PDataTypes() } returns TreeSet<DataType>()
every { p2pSenderTransferDao.getTotalRecordCount(any()) } returns 0
every { p2pSenderTransferDao.getTotalRecordCount(any()) } returns RecordCount(0L, hashMapOf())
every { p2PSenderViewModel.disconnect() } just runs

p2PSenderViewModel.processReceivedHistory(syncPayload)
Expand Down

0 comments on commit 8421c06

Please sign in to comment.