Skip to content

Commit

Permalink
feat(federation): offline backends handling (epic) (#1548)
Browse files Browse the repository at this point in the history
* Revert "feat: feature federation msg failed to send handling (AR-3015) (#1547)"

This reverts commit 6011673.

* chore(migration): out of conflict

* chore: more code documentation

* chore: recover small improv for uneeded call

* chore: detekt

* fix: sq migration updated after a really long time =/

* feat: offline backends - handling list-prekeys failed to list (AR-3171) (#1662)

* feat: remove unused code, add mapping of proto contentc

* feat: expand message target to include users to ignore

* feat: resetting intentions on message targets

* feat: resetting intentions on message targets

* feat: test adjustment for target change

* feat: adjusting signatures and simplify message sender mapping

* feat: adjusting signatures and simplify message sender mapping

* feat: adjusting signatures and simplify message sender mapping

* feat: adjusting signatures and simplify message sender mapping

* feat: clean code, and fallback for clients targets

* feat: ignoring failed recipients mapping for non regular messages

* feat: ignoring failed recipients mapping for non regular messages

* feat: ignoring failed recipients mapping for non regular messages

* feat: adding test coverage

* feat: adding test coverage

* feat: adding test coverage

* chore: tmp for merge

* feat: wip, merge sqw

* feat: offline backends - users metadata (AR-3124) (#1698)

* feat: adjust query to hide 1:1 convos without metadata

* feat: adjustment query to consider deleted users logic as it is now

* feat: tests for query conversations details

* feat: pr comments single quotes

* feat(offline-backends): users and conversations without metadata refetch pt1. (AR-3123) (#1736)

* feat: adjust query to hide 1:1 convos without metadata

* feat: adjustment query to consider deleted users logic as it is now

* feat: tests for query conversations details

* feat: persistence for getting users out of sync

* feat: persistence for getting users out of sync

* feat: pr comments single quotes

* feat: invok operator

* feat: persist failed convos

* feat: cleanup

* feat: tests cov

* feat: tests cov

* feat: tests cov

* feat: tests cov

* feat: refactor, persist users withoutmetadata with dedicated field

* feat: refactor, relay on missing metadata field for refetch usres

* feat: refactor, relay on missing metadata field for refetch conversations

* feat: refactor, relay on missing metadata field for refetch conversations

* feat: refactor, relay on missing metadata field for refetch conversations

* feat: refactor, fixing tests

* chore: add migration tests

* feat(offline-backends): users and conversations without metadata refetch pt2. (AR-3123) (#1740)

* feat: adjust query to hide 1:1 convos without metadata

* feat: adjustment query to consider deleted users logic as it is now

* feat: tests for query conversations details

* feat: persistence for getting users out of sync

* feat: persistence for getting users out of sync

* feat: pr comments single quotes

* feat: invok operator

* feat: persist failed convos

* feat: skeleton classes to build upon

* feat: skeleton classes to build upon

* feat: skeleton classes to build upon

* feat: queries and metadata for syncing metadata

* feat: provider di

* feat: cleanup

* feat: cleanup, clock instant

* feat: tests cov

* feat: tests cov

* feat: tests cov

* feat: tests cov

* feat: refactor, persist users withoutmetadata with dedicated field

* feat: refactor, relay on missing metadata field for refetch usres

* feat: refactor, relay on missing metadata field for refetch conversations

* feat: refactor, relay on missing metadata field for refetch conversations

* feat: refactor, relay on missing metadata field for refetch conversations

* feat: refactor, fixing tests

* chore: add migration tests

* chore: ddetekt

* chore: new query

* chore: test for use case

* chore: test cov

* chore: test cov

* chore: change strategy to run sync after inc

* chore: logging

* chore: logging

* chore: logging ref

* fix: db ops

* fix: db ops, user persistence fixed

* chore: test cov

* feat: test coverage

* chore: wip for merge

* fix: handle edge case error when no sessions

* fix: tests

* feat: wip merge

* feat: wip merge

* feat: conversation creation with offline backends (WPB-364) (#1774)

* feat: add new response handling create convo v4

* feat: add new response handling create convo v4 test

* feat: add new response handling create convo v4 test

* feat: add tests cases and persitence of msg for failed to add

* feat: add tests cases and persitence of msg for failed to add

---------

Co-authored-by: Mohamad Jaara <mohamad.jaara@wire.com>

* chore: preparing for rebase

* chore: preparing merge

* chore: preparing merge, better naming for func

* chore: preparing merge, coverage

* chore: preparing merge, coverage

---------

Co-authored-by: Mohamad Jaara <mohamad.jaara@wire.com>
  • Loading branch information
yamilmedina and MohamadJaara authored Jun 23, 2023
1 parent 71a999c commit 8cae138
Show file tree
Hide file tree
Showing 66 changed files with 2,094 additions and 452 deletions.
15 changes: 15 additions & 0 deletions logic/src/commonMain/kotlin/com/wire/kalium/logic/CoreFailure.kt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,21 @@ internal inline fun <T : Any> wrapStorageRequest(storageRequest: () -> T?): Eith
}
}

/**
* Wrap a storage request with a custom error handler that let's delegate the error handling to the caller.
*/
@Suppress("TooGenericExceptionCaught")
internal inline fun <T : Any> wrapStorageRequest(
noinline errorHandler: (Exception) -> Either<StorageFailure, T>,
storageRequest: () -> T?
): Either<StorageFailure, T> {
return try {
storageRequest()?.let { data -> Either.Right(data) } ?: Either.Left(StorageFailure.DataNotFound)
} catch (exception: Exception) {
errorHandler(exception)
}
}

internal inline fun <T : Any> wrapStorageNullableRequest(storageRequest: () -> T?): Either<StorageFailure, T?> {
return try {
storageRequest().let { data -> Either.Right(data) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package com.wire.kalium.logic.data.conversation

import com.wire.kalium.logic.data.connection.ConnectionStatusMapper
import com.wire.kalium.logic.data.id.IdMapper
import com.wire.kalium.logic.data.id.NetworkQualifiedId
import com.wire.kalium.logic.data.id.TeamId
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toDao
Expand Down Expand Up @@ -77,6 +78,7 @@ interface ConversationMapper {
fun toApiModel(name: String?, members: List<UserId>, teamId: String?, options: ConversationOptions): CreateConversationRequest

fun fromMigrationModel(conversation: Conversation): ConversationEntity
fun fromFailedGroupConversationToEntity(conversationId: NetworkQualifiedId): ConversationEntity
}

@Suppress("TooManyFunctions", "LongParameterList")
Expand Down Expand Up @@ -113,6 +115,7 @@ internal class ConversationMapperImpl(
receiptMode = receiptModeMapper.fromApiToDaoModel(apiModel.receiptMode),
messageTimer = apiModel.messageTimer,
userMessageTimer = null, // user picked self deletion timer is only persisted locally
hasIncompleteMetadata = false
)

override fun fromApiModelToDaoModel(apiModel: ConvProtocol): Protocol = when (apiModel) {
Expand Down Expand Up @@ -357,6 +360,31 @@ internal class ConversationMapperImpl(
)
}

/**
* Default values and marked as [ConversationEntity.hasIncompleteMetadata] = true.
* So later we can re-fetch them.
*/
override fun fromFailedGroupConversationToEntity(conversationId: NetworkQualifiedId): ConversationEntity = ConversationEntity(
id = conversationId.toDao(),
name = null,
type = ConversationEntity.Type.GROUP,
teamId = null,
protocolInfo = ProtocolInfo.Proteus,
mutedStatus = ConversationEntity.MutedStatus.ALL_ALLOWED,
mutedTime = 0,
removedBy = null,
creatorId = "",
lastNotificationDate = "1970-01-01T00:00:00.000Z".toInstant(),
lastModifiedDate = "1970-01-01T00:00:00.000Z".toInstant(),
lastReadDate = "1970-01-01T00:00:00.000Z".toInstant(),
access = emptyList(),
accessRole = emptyList(),
receiptMode = ConversationEntity.ReceiptMode.DISABLED,
messageTimer = null,
userMessageTimer = null,
hasIncompleteMetadata = true
)

private fun ConversationResponse.getProtocolInfo(mlsGroupState: GroupState?): ProtocolInfo {
return when (protocol) {
ConvProtocol.MLS -> ProtocolInfo.MLS(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.GroupID
import com.wire.kalium.logic.data.id.IdMapper
import com.wire.kalium.logic.data.id.NetworkQualifiedId
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toCrypto
Expand Down Expand Up @@ -210,6 +211,7 @@ interface ConversationRepository {
suspend fun getConversationUnreadEventsCount(conversationId: ConversationId): Either<StorageFailure, Long>
suspend fun getUserSelfDeletionTimer(conversationId: ConversationId): Either<StorageFailure, SelfDeletionTimer?>
suspend fun updateUserSelfDeletionTimer(conversationId: ConversationId, selfDeletionTimer: SelfDeletionTimer): Either<CoreFailure, Unit>
suspend fun syncConversationsWithoutMetadata(): Either<CoreFailure, Unit>
}

@Suppress("LongParameterList", "TooManyFunctions")
Expand Down Expand Up @@ -266,7 +268,8 @@ internal class ConversationDataSource internal constructor(
}.onSuccess { conversations ->
if (conversations.conversationsFailed.isNotEmpty()) {
kaliumLogger.withFeatureId(CONVERSATIONS)
.d("Skipping ${conversations.conversationsFailed.size} conversations failed")
.d("Handling ${conversations.conversationsFailed.size} conversations failed")
persistIncompleteConversations(conversations.conversationsFailed)
}
if (conversations.conversationsNotFound.isNotEmpty()) {
kaliumLogger.withFeatureId(CONVERSATIONS)
Expand Down Expand Up @@ -751,6 +754,31 @@ internal class ConversationDataSource internal constructor(
)
}

override suspend fun syncConversationsWithoutMetadata(): Either<CoreFailure, Unit> = wrapStorageRequest {
val conversationsWithoutMetadata = conversationDAO.getConversationsWithoutMetadata()
if (conversationsWithoutMetadata.isNotEmpty()) {
kaliumLogger.d("Numbers of conversations to refresh: ${conversationsWithoutMetadata.size}")
val conversationsWithoutMetadataIds = conversationsWithoutMetadata.map { it.toApi() }
wrapApiRequest {
conversationApi.fetchConversationsListDetails(conversationsWithoutMetadataIds)
}.onSuccess {
persistConversations(it.conversationsFound, null)
}
}
}

private suspend fun persistIncompleteConversations(
conversationsFailed: List<NetworkQualifiedId>
): Either<CoreFailure, Unit> {
return wrapStorageRequest {
if (conversationsFailed.isNotEmpty()) {
conversationDAO.insertConversations(conversationsFailed.map { conversationId ->
conversationMapper.fromFailedGroupConversationToEntity(conversationId)
})
}
}
}

companion object {
const val DEFAULT_MEMBER_ROLE = "wire_member"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,22 @@ internal class NewGroupConversationSystemMessagesCreatorImpl(
status = Message.Status.SENT,
visibility = Message.Visibility.VISIBLE
)
).also { createFailedToAddSystemMessage(conversationResponse) }
}
}

private suspend fun createFailedToAddSystemMessage(conversationResponse: ConversationResponse) {
if (conversationResponse.failedToAdd.isNotEmpty()) {
val messageStartedWithFailedMembers = Message.System(
uuid4().toString(),
MessageContent.MemberChange.FailedToAdd(conversationResponse.failedToAdd.map { it.toModel() }),
conversationResponse.id.toModel(),
DateTimeUtil.currentIsoDateTimeString(),
selfUserId,
Message.Status.SENT,
Message.Visibility.VISIBLE
)
persistMessage(messageStartedWithFailedMembers)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package com.wire.kalium.logic.data.message

import com.wire.kalium.logger.obfuscateDomain
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.util.serialization.toJsonElement
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.util.serialization.toJsonElement
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.encodeToString
Expand Down Expand Up @@ -80,7 +80,8 @@ sealed interface Message {
val editStatus: EditStatus,
val expirationData: ExpirationData? = null,
val reactions: Reactions = Reactions.EMPTY,
val expectsReadConfirmation: Boolean = false
val expectsReadConfirmation: Boolean = false,
val deliveryStatus: DeliveryStatus = DeliveryStatus.CompleteDelivery
) : Sendable, Standalone {

@Suppress("LongMethod")
Expand Down Expand Up @@ -133,7 +134,8 @@ sealed interface Message {
"visibility" to "$visibility",
"senderClientId" to senderClientId.value.obfuscateId(),
"editStatus" to editStatus.toLogMap(),
"expectsReadConfirmation" to "$expectsReadConfirmation"
"expectsReadConfirmation" to "$expectsReadConfirmation",
"deliveryStatus" to "$deliveryStatus"
)

properties.putAll(standardProperties)
Expand Down Expand Up @@ -456,3 +458,12 @@ enum class AssetType {

typealias ReactionsCount = Map<String, Int>
typealias UserReactions = Set<String>

sealed class DeliveryStatus {
data class PartialDelivery(
val recipientsFailedWithNoClients: List<UserId>,
val recipientsFailedDelivery: List<UserId>
) : DeliveryStatus()

object CompleteDelivery : DeliveryStatus()
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.wire.kalium.logic.data.notification.LocalNotificationMessageAuthor
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.persistence.dao.message.AssetTypeEntity
import com.wire.kalium.persistence.dao.message.DeliveryStatusEntity
import com.wire.kalium.persistence.dao.message.MessageEntity
import com.wire.kalium.persistence.dao.message.MessageEntityContent
import com.wire.kalium.persistence.dao.message.MessagePreviewEntity
Expand Down Expand Up @@ -105,6 +106,7 @@ class MessageMapperImpl(
}
}

@Suppress("ComplexMethod")
override fun fromEntityToMessage(message: MessageEntity): Message.Standalone {
val status = when (message.status) {
MessageEntity.Status.PENDING -> Message.Status.PENDING
Expand Down Expand Up @@ -138,7 +140,14 @@ class MessageMapperImpl(
reactions = Message.Reactions(message.reactions.totalReactions, message.reactions.selfUserReactions),
senderUserName = message.senderName,
isSelfMessage = message.isSelfMessage,
expectsReadConfirmation = message.expectsReadConfirmation
expectsReadConfirmation = message.expectsReadConfirmation,
deliveryStatus = when (val recipientsFailure = message.deliveryStatus) {
is DeliveryStatusEntity.CompleteDelivery -> DeliveryStatus.CompleteDelivery
is DeliveryStatusEntity.PartialDelivery -> DeliveryStatus.PartialDelivery(
recipientsFailedWithNoClients = recipientsFailure.recipientsFailedWithNoClients.map { it.toModel() },
recipientsFailedDelivery = recipientsFailure.recipientsFailedDelivery.map { it.toModel() }
)
},
)

is MessageEntity.System -> Message.System(
Expand Down
Loading

0 comments on commit 8cae138

Please sign in to comment.