Skip to content

Commit

Permalink
fix: duplicated messages conversation creation (WPB-1916) (#1816)
Browse files Browse the repository at this point in the history
* fix: avoid duplication of event in case already handled

* fix: avoid duplication of event in case already handled

* fix: tests cov

* fix: tests cov repo

* fix: tests cov repo
  • Loading branch information
yamilmedina authored Jun 22, 2023
1 parent 4c639f1 commit f445ee8
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.wrapApiRequest
import com.wire.kalium.logic.wrapProteusRequest
import com.wire.kalium.logic.wrapMLSRequest
import com.wire.kalium.logic.wrapProteusRequest
import com.wire.kalium.logic.wrapStorageRequest
import com.wire.kalium.network.api.base.authenticated.client.ClientApi
import com.wire.kalium.network.api.base.authenticated.conversation.ConvProtocol
Expand Down Expand Up @@ -100,6 +100,19 @@ interface ConversationRepository {
originatedFromEvent: Boolean = false
): Either<CoreFailure, Unit>

/**
* Creates a conversation from a new event
*
* @param conversation from event
* @param selfUserTeamId - self user team id if team user
* @return Either<CoreFailure, Boolean> - true if the conversation was created, false if it was already present
*/
suspend fun persistConversation(
conversation: ConversationResponse,
selfUserTeamId: String?,
originatedFromEvent: Boolean = false
): Either<CoreFailure, Boolean>

suspend fun getConversationList(): Either<StorageFailure, Flow<List<Conversation>>>
suspend fun observeConversationList(): Flow<List<Conversation>>
suspend fun observeConversationListDetails(): Flow<List<ConversationDetails>>
Expand Down Expand Up @@ -275,6 +288,27 @@ internal class ConversationDataSource internal constructor(
return latestResult
}

override suspend fun persistConversation(
conversation: ConversationResponse,
selfUserTeamId: String?,
originatedFromEvent: Boolean
): Either<CoreFailure, Boolean> = wrapStorageRequest {
val isNewConversation = conversationDAO.getConversationBaseInfoByQualifiedID(conversation.id.toDao()) == null
if (isNewConversation) {
conversationDAO.insertConversation(
conversationMapper.fromApiModelToDaoModel(
conversation,
mlsGroupState = conversation.groupId?.let { mlsGroupState(idMapper.fromGroupIDEntity(it), originatedFromEvent) },
selfTeamIdProvider().getOrNull(),
)
)
conversationDAO.insertMembersWithQualifiedId(
memberMapper.fromApiModelToDaoModel(conversation.members), idMapper.fromApiToDao(conversation.id)
)
}
isNewConversation
}

override suspend fun persistConversations(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package com.wire.kalium.logic.sync.receiver.conversation

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator
import com.wire.kalium.logic.data.event.Event
Expand All @@ -37,7 +36,7 @@ import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.DateTimeUtil

interface NewConversationEventHandler {
suspend fun handle(event: Event.Conversation.NewConversation): Either<CoreFailure, Unit>
suspend fun handle(event: Event.Conversation.NewConversation)
}

internal class NewConversationEventHandlerImpl(
Expand All @@ -47,29 +46,41 @@ internal class NewConversationEventHandlerImpl(
private val newGroupConversationSystemMessagesCreator: NewGroupConversationSystemMessagesCreator,
) : NewConversationEventHandler {

override suspend fun handle(event: Event.Conversation.NewConversation): Either<CoreFailure, Unit> = conversationRepository
.persistConversations(listOf(event.conversation), selfTeamIdProvider().getOrNull()?.value, originatedFromEvent = true)
.flatMap { conversationRepository.updateConversationModifiedDate(event.conversationId, DateTimeUtil.currentInstant()) }
.flatMap {
userRepository.fetchUsersIfUnknownByIds(event.conversation.members.otherMembers.map { it.id.toModel() }.toSet())
}.onSuccess {
createSystemMessagesForNewConversation(event)
kaliumLogger.logEventProcessing(EventLoggingStatus.SUCCESS, event)
}
.onFailure {
kaliumLogger.logEventProcessing(EventLoggingStatus.FAILURE, event, Pair("errorInfo", "$it"))
}
override suspend fun handle(event: Event.Conversation.NewConversation) {
conversationRepository
.persistConversation(event.conversation, selfTeamIdProvider().getOrNull()?.value, true)
.flatMap { isNewUnhandledConversation ->
conversationRepository.updateConversationModifiedDate(event.conversationId, DateTimeUtil.currentInstant())
Either.Right(isNewUnhandledConversation)
}.flatMap { isNewUnhandledConversation ->
userRepository.fetchUsersIfUnknownByIds(event.conversation.members.otherMembers.map { it.id.toModel() }.toSet())
Either.Right(isNewUnhandledConversation)
}.onSuccess { isNewUnhandledConversation ->
createSystemMessagesForNewConversation(isNewUnhandledConversation, event)
kaliumLogger.logEventProcessing(EventLoggingStatus.SUCCESS, event)
}.onFailure {
kaliumLogger.logEventProcessing(EventLoggingStatus.FAILURE, event, Pair("errorInfo", "$it"))
}
}

/**
* Creates system messages for new conversation.
* Conversation started, members added and failed, read receipt status.
*
* @param isNewUnhandledConversation if true we need to generate system messages for new conversation
* @param event new conversation event
*/
private suspend fun createSystemMessagesForNewConversation(event: Event.Conversation.NewConversation) = run {
newGroupConversationSystemMessagesCreator.conversationStarted(event.senderUserId, event.conversation)
newGroupConversationSystemMessagesCreator.conversationResolvedMembersAddedAndFailed(
event.conversationId.toDao(),
event.conversation
)
newGroupConversationSystemMessagesCreator.conversationReadReceiptStatus(event.conversation)
private suspend fun createSystemMessagesForNewConversation(
isNewUnhandledConversation: Boolean,
event: Event.Conversation.NewConversation
) {
if (isNewUnhandledConversation) {
newGroupConversationSystemMessagesCreator.conversationStarted(event.senderUserId, event.conversation)
newGroupConversationSystemMessagesCreator.conversationResolvedMembersAddedAndFailed(
event.conversationId.toDao(),
event.conversation
)
newGroupConversationSystemMessagesCreator.conversationReadReceiptStatus(event.conversation)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,52 @@ class ConversationRepositoryTest {
}
}

@Test
fun givenNewConversationEvent_whenCallingPersistConversationFromEvent_thenConversationShouldBePersisted() = runTest {
val event = Event.Conversation.NewConversation("id", TestConversation.ID, false, TestUser.SELF.id, "time", CONVERSATION_RESPONSE)
val selfUserFlow = flowOf(TestUser.SELF)
val (arrangement, conversationRepository) = Arrangement()
.withSelfUserFlow(selfUserFlow)
.withExpectedConversationBase(null)
.arrange()

conversationRepository.persistConversation(event.conversation, "teamId")

with(arrangement) {
verify(conversationDAO)
.suspendFunction(conversationDAO::insertConversation)
.with(
matching { conversation ->
conversation.id.value == CONVERSATION_RESPONSE.id.value
}
)
.wasInvoked(exactly = once)
}
}

@Test
fun givenNewConversationEvent_whenCallingPersistConversationFromEventAndExists_thenConversationPersistenceShouldBeSkipped() = runTest {
val event = Event.Conversation.NewConversation("id", TestConversation.ID, false, TestUser.SELF.id, "time", CONVERSATION_RESPONSE)
val selfUserFlow = flowOf(TestUser.SELF)
val (arrangement, conversationRepository) = Arrangement()
.withSelfUserFlow(selfUserFlow)
.withExpectedConversationBase(TestConversation.ENTITY)
.arrange()

conversationRepository.persistConversation(event.conversation, "teamId")

with(arrangement) {
verify(conversationDAO)
.suspendFunction(conversationDAO::insertConversation)
.with(
matching { conversation ->
conversation.id.value == CONVERSATION_RESPONSE.id.value
}
)
.wasNotInvoked()
}
}

@Test
fun givenNewConversationEventWithMlsConversation_whenCallingInsertConversation_thenMlsGroupExistenceShouldBeQueried() = runTest {
val event = Event.Conversation.NewConversation(
Expand Down Expand Up @@ -1070,6 +1116,13 @@ class ConversationRepositoryTest {
.thenReturn(conversationEntity)
}

fun withExpectedConversationBase(conversationEntity: ConversationEntity?) = apply {
given(conversationDAO)
.suspendFunction(conversationDAO::getConversationBaseInfoByQualifiedID)
.whenInvokedWith(any())
.thenReturn(conversationEntity)
}

fun withFetchConversationDetailsResult(
response: NetworkResponse<ConversationResponse>,
idMatcher: Matcher<ConversationIdDTO> = any()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class NewConversationEventHandlerTest {

val (arrangement, eventHandler) = Arrangement()
.withUpdateConversationModifiedDateReturning(Either.Right(Unit))
.withPersistingConversations(Either.Right(Unit))
.withPersistingConversations(Either.Right(true))
.withFetchUsersIfUnknownIds(members)
.withSelfUserTeamId(Either.Right(teamId))
.withConversationStartedSystemMessage()
Expand All @@ -88,8 +88,8 @@ class NewConversationEventHandlerTest {
eventHandler.handle(event)

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::persistConversations)
.with(eq(listOf(event.conversation)), eq(teamIdValue), eq(true))
.suspendFunction(arrangement.conversationRepository::persistConversation)
.with(eq(event.conversation), eq(teamIdValue))
.wasInvoked(exactly = once)

verify(arrangement.userRepository)
Expand Down Expand Up @@ -118,7 +118,7 @@ class NewConversationEventHandlerTest {

val (arrangement, eventHandler) = Arrangement()
.withUpdateConversationModifiedDateReturning(Either.Right(Unit))
.withPersistingConversations(Either.Right(Unit))
.withPersistingConversations(Either.Right(true))
.withFetchUsersIfUnknownIds(members)
.withSelfUserTeamId(Either.Right(teamId))
.withConversationStartedSystemMessage()
Expand Down Expand Up @@ -159,7 +159,7 @@ class NewConversationEventHandlerTest {

val (arrangement, eventHandler) = Arrangement()
.withUpdateConversationModifiedDateReturning(Either.Right(Unit))
.withPersistingConversations(Either.Right(Unit))
.withPersistingConversations(Either.Right(true))
.withFetchUsersIfUnknownIds(members)
.withSelfUserTeamId(Either.Right(teamId))
.withConversationStartedSystemMessage()
Expand Down Expand Up @@ -194,6 +194,66 @@ class NewConversationEventHandlerTest {
.wasInvoked(exactly = once)
}

@Test
fun givenNewGroupConversationEvent_whenHandlingItAndAlreadyPresent_thenShouldSkipPersistingTheSystemMessagesForNewConversation() =
runTest {
// given
val event = Event.Conversation.NewConversation(
id = "eventId",
conversationId = TestConversation.ID,
transient = false,
timestampIso = "timestamp",
conversation = TestConversation.CONVERSATION_RESPONSE.copy(
creator = "creatorId@creatorDomain",
receiptMode = ReceiptMode.ENABLED
),
senderUserId = TestUser.SELF.id
)

val members = event.conversation.members.otherMembers.map { it.id.toModel() }.toSet()
val teamId = TestTeam.TEAM_ID
val creatorQualifiedId = QualifiedID(
value = "creatorId",
domain = "creatorDomain"
)

val (arrangement, eventHandler) = Arrangement()
.withUpdateConversationModifiedDateReturning(Either.Right(Unit))
.withPersistingConversations(Either.Right(false))
.withFetchUsersIfUnknownIds(members)
.withSelfUserTeamId(Either.Right(teamId))
.withConversationStartedSystemMessage()
.withConversationResolvedMembersSystemMessage()
.withReadReceiptsSystemMessage()
.withQualifiedId(creatorQualifiedId)
.arrange()

// when
eventHandler.handle(event)

// then
verify(arrangement.newGroupConversationSystemMessagesCreator)
.suspendFunction(
arrangement.newGroupConversationSystemMessagesCreator::conversationStarted,
fun2<UserId, ConversationResponse>()
)
.with(any(), eq(event.conversation))
.wasNotInvoked()

verify(arrangement.newGroupConversationSystemMessagesCreator)
.suspendFunction(arrangement.newGroupConversationSystemMessagesCreator::conversationResolvedMembersAddedAndFailed)
.with(eq(event.conversationId.toDao()), eq(event.conversation))
.wasNotInvoked()

verify(arrangement.newGroupConversationSystemMessagesCreator)
.suspendFunction(
arrangement.newGroupConversationSystemMessagesCreator::conversationReadReceiptStatus,
fun1<ConversationResponse>()
)
.with(eq(event.conversation))
.wasNotInvoked()
}

private class Arrangement {
@Mock
val conversationRepository = mock(classOf<ConversationRepository>())
Expand Down Expand Up @@ -225,10 +285,10 @@ class NewConversationEventHandlerTest {
.thenReturn(result)
}

fun withPersistingConversations(result: Either<StorageFailure, Unit>) = apply {
fun withPersistingConversations(result: Either<StorageFailure, Boolean>) = apply {
given(conversationRepository)
.suspendFunction(conversationRepository::persistConversations)
.whenInvokedWith(any(), any(), any())
.suspendFunction(conversationRepository::persistConversation)
.whenInvokedWith(any(), any())
.thenReturn(result)
}

Expand Down

0 comments on commit f445ee8

Please sign in to comment.