1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
|
{- Copyright 2017 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE OverloadedStrings, BangPatterns #-}
module Server where
import Types
import CmdLine
import WebSockets
import SessionID
import Log
import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Handler.WebSockets
import Network.WebSockets hiding (Message)
import qualified Network.WebSockets as WS
import Network.HTTP.Types
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.Maybe
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Time.Clock.POSIX
import System.IO
import System.Directory
import System.Mem.Weak
import Network.Mail.Mime
import System.Environment
-- | All sessions currently registered with the server, keyed by their
-- 'SessionID'.
type ServerState = M.Map SessionID Session
-- | Allocate the shared server state, initially holding no sessions.
newServerState :: IO (TVar ServerState)
newServerState = newTVarIO M.empty
-- | A session consists of a broadcast TMChan, which both users and
-- developers write messages to. Writes are stored in the log file,
-- and a log lock allows atomic access to the log file for replays.
--
-- The constructor's fields are, in order:
--
-- 1. the broadcast channel that logged messages are written to,
-- 2. a variable holding the handle currently open on the session's log
--    file,
-- 3. the log lock, which is taken while the log file is being written
--    or replayed.
data Session = Session (TMChan (Broadcast Log)) (TVar Handle) (TMVar LogLock)
-- | Token stored in a session's 'TMVar'. Whoever has taken it out of the
-- 'TMVar' has exclusive access to the session's log file until putting
-- it back.
data LogLock = LogLock
-- | A broadcast message, with a weak reference to the ThreadId of the
-- sending thread (which probably wants to ignore the message it sent).
-- Receivers dereference the weak reference to compare the sender with
-- their own thread.
data Broadcast a = Broadcast a (Weak ThreadId)
-- | Create a session around an existing log handle variable, giving it
-- a fresh broadcast channel and a log lock that starts out available.
newSession :: TVar Handle -> IO Session
newSession loghv = do
    bchan <- newBroadcastTMChanIO
    loglock <- newTMVarIO LogLock
    return (Session bchan loghv loglock)
-- | Obtain a private duplicate of the session's broadcast channel, on
-- which the caller can read messages written to the session.
listenSession :: Session -> STM (TMChan (Broadcast Log))
listenSession session = case session of
    Session bchan _ _ -> dupTMChan bchan
-- | While writing a log to the session the LogLock is drained until
-- the write has reached the log file. This prevents concurrent writes
-- to the file, and allows writes to be blocked while reading the log file.
--
-- The first argument identifies the sending thread and is attached to
-- the broadcast, so the sender can recognise its own message.
--
-- The lock is always put back: if writing to the log file fails (or the
-- thread is killed while writing), the lock is restored and the exception
-- is rethrown without broadcasting the message. Otherwise a single failed
-- write would leave the lock taken forever, blocking every later writer
-- and every replay of the session.
writeSession :: Weak ThreadId -> Session -> Log -> IO ()
writeSession tid (Session bchan loghv loglock) l = mask $ \restore -> do
    -- Blocking in STM is interruptible even while masked, so a thread
    -- waiting for the lock can still be cancelled; once the transaction
    -- commits no asynchronous exception can arrive before the handler
    -- below is installed.
    (ll, logh) <- atomically $ (,)
        <$> takeTMVar loglock
        <*> readTVar loghv
    restore (writeLogHandle l logh)
        `onException` atomically (putTMVar loglock ll)
    -- Release the lock and publish the message in one transaction, so
    -- nobody can take the lock before the message is in the channel.
    atomically $ do
        putTMVar loglock ll
        writeTMChan bchan (Broadcast l tid)
-- | Run an action with the log file quiescent (and its write handle closed),
-- and nothing being added to the session's broadcast TMChan.
--
-- The log lock is held for the duration of the action. Afterwards the
-- log file is reopened for appending, the new handle is stored in the
-- session, and the lock is released.
preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a
preventWriteWhile (Session _ loghv loglock) o sid a =
    bracket quiesce resume (const a)
  where
    logfile = sessionLogFile (serverDirectory o) sid

    -- Take the lock and close the current write handle.
    quiesce = do
        (token, oldh) <- atomically $ do
            t <- takeTMVar loglock
            h <- readTVar loghv
            return (t, h)
        hClose oldh
        return token

    -- Reopen the log file, then hand back the lock together with the
    -- new handle.
    resume token = do
        newh <- openFile logfile AppendMode
        atomically $ do
            putTMVar loglock token
            writeTVar loghv newh
-- | Close the session's broadcast channel.
closeSession :: Session -> STM ()
closeSession session = case session of
    Session bchan _ _ -> closeTMChan bchan
-- | Run the server: adjust the options from the environment, create an
-- empty server state, and serve the application on the configured port.
server :: ServerOpts -> IO ()
server o = do
    o' <- checkEnv o
    ssv <- newServerState
    runSettings settings (app o' ssv)
  where
    -- Prefer IPv6 but allow IPv4 as well
    -- (Workaround for
    -- https://github.com/jaspervdj/websockets/issues/140)
    settings = setHost "*6" (setPort (serverPort o) defaultSettings)
-- | Override the server's email address with the value of the
-- @DEBUG_ME_FROM_EMAIL@ environment variable, when that variable is set.
-- Otherwise the options are returned unchanged.
checkEnv :: ServerOpts -> IO ServerOpts
checkEnv o = maybe o withEmail <$> lookupEnv "DEBUG_ME_FROM_EMAIL"
  where
    withEmail email = o { serverEmail = Just (T.pack email) }
-- | The WAI application: websocket requests are handled by
-- 'websocketApp', and every other request is answered with a 400
-- response.
app :: ServerOpts -> TVar ServerState -> Application
app o ssv = websocketsOr connectionOptions wsapp fallback
  where
    wsapp = websocketApp o ssv
    fallback _request respond = respond notWebsocket
    notWebsocket = responseLBS status400 []
        "This is a debug-me server, it does not serve any html pages."
-- | Handle a websocket connection: accept it, negotiate the wire
-- version, and then dispatch on the first message, which must select
-- either user mode (starting a new session) or developer mode
-- (connecting to a session by its id).
websocketApp :: ServerOpts -> TVar ServerState -> WS.ServerApp
websocketApp o ssv pending_conn = do
    conn <- WS.acceptRequest pending_conn
    _ <- negotiateWireVersion conn
    receiveData conn >>= selectMode conn
  where
    selectMode conn (SelectMode ClientSends (InitMode email)) =
        user email o ssv conn
    selectMode conn (SelectMode ClientSends (ConnectMode t)) =
        maybe
            (protocolError conn "Invalid session id!")
            (\sid -> developer o ssv sid conn)
            (mkSessionID (T.unpack t))
    selectMode conn _ =
        protocolError conn "Expected SelectMode"
-- | Serve a user who is starting a new session.
--
-- A session id is allocated and sent to the user, the session is
-- registered in the server state for as long as the connection is being
-- relayed, and finally the session's log is handed to 'doneSessionLog'.
user :: EmailAddress -> ServerOpts -> TVar ServerState -> WS.Connection -> IO ()
user email o ssv conn = do
    sid <- withSessionID (serverDirectory o) $ \(loghv, sid) -> do
        sendBinaryData conn (Ready ServerSends sid)
        bracket (register sid loghv) (unregister sid) relay
        return sid
    doneSessionLog email o sid
  where
    -- Create the session and make it visible to developers.
    register sid loghv = do
        session <- newSession loghv
        atomically $ modifyTVar' ssv (M.insert sid session)
        return session

    -- Close the broadcast channel and forget the session, in one
    -- transaction.
    unregister sid session = atomically $ do
        closeSession session
        modifyTVar' ssv (M.delete sid)

    -- Relay in both directions until either direction finishes.
    relay session = do
        mytid <- myThreadId >>= mkWeakThreadId
        userchan <- atomically (listenSession session)
        void $ race
            (relaytouser userchan)
            (relayfromuser mytid session)

    -- Relay all messages from the user's websocket to the
    -- session broadcast channel.
    -- (The user is allowed to send Developer messages too.. perhaps
    -- they got them from a developer connected to them some other
    -- way.)
    relayfromuser mytid session = relayFromSocket conn $ \msg -> do
        now <- getPOSIXTime
        writeSession mytid session (mkLog msg now)

    -- Relay Developer messages from the channel to the user's websocket.
    -- User messages are skipped; the loop ends once the channel is
    -- closed.
    relaytouser userchan = do
        next <- atomically (readTMChan userchan)
        case next of
            Nothing -> return ()
            Just (Broadcast l _from) -> do
                case loggedMessage l of
                    Developer m ->
                        sendBinaryData conn (AnyMessage (Developer m))
                    User _ -> return ()
                relaytouser userchan
-- | Serve a developer connecting to the session with the given id.
--
-- When the session is not currently registered but its log file exists,
-- the log is replayed and the connection is finished with a Done
-- message. When the session is registered, the backlog is replayed and
-- messages are then relayed in both directions.
developer :: ServerOpts -> TVar ServerState -> SessionID -> WS.Connection -> IO ()
developer o ssv sid conn = bracket findsession (const (return ())) serve
  where
    findsession = atomically $ M.lookup sid <$> readTVar ssv

    serve Nothing = do
        exists <- doesFileExist (sessionLogFile (serverDirectory o) sid)
        if exists
            then do
                sendBinaryData conn (Ready ServerSends sid)
                replayBacklog o sid conn
                sendBinaryData conn Done
            else protocolError conn "Unknown session ID"
    serve (Just session) = do
        sendBinaryData conn (Ready ServerSends sid)
        devchan <- replayBacklogAndListen o sid session conn
        mytid <- myThreadId >>= mkWeakThreadId
        void $ concurrently
            (relayfromdeveloper mytid session)
            (relaytodeveloper mytid devchan)

    -- Relay all Developer messages from the developer's websocket
    -- to the broadcast channel.
    relayfromdeveloper mytid session = relayFromSocket conn $ \msg ->
        case msg of
            Developer _ -> do
                now <- getPOSIXTime
                writeSession mytid session (mkLog msg now)
            -- developer cannot send User messages
            User _ -> return ()

    -- Relay user messages from the developer's clone of the
    -- broadcast channel to the developer's websocket. Once the channel
    -- is closed, a Done message is sent and the loop ends.
    relaytodeveloper mytid devchan = do
        next <- atomically (readTMChan devchan)
        case next of
            Nothing -> sendBinaryData conn Done
            Just (Broadcast l from) -> do
                let msg = loggedMessage l
                    forward = sendBinaryData conn (AnyMessage msg)
                case msg of
                    User _ -> forward
                    -- Relay messages from other
                    -- developers, without looping
                    -- back the developer's own messages.
                    Developer _ -> do
                        sender <- deRefWeak from
                        self <- deRefWeak mytid
                        unless (sender == self) forward
                relaytodeveloper mytid devchan
-- | Replay the log of what's happened in the session so far,
-- and return a channel that will get new session activity.
--
-- This is done atomically; even if new activity arrives while it's
-- running nothing more will be logged until the log file has been
-- replayed and the channel set up.
--
-- Note that the session may appear to freeze for other users while
-- this is running.
replayBacklogAndListen :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan (Broadcast Log))
replayBacklogAndListen o sid session conn =
    preventWriteWhile session o sid replayThenListen
  where
    replayThenListen =
        replayBacklog o sid conn >> atomically (listenSession session)
-- | Send every message recorded in the session's log file over the
-- connection. Log entries that could not be read are skipped.
replayBacklog :: ServerOpts -> SessionID -> WS.Connection -> IO ()
replayBacklog o sid conn =
    streamLog (sessionLogFile (serverDirectory o) sid) >>= mapM_ replay
  where
    replay (Right l) = sendBinaryData conn (AnyMessage (loggedMessage l))
    replay (Left _) = return ()
-- | Finish off a session's log file: pass it to 'emailSessionLog', and
-- then remove it when the server is configured to delete old logs.
doneSessionLog :: EmailAddress -> ServerOpts -> SessionID -> IO ()
doneSessionLog email o sid = do
    emailSessionLog email o logfile
    when (serverDeleteOldLogs o) $
        removeFile logfile
  where
    logfile = sessionLogFile (serverDirectory o) sid
-- | Mail the log file, as an attachment, to the given address.
--
-- Nothing is sent unless the address contains an @\@@. The sender is
-- the server's configured email address, falling back to "postmaster".
emailSessionLog :: EmailAddress -> ServerOpts -> FilePath -> IO ()
emailSessionLog email o logfile =
    when looksLikeEmail $
        simpleMail to from subject body body attachments >>= renderSendMail
  where
    looksLikeEmail = "@" `T.isInfixOf` email
    to = Address Nothing email
    from = Address Nothing $ fromMaybe "postmaster" (serverEmail o)
    subject = "Your recent debug-me session"
    -- The same text is used for both body arguments of simpleMail.
    body = "Attached is the log from your recent debug-me session."
    attachments = [("text/plain", logfile)]
|