@@ -505,7 +505,29 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
505505 withLog " setQueueService" st $ \ sl -> logQueueService sl rId party serviceId
506506
507507 setQueueServices _ _ _ [] = pure $ Right M. empty
508- setQueueServices _ _ _ _ = pure $ Right M. empty -- TODO batch implementation
508+ setQueueServices st party serviceId qs = E. uninterruptibleMask_ $ runExceptT $ do
509+ updated <- S. fromList <$> withDB' " setQueueServices" st (\ db ->
510+ map fromOnly <$> DB. query db updateQuery (serviceId, In (map recipientId qs)))
511+ results <- liftIO $ forM qs $ \ sq -> do
512+ let rId = recipientId sq
513+ (rId,) <$> if S. member rId updated
514+ then readQueueRecIO (queueRec sq) $>>= \ q -> do
515+ atomically $ writeTVar (queueRec sq) $ Just $ updateRec q
516+ withLog " setQueueServices" st $ \ sl -> logQueueService sl rId party serviceId
517+ pure $ Right ()
518+ else pure $ Left AUTH
519+ pure $ M. fromList results
520+ where
521+ updateQuery = case party of
522+ SRecipientService ->
523+ " UPDATE msg_queues SET rcv_service_id = ? WHERE recipient_id IN ? AND deleted_at IS NULL RETURNING recipient_id"
524+ SNotifierService ->
525+ " UPDATE msg_queues SET ntf_service_id = ? WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL RETURNING recipient_id"
526+ updateRec q = case party of
527+ SRecipientService -> q {rcvServiceId = serviceId}
528+ SNotifierService -> case notifier q of
529+ Just nc -> q {notifier = Just nc {ntfServiceId = serviceId}}
530+ Nothing -> q
509531
510532 getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId , a )] -> IO (Either ErrorType ([(Maybe ServiceId , [(NotifierId , a )])], [(NotifierId , a )]))
511533 getQueueNtfServices st ntfs = E. uninterruptibleMask_ $ runExceptT $ do
0 commit comments