Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,17 @@ private void setupAppLayout() {
revokeConsentButton.setOnClickListener(v -> togglePrivacyConsent(false));

loginUserButton.setOnClickListener(v -> {
dialog.createUpdateAlertDialog("", Dialog.DialogAction.LOGIN, ProfileUtil.FieldType.EXTERNAL_USER_ID, new UpdateAlertDialogCallback() {
dialog.createAddPairAlertDialog("Login User", "External ID", "JWT Token (optional)", ProfileUtil.FieldType.EXTERNAL_USER_ID, new AddPairAlertDialogCallback() {
@Override
public void onSuccess(String update) {
if (update != null && !update.isEmpty()) {
OneSignal.login(update);
public void onSuccess(Pair<String, Object> pair) {
String externalId = pair.first;
String jwt = pair.second != null ? pair.second.toString().trim() : "";
if (externalId != null && !externalId.isEmpty()) {
if (!jwt.isEmpty()) {
OneSignal.login(externalId, jwt);
} else {
OneSignal.login(externalId);
}
refreshState();
}
}
Expand Down Expand Up @@ -444,11 +450,13 @@ private void setupJWTLayout() {
OneSignal.updateUserJwt(OneSignal.getUser().getExternalId(), "");
});
updateJwtButton.setOnClickListener(v -> {
dialog.createUpdateAlertDialog("", Dialog.DialogAction.UPDATE, ProfileUtil.FieldType.JWT, new UpdateAlertDialogCallback() {
dialog.createAddPairAlertDialog("Update JWT", "External ID", "JWT Token", ProfileUtil.FieldType.EXTERNAL_USER_ID, new AddPairAlertDialogCallback() {
@Override
public void onSuccess(String update) {
if (update != null && !update.isEmpty()) {
OneSignal.updateUserJwt(OneSignal.getUser().getExternalId(), update);
public void onSuccess(Pair<String, Object> pair) {
String externalId = pair.first;
String jwt = pair.second != null ? pair.second.toString().trim() : "";
if (externalId != null && !externalId.isEmpty() && !jwt.isEmpty()) {
OneSignal.updateUserJwt(externalId, jwt);
refreshState();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ private void toggleUpdateAlertDialogAttributes(boolean disableAttributes) {
* Click OK to verify and update the field being updated
*/
public void createAddPairAlertDialog(String content, final ProfileUtil.FieldType field, final AddPairAlertDialogCallback callback) {
createAddPairAlertDialog(content, "Key", "Value", field, callback);
}

public void createAddPairAlertDialog(String content, String keyHint, String valueHint, final ProfileUtil.FieldType field, final AddPairAlertDialogCallback callback) {
final View addPairAlertDialogView = layoutInflater.inflate(R.layout.add_pair_alert_dialog_layout, null, false);

final TextView addPairAlertDialogTitleTextView = addPairAlertDialogView.findViewById(R.id.add_pair_alert_dialog_title_text_view);
Expand All @@ -142,8 +146,8 @@ public void createAddPairAlertDialog(String content, final ProfileUtil.FieldType
final EditText addPairAlertDialogValueEditText = addPairAlertDialogView.findViewById(R.id.add_pair_alert_dialog_value_edit_text);
final ProgressBar addPairAlertDialogProgressBar = addPairAlertDialogView.findViewById(R.id.add_pair_alert_dialog_progress_bar);

addPairAlertDialogKeyTextInputLayout.setHint("Key");
addPairAlertDialogValueTextInputLayout.setHint("Value");
addPairAlertDialogKeyTextInputLayout.setHint(keyHint);
addPairAlertDialogValueTextInputLayout.setHint(valueHint);
addPairAlertDialogTitleTextView.setText(content);

font.applyFont(addPairAlertDialogTitleTextView, font.saralaBold);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.onesignal.core.internal.operations

import com.onesignal.IUserJwtInvalidatedListener
import kotlin.reflect.KClass

/**
Expand Down Expand Up @@ -42,6 +43,19 @@ interface IOperationRepo {
suspend fun awaitInitialized()

fun forceExecuteOperations()

/**
* Update the JWT on all queued operations for the given external ID.
* Resets the unauthorized retry counter and wakes the queue processor.
*/
fun updateJwtForExternalId(
externalId: String,
jwt: String,
)

fun addUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener)

fun removeUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener)
}

// Extension function so the syntax containsInstanceOf<Operation>() can be used over
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.onesignal.core.internal.operations

import com.onesignal.common.modeling.Model
import com.onesignal.common.modeling.ModelChangeTags

/**
* An [Operation] can be enqueued and executed on the [IOperationRepo]. Each concrete-class
Expand Down Expand Up @@ -49,6 +50,34 @@ abstract class Operation(name: String) : Model() {
*/
abstract val canStartExecute: Boolean

/**
* The JWT token stamped on this operation at enqueue time. Used by executors to authenticate
* backend requests for the user who created this operation, even if the current user has
* changed since then.
*/
var operationJwt: String?
get() = getOptStringProperty("_jwt")
set(value) {
setOptStringProperty("_jwt", value, ModelChangeTags.NO_PROPOGATE, true)
}

/**
* The external ID of the user who created this operation, stamped at enqueue time.
* Used to associate operations with specific users for multi-user JWT management.
*/
var operationExternalId: String?
get() = getOptStringProperty("_externalId")
set(value) {
setOptStringProperty("_externalId", value, ModelChangeTags.NO_PROPOGATE, true)
}

/**
* Whether this operation requires JWT authentication when identity verification is enabled.
* Most operations require JWT; override to return false for operations that don't
* (e.g. UpdateSubscriptionOperation).
*/
open val requiresJwt: Boolean get() = true

/**
* Called when an operation has resolved a local ID to a backend ID (i.e. successfully
* created a backend resource). Any IDs within the operation that could be local IDs should
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package com.onesignal.core.internal.operations.impl

import com.onesignal.IUserJwtInvalidatedListener
import com.onesignal.UserJwtInvalidatedEvent
import com.onesignal.common.events.EventProducer
import com.onesignal.common.modeling.ISingletonModelStoreChangeHandler
import com.onesignal.common.modeling.ModelChangedArgs
import com.onesignal.common.threading.OSPrimaryCoroutineScope
import com.onesignal.common.threading.WaiterWithValue
import com.onesignal.core.internal.config.ConfigModel
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.ExecutionResult
import com.onesignal.core.internal.operations.GroupComparisonType
Expand Down Expand Up @@ -31,15 +37,20 @@ internal class OperationRepo(
private val _identityModelStore: IdentityModelStore,
private val _time: ITime,
private val _newRecordState: NewRecordsState,
) : IOperationRepo, IStartableService {
) : IOperationRepo, IStartableService, ISingletonModelStoreChangeHandler<ConfigModel> {
companion object {
private const val FAIL_UNAUTHORIZED_MAX_RETRIES = 3
}

internal class OperationQueueItem(
val operation: Operation,
val waiter: WaiterWithValue<Boolean>? = null,
val bucket: Int,
var retries: Int = 0,
var unauthorizedRetries: Int = 0,
) {
override fun toString(): String {
return "bucket:$bucket, retries:$retries, operation:$operation\n"
return "bucket:$bucket, retries:$retries, unauthorizedRetries:$unauthorizedRetries, operation:$operation\n"
}
}

Expand All @@ -55,6 +66,7 @@ internal class OperationRepo(
private var paused = false
private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo"))
private val initialized = CompletableDeferred<Unit>()
val jwtInvalidatedCallback = EventProducer<IUserJwtInvalidatedListener>()

override suspend fun awaitInitialized() {
initialized.await()
Expand Down Expand Up @@ -97,13 +109,28 @@ internal class OperationRepo(
}

override fun start() {
_configModelStore.subscribe(this)
coroutineScope.launch {
// load saved operations first then start processing the queue to ensure correct operation order
loadSavedOperations()
processQueueForever()
}
}

override fun onModelReplaced(
model: ConfigModel,
tag: String,
) {
if (model.isInitializedWithRemote) {
waiter.wake(LoopWaiterMessage(false))
}
}

override fun onModelUpdated(
args: ModelChangedArgs,
tag: String,
) { }

/**
* Enqueuing will be performed in a designate coroutine and may not be added instantly.
* This is to prevent direct enqueuing from the main thread that may cause a deadlock if loading
Expand Down Expand Up @@ -147,6 +174,11 @@ internal class OperationRepo(
addToStore: Boolean,
index: Int? = null,
) {
if (addToStore) {
queueItem.operation.operationJwt = _identityModelStore.model.jwtToken
queueItem.operation.operationExternalId = _identityModelStore.model.externalId
}

synchronized(queue) {
val hasExisting = queue.any { it.operation.id == queueItem.operation.id }
if (hasExisting) {
Expand Down Expand Up @@ -262,12 +294,26 @@ internal class OperationRepo(
ops.forEach { it.waiter?.wake(true) }
}
ExecutionResult.FAIL_UNAUTHORIZED -> {
Logging.error("Operation execution failed with invalid jwt")
_identityModelStore.invalidateJwt()
val externalId = ops.first().operation.operationExternalId
Logging.error("Operation execution failed with invalid jwt for externalId: $externalId")

// add back all operations to the front of the queue to be re-executed.
synchronized(queue) {
ops.reversed().forEach { queue.add(0, it) }
ops.reversed().forEach {
it.unauthorizedRetries++
if (it.unauthorizedRetries > FAIL_UNAUTHORIZED_MAX_RETRIES) {
Logging.error("Dropping operation after $FAIL_UNAUTHORIZED_MAX_RETRIES unauthorized retries: ${it.operation}")
_operationModelStore.remove(it.operation.id)
it.waiter?.wake(false)
} else {
it.operation.operationJwt = null
_operationModelStore.persist()
queue.add(0, it)
}
}
}

if (externalId != null) {
jwtInvalidatedCallback.fire { it.onUserJwtInvalidated(UserJwtInvalidatedEvent(externalId)) }
}
}
ExecutionResult.FAIL_NORETRY,
Expand Down Expand Up @@ -313,9 +359,12 @@ internal class OperationRepo(
// if there are operations provided on the result, we need to enqueue them at the
// beginning of the queue.
if (response.operations != null) {
val startingOp = ops.first().operation
synchronized(queue) {
for (op in response.operations.reversed()) {
op.id = UUID.randomUUID().toString()
op.operationJwt = startingOp.operationJwt
op.operationExternalId = startingOp.operationExternalId
val queueItem = OperationQueueItem(op, bucket = 0)
queue.add(0, queueItem)
_operationModelStore.add(0, queueItem.operation)
Expand Down Expand Up @@ -374,18 +423,33 @@ internal class OperationRepo(

internal fun getNextOps(bucketFilter: Int): List<OperationQueueItem>? {
return synchronized(queue) {
// Ensure the operation does not have empty JWT if identity verification is on
if (_configModelStore.model.useIdentityVerification &&
_identityModelStore.model.jwtToken == null
) {
Logging.debug("getNextOps queue:\n$queue")

if (!_configModelStore.model.isInitializedWithRemote) {
Logging.debug("getNextOps: waiting for remote params")
return null
}

val useIV = _configModelStore.model.useIdentityVerification
if (useIV) {
val toDiscard =
queue.filter {
it.operation.operationExternalId == null && it.operation.requiresJwt
}
for (item in toDiscard) {
Logging.debug("getNextOps: discarding anonymous op: ${item.operation}")
queue.remove(item)
_operationModelStore.remove(item.operation.id)
item.waiter?.wake(false)
}
}

val startingOp =
queue.firstOrNull {
it.operation.canStartExecute &&
_newRecordState.canAccess(it.operation.applyToRecordId) &&
it.bucket <= bucketFilter
it.bucket <= bucketFilter &&
(!useIV || !it.operation.requiresJwt || it.operation.operationJwt != null)
}

if (startingOp != null) {
Expand Down Expand Up @@ -443,6 +507,30 @@ internal class OperationRepo(
return ops
}

override fun addUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener) {
jwtInvalidatedCallback.subscribe(listener)
}

override fun removeUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener) {
jwtInvalidatedCallback.unsubscribe(listener)
}

override fun updateJwtForExternalId(
externalId: String,
jwt: String,
) {
synchronized(queue) {
for (item in queue) {
if (item.operation.operationExternalId == externalId) {
item.operation.operationJwt = jwt
item.unauthorizedRetries = 0
}
}
}
_operationModelStore.persist()
waiter.wake(LoopWaiterMessage(false))
}

/**
* Load saved operations from preference service and add them into the queue
* NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,21 +451,20 @@ internal class OneSignalImp : IOneSignal, IServiceProvider {
for (model in identityModelStore!!.store.list()) {
if (externalId == model.externalId) {
identityModelStore!!.model.jwtToken = token
operationRepo!!.forceExecuteOperations()
Logging.log(LogLevel.DEBUG, "JWT $token is updated for externalId $externalId")
return
break
}
}

Logging.log(LogLevel.DEBUG, "No identity found for externalId $externalId")
operationRepo!!.updateJwtForExternalId(externalId, token)
}

override fun addUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener) {
user.addUserJwtInvalidatedListener(listener)
operationRepo!!.addUserJwtInvalidatedListener(listener)
}

override fun removeUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener) {
user.removeUserJwtInvalidatedListener(listener)
operationRepo!!.removeUserJwtInvalidatedListener(listener)
}

private fun createAndSwitchToNewUser(
Expand Down
Loading
Loading