Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import io.getstream.chat.android.client.extensions.ATTACHMENT_TYPE_IMAGE
import io.getstream.chat.android.client.extensions.cidToTypeAndId
import io.getstream.chat.android.client.extensions.extractBaseUrl
import io.getstream.chat.android.client.extensions.getCreatedAtOrNull
import io.getstream.chat.android.client.extensions.internal.isLaterThanDays
import io.getstream.chat.android.client.header.VersionPrefixHeader
import io.getstream.chat.android.client.helpers.AppSettingManager
Expand Down Expand Up @@ -157,6 +158,7 @@
import io.getstream.chat.android.client.user.storage.UserCredentialStorage
import io.getstream.chat.android.client.utils.ProgressCallback
import io.getstream.chat.android.client.utils.TokenUtils
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.client.utils.mergePartially
import io.getstream.chat.android.client.utils.message.ensureId
import io.getstream.chat.android.client.utils.observable.ChatEventsObservable
Expand Down Expand Up @@ -286,6 +288,7 @@
@InternalStreamChatApi
public val audioPlayer: AudioPlayer,
private val now: () -> Date = ::Date,
private val serverClockOffset: ServerClockOffset,
private val repository: ChatClientRepository,
private val messageReceiptReporter: MessageReceiptReporter,
internal val messageReceiptManager: MessageReceiptManager,
Expand Down Expand Up @@ -2588,16 +2591,34 @@

/**
* Ensure the message has a [Message.createdLocallyAt] timestamp.
* If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and [now].
* This ensures that the message appears in the correct order in the channel.
* If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and the
* estimated server time. Using estimated server time (instead of raw local clock) prevents
* cross-user ordering issues when the device clock is skewed.
*/
private suspend fun Message.ensureCreatedLocallyAt(cid: String): Message {
val lastMessageAt = repositoryFacade.selectChannel(cid = cid)?.lastMessageAt
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
Date(it.time + 1)
val parentId = this.parentId
if (parentId != null) {

Check warning on line 2600 in stream-chat-android-client/src/main/java/io/getstream/chat/android/client/ChatClient.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Move "return" statements from all branches before "if" statement.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZzCutSGQz_sVbjk2UkD&open=AZzCutSGQz_sVbjk2UkD&pullRequest=6199
// Thread reply
val lastMessage = repositoryFacade.selectMessagesForThread(parentId, limit = 1).lastOrNull()
val lastMessageAt = lastMessage?.getCreatedAtOrNull()
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
Date(it.time + 1)
}
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime())
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
} else {
// Regular message
val (type, id) = cid.cidToTypeAndId()
// Fetch channel lastMessageAt from state, fallback to offline storage
val channelState = logicRegistry?.channelStateLogic(type, id)?.listenForChannelState()
val lastMessageAt = channelState?.channelData?.value?.lastMessageAt
?: repositoryFacade.selectChannel(cid = cid)?.lastMessageAt
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
Date(it.time + 1)
}
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime())
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
}
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, now())
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
}

/**
Expand Down Expand Up @@ -5037,6 +5058,8 @@
warmUpReflection()
}

val serverClockOffset = ServerClockOffset()

val module =
ChatModule(
appContext = appContext,
Expand All @@ -5055,6 +5078,7 @@
lifecycle = lifecycle,
appName = this.appName,
appVersion = this.appVersion,
serverClockOffset = serverClockOffset,
)

val api = module.api()
Expand Down Expand Up @@ -5091,6 +5115,7 @@
retryPolicy = retryPolicy,
appSettingsManager = appSettingsManager,
chatSocket = module.chatSocket,
serverClockOffset = serverClockOffset,
pluginFactories = pluginFactories,
repositoryFactoryProvider = repositoryFactoryProvider
?: pluginFactories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import io.getstream.chat.android.client.uploader.FileUploader
import io.getstream.chat.android.client.uploader.StreamFileUploader
import io.getstream.chat.android.client.user.CurrentUserFetcher
import io.getstream.chat.android.client.utils.HeadersUtil
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.models.UserId
import io.getstream.log.StreamLog
import okhttp3.Interceptor
Expand Down Expand Up @@ -116,6 +117,7 @@ import java.util.concurrent.TimeUnit
* @param lifecycle Host [Lifecycle] used to observe app foreground/background and manage socket behavior.
* @param appName Optional app name added to default headers for tracking.
* @param appVersion Optional app version added to default headers for tracking.
* @param serverClockOffset Shared clock-offset tracker used by the socket layer for time synchronisation.
*/
@Suppress("TooManyFunctions")
internal class ChatModule
Expand All @@ -137,6 +139,7 @@ constructor(
private val lifecycle: Lifecycle,
private val appName: String?,
private val appVersion: String?,
private val serverClockOffset: ServerClockOffset,
) {

private val headersUtil = HeadersUtil(appContext, appName, appVersion)
Expand Down Expand Up @@ -311,6 +314,7 @@ constructor(
lifecycleObserver,
networkStateProvider,
clientDebugger,
serverClockOffset,
)

private fun buildApi(chatConfig: ChatClientConfig): ChatApi = ProxyChatApi(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.getstream.chat.android.client.network.NetworkStateProvider
import io.getstream.chat.android.client.scope.UserScope
import io.getstream.chat.android.client.socket.ChatSocketStateService.State
import io.getstream.chat.android.client.token.TokenManager
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.core.internal.coroutines.DispatcherProvider
import io.getstream.chat.android.models.User
import io.getstream.log.taggedLogger
Expand All @@ -52,6 +53,7 @@ internal open class ChatSocket(
private val lifecycleObserver: StreamLifecycleObserver,
private val networkStateProvider: NetworkStateProvider,
private val clientDebugger: ChatClientDebugger? = null,
private val serverClockOffset: ServerClockOffset,
) {
private var streamWebSocket: StreamWebSocket? = null
private val logger by taggedLogger(TAG)
Expand All @@ -61,7 +63,13 @@ internal open class ChatSocket(
private var socketStateObserverJob: Job? = null
private val healthMonitor = HealthMonitor(
userScope = userScope,
checkCallback = { (chatSocketStateService.currentState as? State.Connected)?.event?.let(::sendEvent) },
checkCallback = {
(chatSocketStateService.currentState as? State.Connected)?.event?.let {
if (sendEvent(it)) {
serverClockOffset.onHealthCheckSent()
}
}
},
reconnectCallback = { chatSocketStateService.onWebSocketEventLost() },
)
private val lifecycleHandler = object : LifecycleHandler {
Expand All @@ -84,6 +92,7 @@ internal open class ChatSocket(
socketListenerJob?.cancel()
when (networkStateProvider.isConnected()) {
true -> {
serverClockOffset.onConnectionStarted()
streamWebSocket = socketFactory.createSocket(connectionConf).apply {
socketListenerJob = listen().onEach {
when (it) {
Expand Down Expand Up @@ -194,8 +203,14 @@ internal open class ChatSocket(

private suspend fun handleEvent(chatEvent: ChatEvent) {
when (chatEvent) {
is ConnectedEvent -> chatSocketStateService.onConnectionEstablished(chatEvent)
is HealthEvent -> healthMonitor.ack()
is ConnectedEvent -> {
serverClockOffset.onConnected(chatEvent.createdAt)
chatSocketStateService.onConnectionEstablished(chatEvent)
}
is HealthEvent -> {
serverClockOffset.onHealthCheck(chatEvent.createdAt)
healthMonitor.ack()
}
else -> callListeners { listener -> listener.onEvent(chatEvent) }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.chat.android.client.utils.internal

import io.getstream.chat.android.client.events.ConnectedEvent
import io.getstream.chat.android.client.events.HealthEvent
import java.util.Date

/**
* Tracks the offset between the local device clock and the server clock using
* NTP-style estimation from WebSocket health check round-trips.
*
* The algorithm keeps only the sample with the lowest observed RTT, since a
* smaller round-trip means less room for network asymmetry to distort the
* measurement. Under the assumption that clock skew is constant for the
* duration of a session, the estimate monotonically improves over time.
*
* Thread-safe: single-field writes use [Volatile] for visibility; compound
* read-modify-write sequences are guarded by [lock] for atomicity.
*
* @param localTimeMs Clock source for the local device time (injectable for tests).
* @param maxRttMs Upper bound on plausible RTT. Samples exceeding this are
* discarded as stale or mismatched. Defaults to the health check cycle
* interval (MONITOR_INTERVAL + HEALTH_CHECK_INTERVAL = 11 000 ms).
*/
internal class ServerClockOffset(
private val localTimeMs: () -> Long = { System.currentTimeMillis() },
private val maxRttMs: Long = DEFAULT_MAX_RTT_MS,
) {

private val lock = Any()

@Volatile
private var offsetMs: Long = 0L

@Volatile
private var bestRttMs: Long = Long.MAX_VALUE

@Volatile
private var healthCheckSentAtMs: Long = 0L

@Volatile
private var connectionStartedAtMs: Long = 0L

/**
* Record the local time immediately before starting a WebSocket connection.
* When the next [ConnectedEvent] arrives, [onConnected] will pair with this
* timestamp to compute the offset using the NTP midpoint formula.
*/
internal fun onConnectionStarted() {
connectionStartedAtMs = localTimeMs()
}

/**
* Record the local time immediately before sending a health check echo.
* The next [onHealthCheck] call will pair with this timestamp to compute RTT.
*/
internal fun onHealthCheckSent() {
healthCheckSentAtMs = localTimeMs()
}

/**
* Calibration from a [ConnectedEvent].
*
* If [onConnectionStarted] was called before this connection (e.g. right before
* opening the WebSocket), uses the NTP midpoint of (connectionStartedAt, receivedAt)
* and serverTime for a more accurate offset. Otherwise falls back to a naive
* `localTime - serverTime` estimate.
*
* Resets health check state, since a new connection means any in-flight health
* check from the previous connection is stale.
*/
internal fun onConnected(serverTime: Date) {
synchronized(lock) {
bestRttMs = Long.MAX_VALUE
healthCheckSentAtMs = 0L

val receivedAtMs = localTimeMs()
val startedAtMs = connectionStartedAtMs
connectionStartedAtMs = 0L

if (startedAtMs > 0L) {
val rtt = receivedAtMs - startedAtMs
if (rtt in 1..maxRttMs) {
offsetMs = (startedAtMs + receivedAtMs) / 2 - serverTime.time
bestRttMs = rtt
return
}
}
offsetMs = receivedAtMs - serverTime.time
}
}

/**
* Refine the offset using a [HealthEvent] paired with [onHealthCheckSent].
*
* Computes RTT from the stored send time and the current receive time,
* then applies the NTP midpoint formula:
* ```
* offset = (sentAt + receivedAt) / 2 - serverTime
* ```
*
* The sample is accepted only if:
* - There is a pending [onHealthCheckSent] timestamp.
* - RTT is positive (guards against clock anomalies).
* - RTT is below [maxRttMs] (rejects stale / mismatched pairs).
* - RTT is lower than any previous sample (min-RTT selection).
*/
internal fun onHealthCheck(serverTime: Date) {
synchronized(lock) {
val sentAtMs = healthCheckSentAtMs
if (sentAtMs <= 0L) return
healthCheckSentAtMs = 0L

val receivedAtMs = localTimeMs()
val rtt = receivedAtMs - sentAtMs
if (rtt !in 1..maxRttMs) return

if (rtt < bestRttMs) {
bestRttMs = rtt
offsetMs = (sentAtMs + receivedAtMs) / 2 - serverTime.time
}
}
}

/**
* Returns the current time adjusted to the server timescale.
*
* Before the first [onConnected] call, this returns the raw local time
* (offset = 0).
*/
internal fun estimatedServerTime(): Date =
Date(localTimeMs() - offsetMs)

internal companion object {
internal const val DEFAULT_MAX_RTT_MS = 11_000L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import io.getstream.chat.android.client.token.FakeTokenManager
import io.getstream.chat.android.client.user.CredentialConfig
import io.getstream.chat.android.client.user.storage.UserCredentialStorage
import io.getstream.chat.android.client.utils.TokenUtils
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.models.ConnectionData
import io.getstream.chat.android.models.EventType
import io.getstream.chat.android.models.GuestUser
Expand Down Expand Up @@ -126,6 +127,7 @@ internal class ChatClientConnectionTests {
retryPolicy = mock(),
appSettingsManager = mock(),
chatSocket = fakeChatSocket,
serverClockOffset = ServerClockOffset(),
pluginFactories = emptyList(),
repositoryFactoryProvider = NoOpRepositoryFactory.Provider,
mutableClientState = mutableClientState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import io.getstream.chat.android.client.scope.UserTestScope
import io.getstream.chat.android.client.socket.FakeChatSocket
import io.getstream.chat.android.client.token.FakeTokenManager
import io.getstream.chat.android.client.utils.TokenUtils
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.client.utils.retry.NoRetryPolicy
import io.getstream.chat.android.models.ConnectionState
import io.getstream.chat.android.models.EventType
Expand Down Expand Up @@ -138,6 +139,7 @@ internal class ChatClientTest {
retryPolicy = NoRetryPolicy(),
appSettingsManager = mock(),
chatSocket = fakeChatSocket,
serverClockOffset = ServerClockOffset(),
pluginFactories = emptyList(),
mutableClientState = Mother.mockedClientState(),
repositoryFactoryProvider = NoOpRepositoryFactory.Provider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.getstream.chat.android.client.plugin.factory.PluginFactory
import io.getstream.chat.android.client.scope.ClientTestScope
import io.getstream.chat.android.client.scope.UserTestScope
import io.getstream.chat.android.client.setup.state.internal.MutableClientState
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.core.internal.InternalStreamChatApi
import io.getstream.chat.android.models.InitializationState
import io.getstream.chat.android.models.NoOpMessageTransformer
Expand Down Expand Up @@ -174,6 +175,7 @@ public class DependencyResolverTest {
retryPolicy = mock(),
appSettingsManager = mock(),
chatSocket = mock(),
serverClockOffset = ServerClockOffset(),
pluginFactories = pluginFactories,
repositoryFactoryProvider = mock(),
mutableClientState = mutableClientState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import io.getstream.chat.android.client.setup.state.internal.MutableClientState
import io.getstream.chat.android.client.token.FakeTokenManager
import io.getstream.chat.android.client.uploader.FileUploader
import io.getstream.chat.android.client.utils.TokenUtils
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.client.utils.retry.NoRetryPolicy
import io.getstream.chat.android.models.EventType
import io.getstream.chat.android.models.NoOpMessageTransformer
Expand Down Expand Up @@ -121,6 +122,7 @@ internal class MockClientBuilder(
retryPolicy = NoRetryPolicy(),
appSettingsManager = mock(),
chatSocket = mock(),
serverClockOffset = ServerClockOffset(),
pluginFactories = emptyList(),
repositoryFactoryProvider = NoOpRepositoryFactory.Provider,
mutableClientState = mutableClientState,
Expand Down
Loading
Loading