File: PubSubTest.hs

package info (click to toggle)
haskell-hedis 0.15.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 372 kB
  • sloc: haskell: 3,857; makefile: 2
file content (173 lines) | stat: -rw-r--r-- 7,058 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
{-# LANGUAGE CPP, OverloadedStrings, DeriveDataTypeable #-}
module PubSubTest (testPubSubThreaded) where

import Control.Concurrent
import Control.Monad
import Control.Concurrent.Async
import Control.Exception
import Data.Typeable
import qualified Data.List
import Data.Text
import Data.ByteString
import Control.Concurrent.STM
import qualified Test.Framework as Test
import qualified Test.Framework.Providers.HUnit as Test (testCase)
import qualified Test.HUnit as HUnit

import Database.Redis

-- | All threaded Pub/Sub test cases defined in this module, each still
-- waiting for the 'Connection' it should run against.
testPubSubThreaded :: [Connection -> Test.Test]
testPubSubThreaded =
  [ removeAllTest
  , callbackErrorTest
  , removeFromUnregister
  ]

-- | A handler label used to distinguish the handlers from one another.
-- Each handler stamps its label onto the 'TestMsg' it records, which helps
-- make sure we unregister the correct handler.
type HandlerLabel = Text

-- | A message as recorded by one of the test handlers.
data TestMsg
  = MsgFromChannel HandlerLabel ByteString
    -- ^ Recorded by 'handler': the handler's label and the payload.
  | MsgFromPChannel HandlerLabel RedisChannel ByteString
    -- ^ Recorded by 'phandler': the handler's label, the channel the
    -- message arrived on, and the payload.
  deriving (Eq, Show)

-- | Shared list of recorded messages: the handlers append to it, and the
-- @waitFor*@ helpers remove the entries they were waiting for.
type MsgVar = TVar [TestMsg]

-- | A handler that just appends the received message, tagged with @label@,
-- to the end of the list held in the 'TVar'.
handler :: HandlerLabel -> MsgVar -> MessageCallback
handler label ref msg = atomically $
  -- strict variant: evaluate the append now instead of leaving a thunk
  -- in the shared TVar
  modifyTVar' ref (++ [MsgFromChannel label msg])

-- | A pattern handler that just appends the received message, tagged with
-- @label@ and the channel it arrived on, to the end of the list held in
-- the 'TVar'.
phandler :: HandlerLabel -> MsgVar -> PMessageCallback
phandler label ref chan msg = atomically $
  -- strict variant: evaluate the append now instead of leaving a thunk
  -- in the shared TVar
  modifyTVar' ref (++ [MsgFromPChannel label chan msg])

-- | Block until @expected@ is present in the message list, then remove
-- every entry equal to it. Shared by 'waitForMessage' and 'waitForPMessage'.
waitForRecorded :: MsgVar -> TestMsg -> IO ()
waitForRecorded ref expected = atomically $ do
  lst <- readTVar ref
  -- 'retry' suspends the transaction until the TVar is written again
  unless (expected `Prelude.elem` lst) retry
  writeTVar ref $ Prelude.filter (/= expected) lst

-- | Wait for a given message to be received by the handler labelled
-- @label@, and remove it from the list once seen.
waitForMessage :: MsgVar -> HandlerLabel -> ByteString -> IO ()
waitForMessage ref label msg =
  waitForRecorded ref (MsgFromChannel label msg)

-- | Wait for a given pattern message to be received on @chan@ by the
-- pattern handler labelled @label@, and remove it from the list once seen.
waitForPMessage :: MsgVar -> HandlerLabel -> RedisChannel -> ByteString -> IO ()
waitForPMessage ref label chan msg =
  waitForRecorded ref (MsgFromPChannel label chan msg)

-- | Assert that the channels reported by @PUBSUB CHANNELS@ are exactly
-- @expected@, ignoring order. Fails the test if the command itself returns
-- an error reply.
--
-- NOTE(review): @PUBSUB CHANNELS@ presumably reports channels for the whole
-- server, so this assumes no other client is subscribed -- confirm.
expectRedisChannels :: Connection -> [RedisChannel] -> IO ()
expectRedisChannels conn expected = do
  actual <- runRedis conn $ sendRequest ["PUBSUB", "CHANNELS"]
  case actual of
    Left err -> HUnit.assertFailure $ "Error getting channels: " ++ Prelude.show err
    -- assertEqual takes the expected value first, then the actual one, so
    -- that the failure message labels them the right way round
    Right s -> HUnit.assertEqual "redis channels" (Data.List.sort expected) (Data.List.sort s)

-- | Basic delivery check: publish to the initial channel and pattern
-- subscriptions, attach extra handlers with 'addChannelsAndWait', then drop
-- subscriptions with 'removeChannelsAndWait' and confirm the remaining ones
-- still deliver.
removeAllTest :: Connection -> Test.Test
removeAllTest conn = Test.testCase "Multithreaded Pub/Sub - basic" $ do
  msgVar <- newTVarIO []
  initialComplete <- newTVarIO False
  ctrl <-
    newPubSubController
      [ ("foo1", handler "InitialFoo1" msgVar)
      , ("foo2", handler "InitialFoo2" msgVar)
      ]
      [ ("bar1:*", phandler "InitialBar1" msgVar)
      , ("bar2:*", phandler "InitialBar2" msgVar)
      ]
  let markReady = atomically (writeTVar initialComplete True)
      -- publish one payload; the reply is discarded
      emit chan payload = void . runRedis conn $ publish chan payload
  withAsync (pubSubForever conn ctrl markReady) $ \_ -> do
    -- wait for the callback handed to pubSubForever to flip the flag
    atomically $ do
      ready <- readTVar initialComplete
      unless ready retry
    expectRedisChannels conn ["foo1", "foo2"]

    emit "foo1" "Hello"
    waitForMessage msgVar "InitialFoo1" "Hello"

    emit "bar2:zzz" "World"
    waitForPMessage msgVar "InitialBar2" "bar2:zzz" "World"

    -- a second handler each for foo1 and bar1:*; the channel list is
    -- expected to stay the same
    void $ addChannelsAndWait ctrl
      [("foo1", handler "NewFoo1" msgVar)]
      [("bar1:*", phandler "NewBar1" msgVar)]
    expectRedisChannels conn ["foo1", "foo2"]

    -- both foo1 handlers must see the message
    emit "foo1" "abcdef"
    waitForMessage msgVar "InitialFoo1" "abcdef"
    waitForMessage msgVar "NewFoo1" "abcdef"

    -- drop foo1 and bar1:*, alongside two names that were never subscribed
    removeChannelsAndWait ctrl ["foo1", "unusued"] ["bar1:*", "unused:*"]
    expectRedisChannels conn ["foo2"]

    -- foo2 and bar2:* must still deliver
    emit "foo2" "12345"
    waitForMessage msgVar "InitialFoo2" "12345"

    emit "bar2:aaa" "0987"
    waitForPMessage msgVar "InitialBar2" "bar2:aaa" "0987"

-- | Exception thrown by the message handler in 'callbackErrorTest'; it
-- carries the payload of the message that triggered it.
--
-- A @newtype@ because there is exactly one constructor with one field.
newtype TestError = TestError ByteString
  deriving (Eq, Show, Typeable)

instance Exception TestError

-- | Test an error thrown from a message handler.
--
-- The handler registered for @foo@ throws a 'TestError' carrying the
-- payload. The test passes only if the 'pubSubForever' thread ends with
-- exactly @TestError "Hello"@; any other outcome is reported as a failure.
callbackErrorTest :: Connection -> Test.Test
callbackErrorTest conn = Test.testCase "Multithreaded Pub/Sub - error in handler" $ do
  initialComplete <- newTVarIO False
  ctrl <- newPubSubController [("foo", throwIO . TestError)] []

  -- withAsync (rather than a bare async) cancels the pub/sub thread if
  -- anything below throws before we get to wait on it, so a failing run
  -- does not leave the thread behind.
  withAsync (pubSubForever conn ctrl (atomically $ writeTVar initialComplete True)) $ \thread -> do
    -- wait for the callback handed to pubSubForever to flip the flag
    atomically $ readTVar initialComplete >>= \b -> unless b retry

    void $ runRedis conn $ publish "foo" "Hello"

    ret <- waitCatch thread
    case ret of
      Left e | fromException e == Just (TestError "Hello") -> return ()
      _ -> HUnit.assertFailure $ "Did not properly throw error from message thread " ++ Prelude.show ret

-- | Test removing channels by running the unregister action returned from
-- 'addChannelsAndWait': handlers added by an earlier call must keep
-- receiving messages afterwards.
removeFromUnregister :: Connection -> Test.Test
removeFromUnregister conn = Test.testCase "Multithreaded Pub/Sub - unregister handlers" $ do
  msgVar <- newTVarIO []
  initialComplete <- newTVarIO False
  ctrl <- newPubSubController [] []
  let markReady = atomically (writeTVar initialComplete True)
      -- publish one payload; the reply is discarded
      emit chan payload = void . runRedis conn $ publish chan payload
  withAsync (pubSubForever conn ctrl markReady) $ \_ -> do
    -- wait for the callback handed to pubSubForever to flip the flag
    atomically $ do
      ready <- readTVar initialComplete
      unless ready retry

    -- first batch of subscriptions; its unregister action is not needed
    _ <- addChannelsAndWait ctrl
      [ ("abc", handler "InitialAbc" msgVar)
      , ("xyz", handler "InitialXyz" msgVar)
      ]
      [ ("def:*", phandler "InitialDef" msgVar)
      , ("uvw", phandler "InitialUvw" msgVar)
      ]
    expectRedisChannels conn ["abc", "xyz"]

    emit "abc" "Hello"
    waitForMessage msgVar "InitialAbc" "Hello"

    -- second batch, overlapping the first on abc and def:*; keep its
    -- unregister action for later
    dropSecond <- addChannelsAndWait ctrl
      [ ("abc", handler "SecondAbc" msgVar)
      , ("123", handler "Second123" msgVar)
      ]
      [ ("def:*", phandler "SecondDef" msgVar)
      , ("890:*", phandler "Second890" msgVar)
      ]
    expectRedisChannels conn ["abc", "xyz", "123"]

    -- every handler from both batches should see its messages
    emit "abc" "World"
    waitForMessage msgVar "InitialAbc" "World"
    waitForMessage msgVar "SecondAbc" "World"

    emit "123" "World2"
    waitForMessage msgVar "Second123" "World2"

    emit "def:bbbb" "World3"
    waitForPMessage msgVar "InitialDef" "def:bbbb" "World3"
    waitForPMessage msgVar "SecondDef" "def:bbbb" "World3"

    emit "890:tttt" "World4"
    waitForPMessage msgVar "Second890" "890:tttt" "World4"

    -- unregister the second batch only
    dropSecond

    -- we have no way of waiting until unregister actually happened, so
    -- just delay (1 second, in microseconds) and hope
    threadDelay 1000000
    expectRedisChannels conn ["abc", "xyz"]

    -- only the first batch should be left; in particular abc must still
    -- be subscribed
    emit "abc" "World5"
    waitForMessage msgVar "InitialAbc" "World5"

    emit "def:cccc" "World6"
    waitForPMessage msgVar "InitialDef" "def:cccc" "World6"