Skip to content

Commit c5f0a5c

Browse files
committed
fix
1 parent 4e8fa40 commit c5f0a5c

File tree

2 files changed

+6
-7
lines changed

2 files changed

+6
-7
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,7 +1492,7 @@ client
14921492
Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of
14931493
Just (q, QueueRec {notifier = Just ntfCreds}) ->
14941494
either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds)
1495-
$ batchSubs >>= sequence . M.lookup (recipientId q) . \(_, _, n) -> n
1495+
$ batchSubs >>= \(_, _, ntfQs) -> sequence (M.lookup (recipientId q) ntfQs)
14961496
_ -> pure $ ERR INTERNAL
14971497
Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of
14981498
Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash)
@@ -1654,7 +1654,7 @@ client
16541654
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
16551655
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
16561656
Nothing ->
1657-
deliver =<< sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
1657+
deliver =<< sharedSubscribeQueue q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
16581658
Just s@Sub {subThread} -> do
16591659
stats <- asks serverStats
16601660
case subThread of
@@ -1756,23 +1756,21 @@ client
17561756

17571757
subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
17581758
subscribeNotifications q NtfCreds {ntfServiceId} = do
1759-
(hasSub, _) <- sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices
1759+
(hasSub, _) <- sharedSubscribeQueue q ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices
17601760
when (isNothing clntServiceId) $
17611761
asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
17621762
pure $ SOK clntServiceId
17631763

17641764
sharedSubscribeQueue ::
1765-
(PartyI p, ServiceParty p) =>
17661765
StoreQueue s ->
1767-
SParty p ->
17681766
Maybe ServiceId ->
17691767
ServerSubscribers s ->
17701768
(Client s -> TMap QueueId sub) ->
17711769
(Client s -> TVar (Int64, IdsHash)) ->
17721770
STM sub ->
17731771
(ServerStats -> ServiceStats) ->
17741772
M s (Bool, Maybe sub)
1775-
sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
1773+
sharedSubscribeQueue q queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
17761774
stats <- asks serverStats
17771775
let incSrvStat sel = incStat $ sel $ servicesSel stats
17781776
writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId)

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ import Simplex.Messaging.SystemTime
9191
import Simplex.Messaging.TMap (TMap)
9292
import qualified Simplex.Messaging.TMap as TM
9393
import Simplex.Messaging.Transport (SMPServiceRole (..))
94-
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>))
94+
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>), ($>>=))
9595
import System.Exit (exitFailure)
9696
import System.IO (IOMode (..), hFlush, stdout)
9797
import UnliftIO.STM
@@ -504,6 +504,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
504504
atomically $ writeTVar (queueRec sq) $ Just q'
505505
withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId
506506

507+
setQueueServices :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (M.Map RecipientId (Either ErrorType ())))
507508
setQueueServices _ _ _ [] = pure $ Right M.empty
508509
setQueueServices st party serviceId qs = E.uninterruptibleMask_ $ runExceptT $ do
509510
updated <- S.fromList <$> withDB' "setQueueServices" st (\db ->

0 commit comments

Comments
 (0)