Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2037,7 +2037,7 @@ client
labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND"
-- lookup can be outside of STM transaction,
-- as long as the check that it is the same client is inside.
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
getSubscribed >>= mapM_ deliverIfSame
deliverIfSame rcv = do
ts <- getSystemSeconds
atomically $ whenM (sameClient rc rcv) $
Expand Down
10 changes: 6 additions & 4 deletions src/Simplex/Messaging/Server/MsgStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ instance MsgStoreClass PostgresMsgStore where
q.status, q.updated_at, q.link_id, q.rcv_service_id,
m.msg_id, m.msg_ts, m.msg_quota, m.msg_ntf_flag, m.msg_body
FROM msg_queues q
LEFT JOIN (
SELECT recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body,
ROW_NUMBER() OVER (PARTITION BY recipient_id ORDER BY message_id ASC) AS row_num
LEFT JOIN LATERAL (
SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
) m ON q.recipient_id = m.recipient_id AND m.row_num = 1
WHERE recipient_id = q.recipient_id
ORDER BY message_id ASC
LIMIT 1
) m ON true
WHERE q.rcv_service_id = ? AND q.deleted_at IS NULL;
|]
(Only serviceId)
Expand Down
12 changes: 8 additions & 4 deletions tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ testServiceDeliverSubscribe =
signSend_ sh aServicePK Nothing ("11", serviceId, SUBS 1 idsHash)
[mId3] <-
fmap catMaybes $
receiveInAnyOrder -- race between SOKS and MSG, clients can handle it
receiveInAnyOrder -- race between SOKS, MSG and ALLS (sndQ and msgQ are separate threads)
sh
[ \case
Resp "11" serviceId' (SOKS n idsHash') -> do
Expand All @@ -731,9 +731,11 @@ testServiceDeliverSubscribe =
rId'' `shouldBe` rId
dec mId3 msg3 `shouldBe` Right "hello 3"
pure $ Just $ Just mId3
_ -> pure Nothing,
\case
Resp "" NoEntity ALLS -> pure $ Just Nothing
_ -> pure Nothing
]
Resp "" NoEntity ALLS <- tGet1 sh
Resp "12" _ OK <- signSendRecv sh rKey ("12", rId, ACK mId3)
Resp "14" _ OK <- signSendRecv h sKey ("14", sId, _SEND "hello 4")
Resp "" _ (Msg mId4 msg4) <- tGet1 sh
Expand Down Expand Up @@ -811,7 +813,7 @@ testServiceUpgradeAndDowngrade =
signSend_ sh aServicePK Nothing ("14", serviceId, SUBS 3 idsHash)
[(rKey3_1, rId3_1, mId3_1), (rKey3_2, rId3_2, mId3_2)] <-
fmap catMaybes $
receiveInAnyOrder -- race between SOKS and MSG, clients can handle it
receiveInAnyOrder -- race between SOKS, MSG and ALLS (sndQ and msgQ are separate threads)
sh
[ \case
Resp "14" serviceId' (SOKS n idsHash') -> do
Expand All @@ -829,9 +831,11 @@ testServiceUpgradeAndDowngrade =
Resp "" rId'' (Msg mId3 msg3) | rId'' == rId2 -> do
dec2 mId3 msg3 `shouldBe` Right "hello 3.2"
pure $ Just $ Just (rKey2, rId2, mId3)
_ -> pure Nothing,
\case
Resp "" NoEntity ALLS -> pure $ Just Nothing
_ -> pure Nothing
]
Resp "" NoEntity ALLS <- tGet1 sh
Resp "15" _ OK <- signSendRecv sh rKey3_1 ("15", rId3_1, ACK mId3_1)
Resp "16" _ OK <- signSendRecv sh rKey3_2 ("16", rId3_2, ACK mId3_2)
pure ()
Expand Down
Loading