Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package dev.matheuscruz.domain;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.time.Instant;

@Entity
@Table(name = "inbound_messages")
public class InboundMessage {

@Id
private String messageId;

private Instant processedAt;

protected InboundMessage() {
}

public InboundMessage(String messageId) {
this.messageId = messageId;
this.processedAt = Instant.now();
}

public String getMessageId() {
return messageId;
}

public Instant getProcessedAt() {
return processedAt;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.matheuscruz.domain;

import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class InboundMessageRepository implements PanacheRepositoryBase<InboundMessage, String> {
}
65 changes: 65 additions & 0 deletions timeless-api/src/main/java/dev/matheuscruz/domain/Outbox.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package dev.matheuscruz.domain;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.time.Instant;

@Entity
@Table(name = "outbox")
public class Outbox {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(columnDefinition = "TEXT")
private String payload;

private String status;

private Instant createdAt;

private Instant processedAt;

protected Outbox() {
}

public Outbox(String payload) {
this.payload = payload;
this.status = "PENDING";
this.createdAt = Instant.now();
}

public Long getId() {
return id;
}

public String getPayload() {
return payload;
}

public String getStatus() {
return status;
}

public Instant getCreatedAt() {
return createdAt;
}

public Instant getProcessedAt() {
return processedAt;
}

public void markAsSent() {
this.status = "SENT";
this.processedAt = Instant.now();
}

public void markAsFailed() {
this.status = "FAILED";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dev.matheuscruz.domain;

import io.quarkus.hibernate.orm.panache.PanacheRepository;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.List;

@ApplicationScoped
public class OutboxRepository implements PanacheRepository<Outbox> {

public List<Outbox> findPending() {
return list("status", "PENDING");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package dev.matheuscruz.infra.queue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.matheuscruz.domain.Outbox;
import dev.matheuscruz.domain.OutboxRepository;
import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
import java.util.List;
import java.util.UUID;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import software.amazon.awssdk.services.sqs.SqsClient;

@ApplicationScoped
public class OutboxPoller {

private static final Logger LOGGER = Logger.getLogger(OutboxPoller.class);

final OutboxRepository outboxRepository;
final SqsClient sqs;
final String processedMessagesUrl;
final ObjectMapper objectMapper;

public OutboxPoller(OutboxRepository outboxRepository, SqsClient sqs,
@ConfigProperty(name = "whatsapp.recognized-message.queue-url") String processedMessagesUrl,
ObjectMapper objectMapper) {
this.outboxRepository = outboxRepository;
this.sqs = sqs;
this.processedMessagesUrl = processedMessagesUrl;
this.objectMapper = objectMapper;
}

@Scheduled(every = "10s")
@Transactional
public void poll() {
List<Outbox> pendingMessages = outboxRepository.findPending();
if (pendingMessages.isEmpty()) {
return;
}

LOGGER.infof("Processing %d pending outbox messages", pendingMessages.size());

for (Outbox outbox : pendingMessages) {
try {
sendMessage(outbox);
outbox.markAsSent();
outboxRepository.persist(outbox);
} catch (Exception e) {
LOGGER.error("Failed to send outbox message: " + outbox.getId(), e);
outbox.markAsFailed();
outboxRepository.persist(outbox);
}
}
}

private void sendMessage(Outbox outbox) {
sqs.sendMessage(req -> req.messageBody(outbox.getPayload()).messageGroupId("ProcessedMessages")
.messageDeduplicationId(outbox.getId().toString()).queueUrl(processedMessagesUrl));
}
}
57 changes: 46 additions & 11 deletions timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import dev.matheuscruz.domain.InboundMessage;
import dev.matheuscruz.domain.InboundMessageRepository;
import dev.matheuscruz.domain.Outbox;
import dev.matheuscruz.domain.OutboxRepository;
import dev.matheuscruz.domain.Record;
import dev.matheuscruz.domain.RecordRepository;
import dev.matheuscruz.domain.User;
Expand Down Expand Up @@ -33,6 +37,8 @@ public class SQS {
final TextAiService aiService;
final RecordRepository recordRepository;
final UserRepository userRepository;
final OutboxRepository outboxRepository;
final InboundMessageRepository inboundMessageRepository;
final Logger logger = Logger.getLogger(SQS.class);

private static final ObjectReader INCOMING_MESSAGE_READER = new ObjectMapper().readerFor(IncomingMessage.class);
Expand All @@ -42,7 +48,8 @@ public class SQS {
public SQS(SqsClient sqs, @ConfigProperty(name = "whatsapp.incoming-message.queue-url") String incomingMessagesUrl,
@ConfigProperty(name = "whatsapp.recognized-message.queue-url") String messagesProcessedUrl,
ObjectMapper objectMapper, TextAiService aiService, RecordRepository recordRepository,
UserRepository userRepository) {
UserRepository userRepository, OutboxRepository outboxRepository,
InboundMessageRepository inboundMessageRepository) {

this.sqs = sqs;
this.incomingMessagesUrl = incomingMessagesUrl;
Expand All @@ -51,6 +58,8 @@ public SQS(SqsClient sqs, @ConfigProperty(name = "whatsapp.incoming-message.queu
this.aiService = aiService;
this.recordRepository = recordRepository;
this.userRepository = userRepository;
this.outboxRepository = outboxRepository;
this.inboundMessageRepository = inboundMessageRepository;
}

@Scheduled(every = "5s")
Expand All @@ -72,6 +81,12 @@ private void processMessage(String body, String receiptHandle) {
return;
}

if (inboundMessageRepository.findById(incomingMessage.messageId()) != null) {
logger.warnf("Message %s already processed. Deleting from queue.", incomingMessage.messageId());
deleteMessageUsing(receiptHandle);
return;
}

handleUserMessage(user.get(), incomingMessage, receiptHandle);
}

Expand Down Expand Up @@ -100,15 +115,24 @@ private void handleUserMessage(User user, IncomingMessage message, String receip
private void processAddTransactionMessage(User user, IncomingMessage message, String receiptHandle,
RecognizedOperation recognizedOperation) throws IOException {
RecognizedTransaction recognizedTransaction = recognizedOperation.recognizedTransaction();
sendProcessedMessage(new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(),
message.messageId(), MessageStatus.PROCESSED, user.getPhoneNumber(), recognizedTransaction.withError(),
recognizedTransaction));

Record record = new Record.Builder().userId(user.getId()).amount(recognizedTransaction.amount())
.description(recognizedTransaction.description()).transaction(recognizedTransaction.type())
.category(recognizedTransaction.category()).build();

QuarkusTransaction.requiringNew().run(() -> recordRepository.persist(record));
TransactionMessageProcessed processedMessage = new TransactionMessageProcessed(
AiOperations.ADD_TRANSACTION.commandName(), message.messageId(), MessageStatus.PROCESSED,
user.getPhoneNumber(), recognizedTransaction.withError(), recognizedTransaction);

QuarkusTransaction.requiringNew().run(() -> {
inboundMessageRepository.persist(new InboundMessage(message.messageId()));
recordRepository.persist(record);
try {
saveToOutbox(processedMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});

deleteMessageUsing(receiptHandle);

Expand All @@ -119,16 +143,27 @@ private void processSimpleMessage(User user, IncomingMessage message, String rec
RecognizedOperation recognizedOperation) throws IOException {
logger.infof("Processing simple message for user %s", recognizedOperation.recognizedTransaction());
SimpleMessage response = new SimpleMessage(recognizedOperation.recognizedTransaction().description());
sendProcessedMessage(new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(), message.messageId(),
MessageStatus.PROCESSED, user.getPhoneNumber(), response));

SimpleMessageProcessed processedMessage = new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(),
message.messageId(), MessageStatus.PROCESSED, user.getPhoneNumber(), response);

QuarkusTransaction.requiringNew().run(() -> {
inboundMessageRepository.persist(new InboundMessage(message.messageId()));
try {
saveToOutbox(processedMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});

deleteMessageUsing(receiptHandle);
logger.infof("Message %s processed as GET_BALANCE", message.messageId());
}

private void sendProcessedMessage(Object processedMessage) throws JsonProcessingException {
String messageBody = objectMapper.writeValueAsString(processedMessage);
sqs.sendMessage(req -> req.messageBody(messageBody).messageGroupId("ProcessedMessages")
.messageDeduplicationId(UUID.randomUUID().toString()).queueUrl(processedMessagesUrl));
private void saveToOutbox(Object processedMessage) throws JsonProcessingException {
String payload = objectMapper.writeValueAsString(processedMessage);
Outbox outbox = new Outbox(payload);
outboxRepository.persist(outbox);
}

private IncomingMessage parseIncomingMessage(String messageBody) {
Expand Down
Loading