-
-
Notifications
You must be signed in to change notification settings - Fork 94
Expand file tree
/
Copy pathServer.hs
More file actions
2471 lines (2367 loc) · 135 KB
/
Server.hs
File metadata and controls
2471 lines (2367 loc) · 135 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
-- |
-- Module : Simplex.Messaging.Server
-- Copyright : (c) simplex.chat
-- License : AGPL-3
--
-- Maintainer : chat@simplex.chat
-- Stability : experimental
-- Portability : non-portable
--
-- This module defines SMP protocol server with in-memory persistence
-- and optional append only log of SMP queue records.
--
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
module Simplex.Messaging.Server
( runSMPServer,
runSMPServerBlocking,
controlPortAuth,
importMessages,
exportMessages,
printMessageStats,
disconnectTransport,
verifyCmdAuthorization,
dummyVerifyCmd,
randomId,
AttachHTTP,
MessageStats (..),
)
where
import Control.Concurrent.STM (throwSTM)
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Control.Monad.STM (retry)
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first, second)
import Data.ByteString.Base64 (encode)
import qualified Data.ByteString.Builder as BLD
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Constraint (Dict (..))
import Data.Dynamic (toDyn)
import Data.Either (fromRight, partitionEithers)
import Data.Foldable (foldrM)
import Data.Functor (($>))
import Data.IORef
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import qualified Data.IntSet as IS
import Data.List (foldl', intercalate)
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, isNothing)
import Data.Semigroup (Sum (..))
import qualified Data.Set as S
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import qualified Data.Text.IO as T
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Data.Type.Equality
import Data.Typeable (cast)
import qualified Data.X509 as X
import qualified Data.X509.Validation as XV
import GHC.Conc.Signal
import GHC.IORef (atomicSwapIORef)
import GHC.Stats (getRTSStats)
import GHC.TypeLits (KnownNat)
import Network.Socket (ServiceName, Socket, socketToHandle)
import qualified Network.TLS as TLS
import Numeric.Natural (Natural)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, clientHandlers, forwardSMPTransmission, smpProxyError, temporaryClientError)
import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient, getConnectedSMPServerClient)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Control
import Simplex.Messaging.Server.Env.STM as Env
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue (..), getJournalQueueMessages)
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
import Simplex.Messaging.Server.Prometheus
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.StoreLog (foldLogLines)
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Buffer (trimCR)
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Environment (lookupEnv)
import System.Exit (exitFailure, exitSuccess)
import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (timeout)
import UnliftIO.Concurrent
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM
#if MIN_VERSION_base(4,18,0)
import Data.List (sort)
import GHC.Conc (listThreads, threadStatus)
import GHC.Conc.Sync (threadLabel)
#endif
#if defined(dbServerPostgres)
import Simplex.Messaging.Server.MsgStore.Postgres (exportDbMessages, getDbMessageStats)
#endif
-- | Runs an SMP server using passed configuration.
--
-- See a full server here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-server/Main.hs
runSMPServer :: MsgStoreClass s => ServerConfig s -> Maybe AttachHTTP -> IO ()
runSMPServer cfg attachHTTP_ = do
started <- newEmptyTMVarIO
runSMPServerBlocking started cfg attachHTTP_
-- | Runs an SMP server using passed configuration with signalling.
--
-- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True)
-- and when it is disconnected from the TCP socket once the server thread is killed (False).
runSMPServerBlocking :: MsgStoreClass s => TMVar Bool -> ServerConfig s -> Maybe AttachHTTP -> IO ()
runSMPServerBlocking started cfg attachHTTP_ = newEnv cfg >>= runReaderT (smpServer started cfg attachHTTP_)
type M s a = ReaderT (Env s) IO a
type AttachHTTP = Socket -> TLS.Context -> IO ()
-- actions used in serverThread to reduce STM transaction scope
data ClientSubAction
= CSAEndSub QueueId -- end single direct queue subscription
| CSAEndServiceSub QueueId -- end service subscription to one queue
| CSADecreaseSubs (Int64, IdsHash) -- reduce service subscriptions when cancelling. Fixed number is used to correctly handle race conditions when service resubscribes
type PrevClientSub s = (Client s, ClientSubAction, (EntityId, BrokerMsg))
smpServer :: forall s. MsgStoreClass s => TMVar Bool -> ServerConfig s -> Maybe AttachHTTP -> M s ()
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOptions} attachHTTP_ = do
s <- asks server
pa <- asks proxyAgent
msgStats_ <- processServerMessages startOptions
ntfStats <- restoreServerNtfs
liftIO $ mapM_ (printMessageStats "messages") msgStats_
liftIO $ printMessageStats "notifications" ntfStats
restoreServerStats msgStats_ ntfStats
when (maintenance startOptions) $ do
liftIO $ putStrLn "Server started in 'maintenance' mode, exiting"
stopServer s
liftIO $ exitSuccess
raceAny_
( serverThread "server subscribers" s subscribers subscriptions serviceSubsCount (Just cancelSub)
: serverThread "server ntfSubscribers" s ntfSubscribers ntfSubscriptions ntfServiceSubsCount Nothing
: deliverNtfsThread s
: sendPendingEvtsThread s
: receiveFromProxyAgent pa
: expireNtfsThread cfg
: sigIntHandlerThread
: map runServer transports
<> expireMessagesThread_ cfg
<> serverStatsThread_ cfg
<> prometheusMetricsThread_ cfg
<> controlPortThread_ cfg
)
`finally` stopServer s
where
runServer :: (ServiceName, ASrvTransport, AddHTTP) -> M s ()
runServer (tcpPort, ATransport t, addHTTP) = do
smpCreds@(srvCert, srvKey) <- asks tlsServerCreds
httpCreds_ <- asks httpServerCreds
ss <- liftIO newSocketState
asks sockets >>= atomically . (`modifyTVar'` ((tcpPort, ss) :))
srvSignKey <- either fail pure $ C.x509ToPrivate' srvKey
env <- ask
liftIO $ case (httpCreds_, attachHTTP_) of
(Just httpCreds, Just attachHTTP) | addHTTP ->
runTransportServerState_ ss started tcpPort defaultSupportedParamsHTTPS combinedCreds tCfg $ \s (sniUsed, h) ->
case cast h of
Just (TLS {tlsContext} :: TLS 'TServer) | sniUsed -> labelMyThread "https client" >> attachHTTP s tlsContext
_ -> runClient srvCert srvSignKey t h `runReaderT` env
where
combinedCreds = TLSServerCredential {credential = smpCreds, sniCredential = Just httpCreds}
_ ->
runTransportServerState ss started tcpPort defaultSupportedParams smpCreds tCfg $ \h -> runClient srvCert srvSignKey t h `runReaderT` env
sigIntHandlerThread :: M s ()
sigIntHandlerThread = do
flagINT <- newEmptyTMVarIO
let sigINT = 2 -- CONST_SIGINT value
sigIntAction = \_ptr -> atomically $ void $ tryPutTMVar flagINT ()
sigIntHandler = Just (sigIntAction, toDyn ())
void $ liftIO $ setHandler sigINT sigIntHandler
atomically $ readTMVar flagINT
logNote "Received SIGINT, stopping server..."
stopServer :: Server s -> M s ()
stopServer s = do
asks serverActive >>= atomically . (`writeTVar` False)
logNote "Saving server state..."
withLock' (savingLock s) "final" $ saveServer True >> closeServer
logNote "Server stopped"
saveServer :: Bool -> M s ()
saveServer drainMsgs = do
ms <- asks msgStore_
liftIO $ saveServerMessages drainMsgs ms >> closeMsgStore (fromMsgStore ms)
saveServerNtfs
saveServerStats
closeServer :: M s ()
closeServer = asks (smpAgent . proxyAgent) >>= liftIO . closeSMPClientAgent
serverThread ::
forall sub. String ->
Server s ->
(Server s -> ServerSubscribers s) ->
(Client s -> TMap QueueId sub) ->
(Client s -> TVar (Int64, IdsHash)) ->
Maybe (sub -> IO ()) ->
M s ()
serverThread label srv srvSubscribers clientSubs clientServiceSubs unsub_ = do
labelMyThread label
liftIO . forever $ do
-- Reading clients outside of `updateSubscribers` transaction to avoid transaction re-evaluation on each new connected client.
-- In case client disconnects during the transaction (its `connected` property is read),
-- the transaction will still be re-evaluated, and the client won't be stored as subscribed.
sub@(_, clntId) <- atomically $ readTQueue subQ
c_ <- getServerClient clntId srv
atomically (updateSubscribers c_ sub)
>>= endPreviousSubscriptions
where
ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents} = srvSubscribers srv
updateSubscribers :: Maybe (Client s) -> (ClientSub, ClientId) -> STM [PrevClientSub s]
updateSubscribers c_ (clntSub, clntId) = case c_ of
Just c@Client {connected} -> ifM (readTVar connected) (updateSubConnected c) updateSubDisconnected
Nothing -> updateSubDisconnected
where
updateSubConnected c = case clntSub of
CSClient qId prevServiceId serviceId_ -> do
modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients
as'' <- if prevServiceId == serviceId_ then pure [] else endServiceSub prevServiceId qId END
case serviceId_ of
Just serviceId -> do
modifyTVar' totalServiceSubs $ addServiceSubs (1, queueIdHash qId) -- server count and IDs hash for all services
as <- endQueueSub qId END
as' <- cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers
pure $ as ++ as' ++ as''
Nothing -> do
as <- prevSub qId END (CSAEndSub qId) =<< upsertSubscribedClient qId c queueSubscribers
pure $ as ++ as''
CSDeleted qId serviceId -> do
removeWhenNoSubs c
as <- endQueueSub qId DELD
as' <- endServiceSub serviceId qId DELD
pure $ as ++ as'
CSService serviceId changedSubs -> do
modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients
modifyTVar' totalServiceSubs $ addServiceSubs changedSubs -- server count and IDs hash for all services
cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers
updateSubDisconnected = case clntSub of
-- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service
CSClient qId prevServiceId serviceId -> do
as <- endQueueSub qId END
as' <- endServiceSub serviceId qId END
as'' <- if prevServiceId == serviceId then pure [] else endServiceSub prevServiceId qId END
pure $ as ++ as' ++ as''
CSDeleted qId serviceId -> do
as <- endQueueSub qId DELD
as' <- endServiceSub serviceId qId DELD
pure $ as ++ as'
CSService serviceId _ -> cancelServiceSubs serviceId =<< lookupSubscribedClient serviceId serviceSubscribers
endQueueSub :: QueueId -> BrokerMsg -> STM [PrevClientSub s]
endQueueSub qId msg = prevSub qId msg (CSAEndSub qId) =<< lookupDeleteSubscribedClient qId queueSubscribers
endServiceSub :: Maybe ServiceId -> QueueId -> BrokerMsg -> STM [PrevClientSub s]
endServiceSub Nothing _ _ = pure []
endServiceSub (Just serviceId) qId msg = prevSub qId msg (CSAEndServiceSub qId) =<< lookupSubscribedClient serviceId serviceSubscribers
prevSub :: QueueId -> BrokerMsg -> ClientSubAction -> Maybe (Client s) -> STM [PrevClientSub s]
prevSub qId msg action =
checkAnotherClient $ \c -> pure [(c, action, (qId, msg))]
cancelServiceSubs :: ServiceId -> Maybe (Client s) -> STM [PrevClientSub s]
cancelServiceSubs serviceId =
checkAnotherClient $ \c -> do
changedSubs@(n, idsHash) <- swapTVar (clientServiceSubs c) (0, mempty)
pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n idsHash))]
checkAnotherClient :: (Client s -> STM [PrevClientSub s]) -> Maybe (Client s) -> STM [PrevClientSub s]
checkAnotherClient mkSub = \case
Just c@Client {clientId, connected} | clntId /= clientId ->
ifM (readTVar connected) (mkSub c) (pure [])
_ -> pure []
endPreviousSubscriptions :: [PrevClientSub s] -> IO ()
endPreviousSubscriptions = mapM_ $ \(c, subAction, evt) -> do
atomically $ modifyTVar' pendingEvents $ IM.alter (Just . maybe [evt] (evt <|)) (clientId c)
case subAction of
CSAEndSub qId -> atomically (endSub c qId) >>= a unsub_
where
a (Just unsub) (Just s) = unsub s
a _ _ = pure ()
CSAEndServiceSub qId -> atomically $ do
modifyTVar' (clientServiceSubs c) decrease
modifyTVar' totalServiceSubs decrease
where
decrease = subtractServiceSubs (1, queueIdHash qId)
CSADecreaseSubs changedSubs -> do
atomically $ modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs
forM_ unsub_ $ \unsub -> atomically (swapTVar (clientSubs c) M.empty) >>= mapM_ unsub
where
endSub :: Client s -> QueueId -> STM (Maybe sub)
endSub c qId = TM.lookupDelete qId (clientSubs c) >>= (removeWhenNoSubs c $>)
-- remove client from server's subscribed cients
removeWhenNoSubs c = do
noClientSubs <- null <$> readTVar (clientSubs c)
noServiceSubs <- ((0 ==) . fst) <$> readTVar (clientServiceSubs c)
when (noClientSubs && noServiceSubs) $ modifyTVar' subClients $ IS.delete (clientId c)
deliverNtfsThread :: Server s -> M s ()
deliverNtfsThread srv@Server {ntfSubscribers = ServerSubscribers {subClients, serviceSubscribers}} = do
ntfInt <- asks $ ntfDeliveryInterval . config
ms <- asks msgStore
ns' <- asks ntfStore
stats <- asks serverStats
liftIO $ forever $ do
threadDelay ntfInt
runDeliverNtfs ms ns' stats
where
runDeliverNtfs :: s -> NtfStore -> ServerStats -> IO ()
runDeliverNtfs ms (NtfStore ns) stats = do
ntfs <- M.assocs <$> readTVarIO ns
unless (null ntfs) $
getQueueNtfServices @(StoreQueue s) (queueStore ms) ntfs >>= \case
Left e -> logError $ "NOTIFICATIONS: getQueueNtfServices error " <> tshow e
Right (sNtfs, deleted) -> do
forM_ sNtfs $ \(serviceId_, ntfs') -> case serviceId_ of
Just sId -> getSubscribedClient sId serviceSubscribers >>= mapM_ (deliverServiceNtfs ntfs')
Nothing -> do -- legacy code that does almost the same as before for non-service subscribers
cIds <- IS.toList <$> readTVarIO subClients
forM_ cIds $ \cId -> getServerClient cId srv >>= mapM_ (deliverQueueNtfs ntfs')
atomically $ modifyTVar' ns (`M.withoutKeys` S.fromList (map fst deleted))
where
deliverQueueNtfs ntfs' c@Client {ntfSubscriptions} =
whenM (currentClient readTVarIO c) $ do
subs <- readTVarIO ntfSubscriptions
unless (M.null subs) $ do
let ntfs'' = filter (\(nId, _) -> M.member nId subs) ntfs'
tryAny (atomically $ flushSubscribedNtfs ntfs'' c) >>= updateNtfStats c
deliverServiceNtfs ntfs' cv = readTVarIO cv >>= mapM_ deliver
where
deliver c = tryAny (atomically $ withSubscribed $ flushSubscribedNtfs ntfs') >>= updateNtfStats c
withSubscribed :: (Client s -> STM Int) -> STM Int
withSubscribed a = readTVar cv >>= maybe (throwSTM $ userError "service unsubscribed") a
flushSubscribedNtfs :: [(NotifierId, TVar [MsgNtf])] -> Client s' -> STM Int
flushSubscribedNtfs ntfs c@Client {sndQ} = do
ts_ <- foldM addNtfs [] ntfs
forM_ (L.nonEmpty ts_) $ \ts -> do
let cancelNtfs s = throwSTM $ userError $ s <> ", " <> show (length ts_) <> " ntfs kept"
unlessM (currentClient readTVar c) $ cancelNtfs "not current client"
whenM (isFullTBQueue sndQ) $ cancelNtfs "sending queue full"
writeTBQueue sndQ (ts, [])
pure $ length ts_
currentClient :: Monad m => (forall a. TVar a -> m a) -> Client s' -> m Bool
currentClient rd Client {clientId, connected} = (&&) <$> rd connected <*> (IS.member clientId <$> rd subClients)
addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg]
addNtfs acc (nId, v) =
readTVar v >>= \case
[] -> pure acc
ntfs -> do
writeTVar v []
pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time
nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (NoCorrId, nId, NMSG ntfNonce ntfEncMeta)
updateNtfStats :: Client s' -> Either SomeException Int -> IO ()
updateNtfStats Client {clientId} = \case
Right 0 -> pure ()
Right len -> do
atomicModifyIORef'_ (ntfCount stats) (subtract len)
atomicModifyIORef'_ (msgNtfs stats) (+ len)
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
Left e -> logNote $ "NOTIFICATIONS: cancelled for client #" <> tshow clientId <> ", reason: " <> tshow e
sendPendingEvtsThread :: Server s -> M s ()
sendPendingEvtsThread srv@Server {subscribers, ntfSubscribers} = do
endInt <- asks $ pendingENDInterval . config
stats <- asks serverStats
liftIO $ forever $ do
threadDelay endInt
sendPending subscribers stats
sendPending ntfSubscribers stats
where
sendPending ServerSubscribers {pendingEvents} stats = do
pending <- atomically $ swapTVar pendingEvents IM.empty
unless (null pending) $ forM_ (IM.assocs pending) $ \(cId, evts) ->
getServerClient cId srv >>= mapM_ (enqueueEvts evts)
where
enqueueEvts evts c@Client {connected, sndQ} =
whenM (readTVarIO connected) $ do
sent <- atomically $ tryWriteTBQueue sndQ (ts, [])
if sent
then updateEndStats
else -- if queue is full it can block
forkClient c "sendPendingEvtsThread.queueEvts" $
atomically (writeTBQueue sndQ (ts, [])) >> updateEndStats
where
ts = L.map (\(entId, evt) -> (NoCorrId, entId, evt)) evts
-- this accounts for both END and DELD events
updateEndStats = do
let len = L.length evts
when (len > 0) $ do
atomicModifyIORef'_ (qSubEnd stats) (+ len)
atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch
receiveFromProxyAgent :: ProxyAgent -> M s ()
receiveFromProxyAgent ProxyAgent {smpAgent = SMPClientAgent {agentQ}} =
forever $
atomically (readTBQueue agentQ) >>= \case
CAConnected srv _service_ -> logInfo $ "SMP server connected " <> showServer' srv
CADisconnected srv qIds -> logError $ "SMP server disconnected " <> showServer' srv <> " / subscriptions: " <> tshow (length qIds)
-- the errors below should never happen - messaging proxy does not make any subscriptions
CASubscribed srv serviceId qIds -> logError $ "SMP server subscribed queues " <> asService <> showServer' srv <> " / subscriptions: " <> tshow (length qIds)
where
asService = if isJust serviceId then "as service " else ""
CASubError srv errs -> logError $ "SMP server subscription errors " <> showServer' srv <> " / errors: " <> tshow (length errs)
CAServiceDisconnected {} -> logError "CAServiceDisconnected"
CAServiceSubscribed {} -> logError "CAServiceSubscribed"
CAServiceSubError {} -> logError "CAServiceSubError"
CAServiceUnavailable {} -> logError "CAServiceUnavailable"
where
showServer' = decodeLatin1 . strEncode . host
expireMessagesThread_ :: ServerConfig s -> [M s ()]
expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessagesThread msgExp]
expireMessagesThread_ _ = []
expireMessagesThread :: ExpirationConfig -> M s ()
expireMessagesThread ExpirationConfig {checkInterval, ttl} = do
ms <- asks msgStore
let interval = checkInterval * 1000000
stats <- asks serverStats
labelMyThread "expireMessagesThread"
liftIO $ forever $ expire ms stats interval
where
expire :: s -> ServerStats -> Int64 -> IO ()
expire ms stats interval = do
threadDelay' interval
logNote "Started expiring messages..."
n <- compactQueues @(StoreQueue s) $ queueStore ms
when (n > 0) $ logNote $ "Removed " <> tshow n <> " old deleted queues from the database."
now <- systemSeconds <$> getSystemTime
tryAny (expireOldMessages False ms now ttl) >>= \case
Right msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} -> do
atomicWriteIORef (msgCount stats) stored
atomicModifyIORef'_ (msgExpired stats) (+ expired)
printMessageStats "STORE: messages" msgStats
Left e -> logError $ "STORE: expireOldMessages, error expiring messages, " <> tshow e
expireNtfsThread :: ServerConfig s -> M s ()
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
ns <- asks ntfStore
let interval = checkInterval expCfg * 1000000
stats <- asks serverStats
labelMyThread "expireNtfsThread"
liftIO $ forever $ do
threadDelay' interval
old <- expireBeforeEpoch expCfg
expired <- deleteExpiredNtfs ns old
when (expired > 0) $ do
atomicModifyIORef'_ (msgNtfExpired stats) (+ expired)
atomicModifyIORef'_ (ntfCount stats) (subtract expired)
serverStatsThread_ :: ServerConfig s -> [M s ()]
serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} =
[logServerStats logStatsStartTime interval serverStatsLogFile]
serverStatsThread_ _ = []
logServerStats :: Int64 -> Int64 -> FilePath -> M s ()
logServerStats startAt logInterval statsFilePath = do
labelMyThread "logServerStats"
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, rcvServices, ntfServices}
<- asks serverStats
st <- asks msgStore
EntityCounts {queueCount, notifierCount, rcvServiceCount, ntfServiceCount, rcvServiceQueuesCount, ntfServiceQueuesCount} <-
liftIO $ getEntityCounts @(StoreQueue s) $ queueStore st
let interval = 1000000 * logInterval
forever $ do
withFile statsFilePath AppendMode $ \h -> liftIO $ do
hSetBuffering h LineBuffering
ts <- getCurrentTime
fromTime' <- atomicSwapIORef fromTime ts
qCreated' <- atomicSwapIORef qCreated 0
qSecured' <- atomicSwapIORef qSecured 0
qDeletedAll' <- atomicSwapIORef qDeletedAll 0
qDeletedAllB' <- atomicSwapIORef qDeletedAllB 0
qDeletedNew' <- atomicSwapIORef qDeletedNew 0
qDeletedSecured' <- atomicSwapIORef qDeletedSecured 0
qSub' <- atomicSwapIORef qSub 0
qSubAllB' <- atomicSwapIORef qSubAllB 0
qSubAuth' <- atomicSwapIORef qSubAuth 0
qSubDuplicate' <- atomicSwapIORef qSubDuplicate 0
qSubProhibited' <- atomicSwapIORef qSubProhibited 0
qSubEnd' <- atomicSwapIORef qSubEnd 0
qSubEndB' <- atomicSwapIORef qSubEndB 0
ntfCreated' <- atomicSwapIORef ntfCreated 0
ntfDeleted' <- atomicSwapIORef ntfDeleted 0
ntfDeletedB' <- atomicSwapIORef ntfDeletedB 0
ntfSub' <- atomicSwapIORef ntfSub 0
ntfSubB' <- atomicSwapIORef ntfSubB 0
ntfSubAuth' <- atomicSwapIORef ntfSubAuth 0
ntfSubDuplicate' <- atomicSwapIORef ntfSubDuplicate 0
msgSent' <- atomicSwapIORef msgSent 0
msgSentAuth' <- atomicSwapIORef msgSentAuth 0
msgSentQuota' <- atomicSwapIORef msgSentQuota 0
msgSentLarge' <- atomicSwapIORef msgSentLarge 0
msgRecv' <- atomicSwapIORef msgRecv 0
msgRecvGet' <- atomicSwapIORef msgRecvGet 0
msgGet' <- atomicSwapIORef msgGet 0
msgGetNoMsg' <- atomicSwapIORef msgGetNoMsg 0
msgGetAuth' <- atomicSwapIORef msgGetAuth 0
msgGetDuplicate' <- atomicSwapIORef msgGetDuplicate 0
msgGetProhibited' <- atomicSwapIORef msgGetProhibited 0
msgExpired' <- atomicSwapIORef msgExpired 0
ps <- liftIO $ periodStatCounts activeQueues ts
msgSentNtf' <- atomicSwapIORef msgSentNtf 0
msgRecvNtf' <- atomicSwapIORef msgRecvNtf 0
psNtf <- liftIO $ periodStatCounts activeQueuesNtf ts
msgNtfs' <- atomicSwapIORef (msgNtfs ss) 0
msgNtfsB' <- atomicSwapIORef (msgNtfsB ss) 0
msgNtfNoSub' <- atomicSwapIORef (msgNtfNoSub ss) 0
msgNtfLost' <- atomicSwapIORef (msgNtfLost ss) 0
msgNtfExpired' <- atomicSwapIORef (msgNtfExpired ss) 0
_qBlocked <- atomicSwapIORef (qBlocked ss) 0 -- not logged, only reset
pRelays' <- getResetProxyStatsData pRelays
pRelaysOwn' <- getResetProxyStatsData pRelaysOwn
pMsgFwds' <- getResetProxyStatsData pMsgFwds
pMsgFwdsOwn' <- getResetProxyStatsData pMsgFwdsOwn
pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0
rcvServices' <- getServiceStatsData rcvServices
ntfServices' <- getServiceStatsData ntfServices
qCount' <- readIORef qCount
msgCount' <- readIORef msgCount
ntfCount' <- readIORef ntfCount
T.hPutStrLn h $
T.intercalate
","
( [ T.pack $ iso8601Show $ utctDay fromTime',
tshow qCreated',
tshow qSecured',
tshow qDeletedAll',
tshow msgSent',
tshow msgRecv',
dayCount ps,
weekCount ps,
monthCount ps,
tshow msgSentNtf',
tshow msgRecvNtf',
dayCount psNtf,
weekCount psNtf,
monthCount psNtf,
tshow qCount',
tshow msgCount',
tshow msgExpired',
tshow qDeletedNew',
tshow qDeletedSecured'
]
<> showProxyStats pRelays'
<> showProxyStats pRelaysOwn'
<> showProxyStats pMsgFwds'
<> showProxyStats pMsgFwdsOwn'
<> [ tshow pMsgFwdsRecv',
tshow qSub',
tshow qSubAuth',
tshow qSubDuplicate',
tshow qSubProhibited',
tshow msgSentAuth',
tshow msgSentQuota',
tshow msgSentLarge',
tshow msgNtfs',
tshow msgNtfNoSub',
tshow msgNtfLost',
"0", -- qSubNoMsg' is removed for performance.
-- Use qSubAllB for the approximate number of all subscriptions.
-- Average observed batch size is 25-30 subscriptions.
tshow msgRecvGet',
tshow msgGet',
tshow msgGetNoMsg',
tshow msgGetAuth',
tshow msgGetDuplicate',
tshow msgGetProhibited',
"0", -- dayCount psSub; psSub is removed to reduce memory usage
"0", -- weekCount psSub
"0", -- monthCount psSub
tshow queueCount,
tshow ntfCreated',
tshow ntfDeleted',
tshow ntfSub',
tshow ntfSubAuth',
tshow ntfSubDuplicate',
tshow notifierCount,
tshow qDeletedAllB',
tshow qSubAllB',
tshow qSubEnd',
tshow qSubEndB',
tshow ntfDeletedB',
tshow ntfSubB',
tshow msgNtfsB',
tshow msgNtfExpired',
tshow ntfCount',
tshow rcvServiceCount,
tshow ntfServiceCount,
tshow rcvServiceQueuesCount,
tshow ntfServiceQueuesCount
]
<> showServiceStats rcvServices'
<> showServiceStats ntfServices'
)
liftIO $ threadDelay' interval
where
showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
map tshow [_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther]
showServiceStats ServiceStatsData {_srvAssocNew, _srvAssocDuplicate, _srvAssocUpdated, _srvAssocRemoved, _srvSubCount, _srvSubDuplicate, _srvSubQueues, _srvSubEnd} =
map tshow [_srvAssocNew, _srvAssocDuplicate, _srvAssocUpdated, _srvAssocRemoved, _srvSubCount, _srvSubDuplicate, _srvSubQueues, _srvSubEnd]
prometheusMetricsThread_ :: ServerConfig s -> [M s ()]
prometheusMetricsThread_ ServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} =
[savePrometheusMetrics interval prometheusMetricsFile]
prometheusMetricsThread_ _ = []
savePrometheusMetrics :: Int -> FilePath -> M s ()
savePrometheusMetrics saveInterval metricsFile = do
labelMyThread "savePrometheusMetrics"
liftIO $ putStrLn $ "Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile
st <- asks msgStore
ss <- asks serverStats
env <- ask
rtsOpts <- liftIO $ maybe ("set " <> rtsOptionsEnv) T.pack <$> lookupEnv (T.unpack rtsOptionsEnv)
let interval = 1000000 * saveInterval
liftIO $ forever $ do
threadDelay interval
ts <- getCurrentTime
sm <- getServerMetrics st ss rtsOpts
rtm <- getRealTimeMetrics env
T.writeFile metricsFile $ prometheusMetrics sm rtm ts
getServerMetrics :: s -> ServerStats -> Text -> IO ServerMetrics
getServerMetrics st ss rtsOptions = do
d <- getServerStatsData ss
let ps = periodStatDataCounts $ _activeQueues d
psNtf = periodStatDataCounts $ _activeQueuesNtf d
entityCounts <- getEntityCounts @(StoreQueue s) $ queueStore st
pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, entityCounts, rtsOptions}
getRealTimeMetrics :: Env s -> IO RealTimeMetrics
getRealTimeMetrics Env {sockets, msgStore_ = ms, server = srv@Server {subscribers, ntfSubscribers}} = do
socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets
#if MIN_VERSION_base(4,18,0)
threadsCount <- length <$> listThreads
#else
let threadsCount = 0
#endif
clientsCount <- IM.size <$> getServerClients srv
(deliveredSubs, deliveredTimes) <- getDeliveredMetrics =<< getSystemSeconds
smpSubs <- getSubscribersMetrics subscribers
ntfSubs <- getSubscribersMetrics ntfSubscribers
loadedCounts <- loadedQueueCounts $ fromMsgStore ms
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts}
where
getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, totalServiceSubs, subClients} = do
subsCount <- M.size <$> getSubscribedClients queueSubscribers
subClientsCount <- IS.size <$> readTVarIO subClients
subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers
subServiceSubsCount <- fst <$> readTVarIO totalServiceSubs
pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount, subServiceSubsCount}
getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0 0, emptyTimeBuckets) =<< getServerClients srv
where
countClnt acc@(metrics, times) Client {subscriptions} = do
(cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions
pure $ if cnt > 0
then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, times')
else acc
countSubs acc@(!cnt, times) Sub {delivered} = do
delivered_ <- readTVarIO delivered
pure $ case delivered_ of
Nothing -> acc
Just (_, ts) -> (cnt + 1, updateTimeBuckets ts ts' times)
runClient :: Transport c => X.CertificateChain -> C.APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s ()
runClient srvCert srvSignKey tp h = do
ms <- asks msgStore
g <- asks random
idSize <- asks $ queueIdBytes . config
kh <- asks serverIdentity
ks <- atomically . C.generateKeyPair =<< asks random
ServerConfig {smpServerVRange, smpHandshakeTimeout} <- asks config
labelMyThread $ "smp handshake for " <> transportName tp
liftIO (timeout smpHandshakeTimeout . runExceptT $ smpServerHandshake srvCert srvSignKey h ks kh smpServerVRange $ getClientService ms g idSize) >>= \case
Just (Right th) -> runClientTransport th
_ -> pure ()
getClientService :: s -> TVar ChaChaDRG -> Int -> SMPServiceRole -> X.CertificateChain -> XV.Fingerprint -> ExceptT TransportError IO ServiceId
getClientService ms g idSize role cert fp = do
newServiceId <- EntityId <$> atomically (C.randomBytes idSize g)
ts <- liftIO getSystemDate
let sr = ServiceRec {serviceId = newServiceId, serviceRole = role, serviceCert = cert, serviceCertHash = fp, serviceCreatedAt = ts}
withExceptT (const $ TEHandshake BAD_SERVICE) $ ExceptT $
getCreateService @(StoreQueue s) (queueStore ms) sr
controlPortThread_ :: ServerConfig s -> [M s ()]
controlPortThread_ ServerConfig {controlPort = Just port} = [runCPServer port]
controlPortThread_ _ = []
runCPServer :: ServiceName -> M s ()
runCPServer port = do
srv <- asks server
cpStarted <- newEmptyTMVarIO
u <- askUnliftIO
liftIO $ do
labelMyThread "control port server"
runLocalTCPServer cpStarted port $ runCPClient u srv
where
runCPClient :: UnliftIO (ReaderT (Env s) IO) -> Server s -> Socket -> IO ()
runCPClient u srv sock = do
labelMyThread "control port client"
h <- socketToHandle sock ReadWriteMode
hSetBuffering h LineBuffering
hSetNewlineMode h universalNewlineMode
hPutStrLn h "SMP server control port\n'help' for supported commands"
role <- newTVarIO CPRNone
cpLoop h role
where
cpLoop h role = do
s <- trimCR <$> B.hGetLine h
case strDecode s of
Right CPQuit -> hClose h
Right cmd -> logCmd s cmd >> processCP h role cmd >> cpLoop h role
Left err -> hPutStrLn h ("error: " <> err) >> cpLoop h role
logCmd s cmd = when shouldLog $ logWarn $ "ControlPort: " <> tshow s
where
shouldLog = case cmd of
CPAuth _ -> False
CPHelp -> False
CPQuit -> False
CPSkip -> False
_ -> True
processCP h role = \case
CPAuth auth -> controlPortAuth h user admin role auth
where
ServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} = cfg
CPSuspend -> withAdminRole $ hPutStrLn h "suspend not implemented"
CPResume -> withAdminRole $ hPutStrLn h "resume not implemented"
CPClients -> withAdminRole $ do
cls <- getServerClients srv
hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions"
forM_ (IM.toList cls) $ \(cid, Client {clientTHParams = THandleParams {sessionId}, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do
connected' <- bshow <$> readTVarIO connected
rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt
sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt
now <- liftIO getSystemTime
let age = systemSeconds now - systemSeconds createdAt
subscriptions' <- bshow . M.size <$> readTVarIO subscriptions
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
CPStats -> withUserRole $ do
ss <- unliftIO u $ asks serverStats
st <- unliftIO u $ asks msgStore
EntityCounts {queueCount, notifierCount, rcvServiceCount, ntfServiceCount, rcvServiceQueuesCount, ntfServiceQueuesCount} <-
getEntityCounts @(StoreQueue s) $ queueStore st
let getStat :: (ServerStats -> IORef a) -> IO a
getStat var = readIORef (var ss)
putStat :: Show a => String -> (ServerStats -> IORef a) -> IO ()
putStat label var = getStat var >>= \v -> hPutStrLn h $ label <> ": " <> show v
putProxyStat :: String -> (ServerStats -> ProxyStats) -> IO ()
putProxyStat label var = do
ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} <- getProxyStatsData $ var ss
hPutStrLn h $ label <> ": requests=" <> show _pRequests <> ", successes=" <> show _pSuccesses <> ", errorsConnect=" <> show _pErrorsConnect <> ", errorsCompat=" <> show _pErrorsCompat <> ", errorsOther=" <> show _pErrorsOther
putStat "fromTime" fromTime
putStat "qCreated" qCreated
putStat "qSecured" qSecured
putStat "qDeletedAll" qDeletedAll
putStat "qDeletedAllB" qDeletedAllB
putStat "qDeletedNew" qDeletedNew
putStat "qDeletedSecured" qDeletedSecured
getStat (day . activeQueues) >>= \v -> hPutStrLn h $ "daily active queues: " <> show (IS.size v)
-- removed to reduce memory usage
-- getStat (day . subscribedQueues) >>= \v -> hPutStrLn h $ "daily subscribed queues: " <> show (S.size v)
putStat "qSub" qSub
putStat "qSubAllB" qSubAllB
putStat "qSubEnd" qSubEnd
putStat "qSubEndB" qSubEndB
subs <- (,,) <$> getStat qSubAuth <*> getStat qSubDuplicate <*> getStat qSubProhibited
hPutStrLn h $ "other SUB events (auth, duplicate, prohibited): " <> show subs
putStat "msgSent" msgSent
putStat "msgRecv" msgRecv
putStat "msgRecvGet" msgRecvGet
putStat "msgGet" msgGet
putStat "msgGetNoMsg" msgGetNoMsg
gets <- (,,) <$> getStat msgGetAuth <*> getStat msgGetDuplicate <*> getStat msgGetProhibited
hPutStrLn h $ "other GET events (auth, duplicate, prohibited): " <> show gets
putStat "msgSentNtf" msgSentNtf
putStat "msgRecvNtf" msgRecvNtf
putStat "msgNtfs" msgNtfs
putStat "msgNtfsB" msgNtfsB
putStat "msgNtfExpired" msgNtfExpired
putStat "qCount" qCount
hPutStrLn h $ "qCount 2: " <> show queueCount
hPutStrLn h $ "notifiers: " <> show notifierCount
putStat "msgCount" msgCount
putStat "ntfCount" ntfCount
readTVarIO role >>= \case
CPRAdmin -> do
NtfStore ns <- unliftIO u $ asks ntfStore
ntfCount2 <- liftIO . foldM (\(!n) q -> (n +) . length <$> readTVarIO q) 0 =<< readTVarIO ns
hPutStrLn h $ "ntfCount 2: " <> show ntfCount2
_ -> pure ()
putProxyStat "pRelays" pRelays
putProxyStat "pRelaysOwn" pRelaysOwn
putProxyStat "pMsgFwds" pMsgFwds
putProxyStat "pMsgFwdsOwn" pMsgFwdsOwn
putStat "pMsgFwdsRecv" pMsgFwdsRecv
hPutStrLn h $ "rcvServiceCount: " <> show rcvServiceCount
hPutStrLn h $ "ntfServiceCount: " <> show ntfServiceCount
hPutStrLn h $ "rcvServiceQueuesCount: " <> show rcvServiceQueuesCount
hPutStrLn h $ "ntfServiceQueuesCount: " <> show ntfServiceQueuesCount
CPStatsRTS -> getRTSStats >>= hPrint h
CPThreads -> withAdminRole $ do
#if MIN_VERSION_base(4,18,0)
threads <- liftIO listThreads
hPutStrLn h $ "Threads: " <> show (length threads)
forM_ (sort threads) $ \tid -> do
label <- threadLabel tid
status <- threadStatus tid
hPutStrLn h $ show tid <> " (" <> show status <> ") " <> fromMaybe "" label
#else
hPutStrLn h "Not available on GHC 8.10"
#endif
CPSockets -> withUserRole $ unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSockets
where
putSockets (tcpPort, socketsState) = do
ss <- getSocketStats socketsState
hPutStrLn h $ "Sockets for port " <> tcpPort <> ":"
hPutStrLn h $ "accepted: " <> show (socketsAccepted ss)
hPutStrLn h $ "closed: " <> show (socketsClosed ss)
hPutStrLn h $ "active: " <> show (socketsActive ss)
hPutStrLn h $ "leaked: " <> show (socketsLeaked ss)
CPSocketThreads -> withAdminRole $ do
#if MIN_VERSION_base(4,18,0)
unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSocketThreads
where
putSocketThreads (tcpPort, (_, _, active')) = do
active <- readTVarIO active'
forM_ (IM.toList active) $ \(sid, tid') ->
deRefWeak tid' >>= \case
Nothing -> hPutStrLn h $ intercalate "," [tcpPort, show sid, "", "gone", ""]
Just tid -> do
label <- threadLabel tid
status <- threadStatus tid
hPutStrLn h $ intercalate "," [tcpPort, show sid, show tid, show status, fromMaybe "" label]
#else
hPutStrLn h "Not available on GHC 8.10"
#endif
CPServerInfo -> readTVarIO role >>= \case
CPRNone -> do
logError "Unauthorized control port command"
hPutStrLn h "AUTH"
r -> do
#if MIN_VERSION_base(4,18,0)
threads <- liftIO listThreads
hPutStrLn h $ "Threads: " <> show (length threads)
#else
hPutStrLn h "Threads: not available on GHC 8.10"
#endif
let Server {subscribers, ntfSubscribers} = srv
activeClients <- getServerClients srv
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
when (r == CPRAdmin) $ do
clQs <- clientTBQueueLengths' activeClients
hPutStrLn h $ "Client queues (rcvQ, sndQ, msgQ): " <> show clQs
(smpSubCnt, smpSubCntByGroup, smpClCnt, smpClQs) <- countClientSubs subscriptions (Just countSMPSubs) activeClients
hPutStrLn h $ "SMP subscriptions (via clients): " <> show smpSubCnt
hPutStrLn h $ "SMP subscriptions (by group: NoSub, SubPending, SubThread, ProhibitSub): " <> show smpSubCntByGroup
hPutStrLn h $ "SMP subscribed clients (via clients): " <> show smpClCnt
hPutStrLn h $ "SMP subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show smpClQs
(ntfSubCnt, _, ntfClCnt, ntfClQs) <- countClientSubs ntfSubscriptions Nothing activeClients
hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt
hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
putSubscribersInfo "SMP" subscribers False
putSubscribersInfo "Ntf" ntfSubscribers True
where
putSubscribersInfo :: String -> ServerSubscribers s -> Bool -> IO ()
putSubscribersInfo protoName ServerSubscribers {queueSubscribers, subClients} showIds = do
activeSubs <- getSubscribedClients queueSubscribers
hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs)
-- TODO [certs rcv] service subscriptions
clnts <- countSubClients activeSubs
hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "")
clnts' <- readTVarIO subClients
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IS.size clnts') <> (if showIds then " " <> show clnts' else "")
where
countSubClients :: Map QueueId (TVar (Maybe (Client s))) -> IO IS.IntSet
countSubClients = foldM (\ !s c -> maybe s ((`IS.insert` s) . clientId) <$> readTVarIO c) IS.empty
countClientSubs :: (Client s -> TMap QueueId a) -> Maybe (Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Client s) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
where
addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Client s -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) cl = do
subs <- readTVarIO $ subSel cl
cnts' <- case countSubs_ of
Nothing -> pure cnts
Just countSubs -> do
(c1', c2', c3', c4') <- countSubs subs
pure (c1 + c1', c2 + c2', c3 + c3', c4 + c4')
let cnt = M.size subs
clCnt' = if cnt == 0 then clCnt else clCnt + 1
qs' <- if cnt == 0 then pure qs else addQueueLengths qs cl
pure (subCnt + cnt, cnts', clCnt', qs')
clientTBQueueLengths' :: Foldable t => t (Client s) -> IO (Natural, Natural, Natural)
clientTBQueueLengths' = foldM addQueueLengths (0, 0, 0)
addQueueLengths (!rl, !sl, !ml) cl = do
(rl', sl', ml') <- queueLengths cl
pure (rl + rl', sl + sl', ml + ml')
queueLengths Client {rcvQ, sndQ, msgQ} = do
rl <- atomically $ lengthTBQueue rcvQ
sl <- atomically $ lengthTBQueue sndQ
ml <- atomically $ lengthTBQueue msgQ
pure (rl, sl, ml)
countSMPSubs :: Map QueueId Sub -> IO (Int, Int, Int, Int)
countSMPSubs = foldM countSubs (0, 0, 0, 0)
where
countSubs (c1, c2, c3, c4) Sub {subThread} = case subThread of
ServerSub t -> do
st <- readTVarIO t
pure $ case st of
NoSub -> (c1 + 1, c2, c3, c4)
SubPending -> (c1, c2 + 1, c3, c4)
SubThread _ -> (c1, c2, c3 + 1, c4)
ProhibitSub -> pure (c1, c2, c3, c4 + 1)
CPDelete qId -> withAdminRole $ unliftIO u $ do
st <- asks msgStore
r <- liftIO $ runExceptT $ do
(q, _) <- ExceptT $ getSenderQueue st qId
ExceptT $ deleteQueueSize st q
case r of
Left e -> liftIO $ hPutStrLn h $ "error: " <> show e
Right (qr, numDeleted) -> do
updateDeletedStats qr
liftIO $ hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted"
CPStatus qId -> withUserRole $ unliftIO u $ do
st <- asks msgStore
q <- liftIO $ getSenderQueue st qId
liftIO $ hPutStrLn h $ case q of
Left e -> "error: " <> show e
Right (_, QueueRec {queueMode, status, updatedAt}) ->
"status: " <> show status <> ", updatedAt: " <> show updatedAt <> ", queueMode: " <> show queueMode
CPBlock qId info -> withUserRole $ unliftIO u $ do
st <- asks msgStore
stats <- asks serverStats
blocked <- liftIO $ readIORef $ qBlocked stats
let quota = dailyBlockQueueQuota cfg
if blocked >= quota && quota /= 0
then liftIO $ hPutStrLn h $ "error: reached limit of " <> show quota <> " queues blocked daily"
else do
r <- liftIO $ runExceptT $ do
(q, QueueRec {status}) <- ExceptT $ getSenderQueue st qId
let rId = recipientId q