@@ -1654,9 +1654,7 @@ client
16541654 subscribeQueueAndDeliver msg_ q qr@ QueueRec {rcvServiceId} =
16551655 liftIO (TM. lookupIO entId $ subscriptions clnt) >>= \ case
16561656 Nothing ->
1657- sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub ) rcvServices >>= \ case
1658- Left e -> pure (err e, Nothing )
1659- Right s -> deliver s
1657+ deliver =<< sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub ) rcvServices
16601658 Just s@ Sub {subThread} -> do
16611659 stats <- asks serverStats
16621660 case subThread of
@@ -1757,13 +1755,11 @@ client
17571755 else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)
17581756
17591757 subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
1760- subscribeNotifications q NtfCreds {ntfServiceId} =
1761- sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure () ) ntfServices >>= \ case
1762- Left e -> pure $ ERR e
1763- Right (hasSub, _) -> do
1764- when (isNothing clntServiceId) $
1765- asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
1766- pure $ SOK clntServiceId
1758+ subscribeNotifications q NtfCreds {ntfServiceId} = do
1759+ (hasSub, _) <- sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure () ) ntfServices
1760+ when (isNothing clntServiceId) $
1761+ asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
1762+ pure $ SOK clntServiceId
17671763
17681764 sharedSubscribeQueue ::
17691765 (PartyI p , ServiceParty p ) =>
@@ -1775,7 +1771,7 @@ client
17751771 (Client s -> TVar (Int64 , IdsHash )) ->
17761772 STM sub ->
17771773 (ServerStats -> ServiceStats ) ->
1778- M s (Either ErrorType ( Bool , Maybe sub ) )
1774+ M s (Bool , Maybe sub )
17791775 sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
17801776 stats <- asks serverStats
17811777 let incSrvStat sel = incStat $ sel $ servicesSel stats
@@ -1790,33 +1786,32 @@ client
17901786 incSrvStat srvSubCount
17911787 incSrvStat srvSubQueues
17921788 incSrvStat srvAssocDuplicate
1793- pure $ Right (hasSub, Nothing )
1794- | otherwise -> runExceptT $ do
1795- -- association already done in prepareBatch
1796- hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub
1797- atomically writeSub
1798- liftIO $ do
1799- unless hasSub $ incSrvStat srvSubCount
1800- incSrvStat srvSubQueues
1801- incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
1802- pure (hasSub, Nothing )
1789+ pure (hasSub, Nothing )
1790+ | otherwise -> do
1791+ -- association already done in prepareBatchSubs
1792+ hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub
1793+ atomically writeSub
1794+ unless hasSub $ incSrvStat srvSubCount
1795+ incSrvStat srvSubQueues
1796+ incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
1797+ pure (hasSub, Nothing )
18031798 where
18041799 hasServiceSub = ((0 /= ) . fst ) <$> readTVar (clientServiceSubs clnt)
18051800 -- This function is used when queue association with the service is created.
18061801 incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1 , queueIdHash (recipientId q)) -- service count and IDS hash
18071802 Nothing -> case queueServiceId of
1808- Just _ -> runExceptT $ do
1809- -- unassociation already done in prepareBatch
1810- liftIO $ incSrvStat srvAssocRemoved
1811- -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions.
1812- -- For notification service it can only be Just if storage and session states diverge.
1813- r <- atomically $ getSubscription >>= newSub
1814- atomically writeSub
1815- pure r
1803+ Just _ -> do
1804+ -- unassociation already done in prepareBatchSubs
1805+ incSrvStat srvAssocRemoved
1806+ -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions.
1807+ -- For notification service it can only be Just if storage and session states diverge.
1808+ r <- atomically $ getSubscription >>= newSub
1809+ atomically writeSub
1810+ pure r
18161811 Nothing -> do
18171812 r@ (hasSub, _) <- atomically $ getSubscription >>= newSub
18181813 unless hasSub $ atomically writeSub
1819- pure $ Right r
1814+ pure r
18201815 where
18211816 getSubscription = TM. lookup entId $ clientSubs clnt
18221817 newSub = \ case
0 commit comments