diff options
author | sternenseemann <0rpkxez4ksa01gb3typccl0i@systemli.org> | 2020-12-27 00:48:52 +0100 |
---|---|---|
committer | sternenseemann <0rpkxez4ksa01gb3typccl0i@systemli.org> | 2020-12-27 00:48:52 +0100 |
commit | d00295594936aa3ff3e5a08e39de54f6ec4e5305 (patch) | |
tree | c3809068b83533cb222f815cd24acf420c8232c3 | |
parent | 9629feffd76ec19e215ba556304b5514229156dc (diff) |
feat(server): implement basic client server communication
* server will receive packets from clients and assign them a modifier * server will send clients the world and subsequent updates Missing: * implement disconnect mechanism, timeout mechanism, server will keep trying to reach clients * test actual input from client * give client something to control
-rw-r--r-- | debug-client/Main.hs | 39 | ||||
-rw-r--r-- | grav2ty.cabal | 13 | ||||
-rw-r--r-- | lib/Grav2ty/Control.hs | 13 | ||||
-rw-r--r-- | lib/Grav2ty/Protocol.hs | 94 | ||||
-rw-r--r-- | server/Main.hs | 176 |
5 files changed, 277 insertions, 58 deletions
diff --git a/debug-client/Main.hs b/debug-client/Main.hs new file mode 100644 index 0000000..b548eba --- /dev/null +++ b/debug-client/Main.hs @@ -0,0 +1,39 @@ +module Main where + +import Control.Monad (forever) +import Data.Attoparsec.ByteString (parseOnly) +import Network.Socket +import Network.Socket.ByteString +import System.Environment +import System.Posix.Signals + +import Grav2ty.Protocol + +grav2tyConnect :: String -> String -> IO () +grav2tyConnect host port = do + let hints = defaultHints { addrFlags = [AI_NUMERICSERV], addrSocketType = Datagram } + addr:_ <- getAddrInfo (Just hints) (Just host) (Just port) + print addr + sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) + + installHandler sigTERM (Catch (close sock)) Nothing + installHandler sigINT (Catch (close sock)) Nothing + + connect sock (addrAddress addr) + + sendAll sock . renderMessage $ (ProtocolVersion protocolVersion :: Message Double) + + forever $ do + bytes <- recv sock (1024^2 * 100) + + case parseOnly messageParser bytes of + Left e -> putStrLn $ "Parse error: " ++ e + Right m -> print (m :: Message Double) + +main :: IO () +main = do + args <- getArgs + + case args of + [ host, port ] -> grav2tyConnect host port + _ -> error "wrong usage" diff --git a/grav2ty.cabal b/grav2ty.cabal index 8db1402..3a4f1e4 100644 --- a/grav2ty.cabal +++ b/grav2ty.cabal @@ -66,9 +66,22 @@ executable grav2ty-server , clock^>=0.8 , stm^>=2.5 , async^>=2.2 + , unix^>=2.7 + , attoparsec^>=0.13 + , hashable^>=1.3 hs-source-dirs: server default-language: Haskell2010 +executable grav2ty-debug-client + main-is: Main.hs + build-depends: base >=4.12 && <5 + , grav2ty-lib + , attoparsec^>=0.13 + , unix^>=2.7 + , network^>=3.1 + hs-source-dirs: debug-client + default-language: Haskell2010 + test-suite lib-test type: exitcode-stdio-1.0 main-is: test/Main.hs diff --git a/lib/Grav2ty/Control.hs b/lib/Grav2ty/Control.hs index d6f4e4e..f7484d7 100644 --- a/lib/Grav2ty/Control.hs +++ b/lib/Grav2ty/Control.hs @@ -15,10 +15,15 @@ import Linear.Vector import qualified Data.Map.Strict as M data Grav2tyUpdate a - = DeleteObject Id -- ^ Delete an object - | NewObject (Object a) -- ^ Add an object - | UpdateObject Id (Object a) -- ^ Change object with given id to new value --- | TickDone Tick -- ^ Last update of a tick signifying it's fully computed + -- emitted by tickUpdates and communication + = DeleteObject Id -- ^ Delete an object + | NewObject (Object a) -- ^ Add an object + | UpdateObject Id (Object a) -- ^ add or change object with given id + -- emitted by protocol communication only + | UpdateMod Modifier (Modification a) -- ^ set or add given 'Modifier' + | SetWorld (World a) + | SetTick Tick + | SetTimePerTick Int -- ^ new tick length in microseconds deriving (Show, Eq, Ord) projectile :: RealFloat a => (V2 a, V2 a) -> Integer -> Object a -> Object a diff --git a/lib/Grav2ty/Protocol.hs b/lib/Grav2ty/Protocol.hs index 9c9d071..f6b737b 100644 --- a/lib/Grav2ty/Protocol.hs +++ b/lib/Grav2ty/Protocol.hs @@ -12,15 +12,16 @@ module Grav2ty.Protocol , ErrorType (..) , renderMessage , messageParser - , Flat (..) - -- * Protocol Logic - , serverUpdateState - , clientUpdateState + -- * Mappings between 'Message's and 'Grav2tyUpdate's + , messageUpdateClient + , messageUpdateServer + , updateMessageServer ) where import Prelude hiding (take) import Grav2ty.Core +import qualified Grav2ty.Control as GC (Grav2tyUpdate (..)) import Grav2ty.Util.Serialization import Control.Lens ((%=), (.=), use) @@ -64,8 +65,10 @@ data Message a | AssignMods [Modifier] | NewWorld Tick (World a) | NewObject Tick Id (Object a) - | UpdateMod (ModMap a) + | UpdateMod Tick Modifier (Modification a) | TimePerTick Int + | DeleteObject Tick Id +-- TODO readd RequestMods Int for spectator support plus multiple users connections deriving (Show, Eq, Ord, Generic, Flat) toMaybe :: Bool -> a -> Maybe a @@ -82,19 +85,27 @@ instance Flat a => ToPacket (Message a) where toPacket (AssignMods ids) = Packet 2 . flat . map (\(Mod i) -> i) . filter doesModify $ ids toPacket (NewWorld tick world) = Packet 3 $ flat (tick, world) toPacket (NewObject tick id obj) = Packet 4 $ flat (tick, id, obj) - toPacket (UpdateMod modmap) = Packet 5 (flat modmap) + toPacket (UpdateMod t m mf) = Packet 5 (flat (t, m, mf)) toPacket (TimePerTick t) = Packet 6 (flat t) + toPacket (DeleteObject t i) = Packet 7 $ flat (t, i) + fromPacket (Packet 0 v) = toMaybe (BS.length v == 1) (ProtocolVersion $ BS.head v) fromPacket (Packet 1 e) = Error <$> rightToMaybe (unflat e) fromPacket (Packet 2 m) = AssignMods . map Mod <$> rightToMaybe (unflat m) - fromPacket (Packet 3 w) = case unflat w of - Left _ -> Nothing - Right (tick, world) -> Just $ NewWorld tick world - fromPacket (Packet 4 o) = case unflat o of - Left _ -> Nothing - Right (tick, id, obj) -> Just $ NewObject tick id obj - fromPacket (Packet 5 m) = UpdateMod <$> rightToMaybe (unflat m) + fromPacket (Packet 3 w) = + case unflat w of + Left _ -> Nothing + Right (tick, world) -> Just $ NewWorld tick world + fromPacket (Packet 4 o) = + case unflat o of + Left _ -> Nothing + Right (tick, id, obj) -> Just $ NewObject tick id obj + fromPacket (Packet 5 m) = + case unflat m of + Left _ -> Nothing + Right (t, m, mf) -> Just $ UpdateMod t m mf fromPacket (Packet 6 t) = TimePerTick <$> rightToMaybe (unflat t) + fromPacket (Packet 7 m) = uncurry DeleteObject <$> rightToMaybe (unflat m) fromPacket (Packet _ _) = Nothing bytes :: Int64 -> [Word8] @@ -129,28 +140,35 @@ messageParser :: Flat a => Parser (Message a) messageParser = packetParser >>= maybe (fail "Packet is no valid message") pure . fromPacket --- | Process messages from a client connected via an already established connection. --- Updates the State and ensures that the client won't set anything it shouldn't --- be allowed to (like 'Modifier's it isn't assigned) -serverUpdateState :: Monad m => [Modifier] -> Message p -> Grav2ty p g m () -serverUpdateState _ (ProtocolVersion _) = pure () -- protocol version does not change -serverUpdateState _ (AssignMods _) = pure () -- can only be done by the server -serverUpdateState _ (NewWorld _ _) = pure () -- can only be sent by the server -serverUpdateState _ (NewObject _ _ _) = pure () -- can only be sent by the server -serverUpdateState _ (TimePerTick _) = pure () -- only updated by the server -serverUpdateState _ (Error _) = pure () -- TODO Error Handling, Client Errors -serverUpdateState mods (UpdateMod modmap) = inputs %= insertMods (`elem` mods) modmap - -clientUpdateState :: Monad m => [Modifier] -> Message p -> Grav2ty p g m () -clientUpdateState _ (ProtocolVersion _) = pure () -- protocol version does not change -clientUpdateState _ (Error _) = pure () -- TODO error handling here? -clientUpdateState _ (TimePerTick t) = timePerTick .= t -clientUpdateState mods (UpdateMod modmap) = inputs %= insertMods (not . (`elem` mods)) modmap -clientUpdateState _ (NewWorld t w) = tick .= t >> world .= w -clientUpdateState _ (NewObject t i o) = tick .= t >> (() <$ setObject (Just i) o) - -insertMods :: (Modifier -> Bool) -> ModMap a -> ModMap a -> ModMap a -insertMods test from into = M.foldlWithKey' (\into mod content -> - if test mod - then M.insert mod content into - else into) into from +clientTickUpdate :: Tick -> Tick -> [GC.Grav2tyUpdate a] +clientTickUpdate current tick = + if tick > current + then [GC.SetTick tick] + else [] + +messageUpdateClient :: Tick -> Message a -> [GC.Grav2tyUpdate a] +messageUpdateClient _ (NewWorld t w) = [GC.SetTick t, GC.SetWorld w] +messageUpdateClient current (NewObject t i o) = + if t < current + then [] + else GC.UpdateObject t o : clientTickUpdate current t +messageUpdateClient current (UpdateMod t m mf) = + if t < current + then [] + else GC.UpdateMod m mf : clientTickUpdate current t +messageUpdateClient _ (TimePerTick tm) = [GC.SetTimePerTick tm] +messageUpdateClient _ _ = [] + +messageUpdateServer :: [Modifier] -> Message a -> [GC.Grav2tyUpdate a] +-- TODO factor in tick +messageUpdateServer allowed (UpdateMod _ m mf) = + if m `elem` allowed then [GC.UpdateMod m mf] else [] +messageUpdateServer _ _ = [] + +updateMessageServer :: Tick -> GC.Grav2tyUpdate a -> Maybe (Message a) +updateMessageServer current (GC.DeleteObject i) = Just $ DeleteObject current i +updateMessageServer current (GC.UpdateObject i o) = Just $ NewObject current i o +updateMessageServer current (GC.UpdateMod m mf) = Just $ UpdateMod current m mf +updateMessageServer current (GC.SetWorld w) = Just $ NewWorld current w +updateMessageServer current (GC.SetTimePerTick tm) = Just $ TimePerTick tm +updateMessageServer current _ = Nothing diff --git a/server/Main.hs b/server/Main.hs index 94b0673..6c6a39c 100644 --- a/server/Main.hs +++ b/server/Main.hs @@ -2,21 +2,44 @@ module Main where import Grav2ty.Core import Grav2ty.Control +import qualified Grav2ty.Protocol as GP import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM +import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TMVar import Control.Concurrent.STM.TChan -import Control.Lens ((%=)) -import Control.Monad (forever, forM_) -import Control.Monad.IO.Class (liftIO) +import Control.Lens ((%=), use) +import Control.Monad (forM_, unless, forever) +import Control.Monad.IO.Class (liftIO, MonadIO ()) import Control.Monad.Trans.State.Lazy +import Data.Attoparsec.ByteString (parseOnly) +import Data.Hashable import qualified Data.Map as M +import Data.Maybe (isJust, maybeToList) import Linear.V2 +import Network.Socket +import Network.Socket.ByteString (recvFrom, sendManyTo) import System.Clock +import System.Posix.Signals (installHandler, Handler (..), sigTERM, sigINT) import System.Ticked +instance Hashable SockAddr where + hashWithSalt s (SockAddrUnix str) = s + `hashWithSalt` str `hashWithSalt` (1 :: Int) + hashWithSalt s (SockAddrInet p h) = s + `hashWithSalt` (2 :: Int) + `hashWithSalt` (fromIntegral p :: Int) + `hashWithSalt` h + hashWithSalt s (SockAddrInet6 p f h scope) = s + `hashWithSalt` (3 :: Int) + `hashWithSalt` (fromIntegral p :: Int) + `hashWithSalt` f + `hashWithSalt` h + `hashWithSalt` scope + + initialState :: Grav2tyState Double () initialState = Grav2tyState 0 (10^6) mempty () (M.fromList @@ -27,6 +50,12 @@ initialState = Grav2tyState 0 (10^6) mempty () data TickThreadMsg a = TickThreadUpdates [Grav2tyUpdate a] | TickThreadDone Tick TimeSpec + | TickThreadSendWorld SockAddr + deriving (Show, Eq, Ord) + +data NetThreadMsg a + = NetThreadBroadcast [GP.Message a] + | NetThreadSendTo SockAddr [GP.Message a] deriving (Show, Eq, Ord) processTick :: TMVar (Grav2tyState Double ()) -> TChan (TickThreadMsg Double) -> IO () @@ -42,19 +71,37 @@ processTick svar chan = do . writeTChan chan $ TickThreadDone (_tick state) (diffTimeSpec before after) -processUpdates :: TMVar (Grav2tyState Double ()) -> TChan (TickThreadMsg Double) +processUpdates :: TMVar (Grav2tyState Double ()) + -> TChan (TickThreadMsg Double) + -> TChan (NetThreadMsg Double) -> Grav2ty Double () IO () -processUpdates svar chan = forever $ do - msg <- liftIO . atomically $ readTChan chan - liftIO $ print msg +processUpdates svar tickChan netChan = forever $ do + msg <- liftIO . atomically $ readTChan tickChan case msg of TickThreadUpdates updates -> forM_ updates $ \u -> do - liftIO $ print u - case u of - DeleteObject i -> delObject i - UpdateObject i o -> setObject (Just i) o >> pure () - NewObject o -> addObject o >> pure () + update' <- + case u of + DeleteObject i -> delObject i >> pure u + UpdateObject i o -> setObject (Just i) o >> pure u + NewObject o -> do + i <- addObject o + pure $ UpdateObject i o + _ -> error "Update type not implemented" -- TODO + + tick <- use tick + case GP.updateMessageServer tick update' of + Just m -> liftIO . atomically + . writeTChan netChan $ NetThreadBroadcast [m] + Nothing -> pure () + + -- TODO this is a bit ugly actually and a lot of indirection + TickThreadSendWorld addr -> do + world <- use world + tick <- use tick + + liftIO . atomically . writeTChan netChan + $ NetThreadSendTo addr [GP.NewWorld tick world] TickThreadDone t timespec -> do tick %= (+ 1) state <- get @@ -62,10 +109,85 @@ processUpdates svar chan = forever $ do ++ show (toNanoSecs timespec) ++ "ns" liftIO . atomically $ putTMVar svar state +netOut :: Socket + -> TVar (M.Map SockAddr Modifier) + -> TChan (NetThreadMsg Double) + -> IO () +netOut s clients netChan = forever $ do + cmd <- atomically $ readTChan netChan + + case cmd of + NetThreadBroadcast msgs -> readTVarIO clients >>= \addrs -> do + flip M.traverseWithKey addrs $ \addr _ -> + sendManyTo s (map GP.renderMessage msgs) addr + pure () + NetThreadSendTo addr msgs -> + sendManyTo s (map GP.renderMessage msgs) addr + +-- 100 MB +maxPacketLen :: Int +maxPacketLen = 100 * 1024^2 + +modifierForAddr :: SockAddr -> Modifier +modifierForAddr = Mod . fromIntegral . hash + +netIn :: Socket + -> TVar (M.Map SockAddr Modifier) + -> TChan (NetThreadMsg Double) + -> TChan (TickThreadMsg Double) + -> IO () +netIn s clients netChan tickChan = forever $ do + (bytes, addr) <- recvFrom s maxPacketLen + + case parseOnly GP.messageParser bytes of + Left err -> do + putStrLn $ "Could not parse message from " ++ show addr ++ ": " ++ err + atomically . writeTChan netChan + $ NetThreadSendTo addr [GP.Error GP.ErrorNoParse] + Right (GP.ProtocolVersion v) -> + if v == GP.protocolVersion + then let modifier = modifierForAddr addr + in atomically $ do + modifyTVar clients $ M.insert addr modifier + writeTChan tickChan $ TickThreadSendWorld addr + writeTChan netChan + $ NetThreadSendTo addr [GP.AssignMods [modifier]] + else do + putStrLn $ "Incompatible protocol version from " ++ show addr + atomically . writeTChan netChan + $ NetThreadSendTo addr [GP.Error GP.ErrorVersionMismatch] + Right x -> do + clientMod <- maybeToList . M.lookup addr <$> readTVarIO clients + atomically . writeTChan tickChan . TickThreadUpdates + $ GP.messageUpdateServer clientMod x + +getGrav2tyAddr :: IO AddrInfo +getGrav2tyAddr = head <$> getAddrInfo hints (Just "::") (Just "2001") + where hints = Just $ defaultHints + { addrFlags = [ AI_NUMERICSERV ] + , addrFamily = AF_INET6 + , addrSocketType = Datagram + } + main :: IO () main = do + addr <- getGrav2tyAddr + sock <- socket (addrFamily addr) (addrSocketType addr) $ addrProtocol addr + setSocketOption sock IPv6Only 0 + + bind sock (addrAddress addr) + putStrLn $ "Listening on " ++ show (addrAddress addr) + + shutdown <- newEmptyTMVarIO + + _ <- installHandler sigTERM (Catch (atomically $ putTMVar shutdown ())) Nothing + _ <- installHandler sigINT (Catch (atomically $ putTMVar shutdown ())) Nothing + stateForTick <- newTMVarIO initialState - tickChan <- newTChanIO + clients <- newTVarIO mempty + + tickChan <- atomically $ newBroadcastTChan + netOutChan <- atomically $ newBroadcastTChan tickThreads <- async . runTicked (_timePerTick initialState) @@ -74,11 +196,33 @@ main = do link tickThreads stateThread <- async $ do - execStateT (processUpdates stateForTick tickChan) initialState + tickChan' <- atomically $ dupTChan tickChan + execStateT (processUpdates stateForTick tickChan' netOutChan) initialState pure () link stateThread + + netInThread <- async $ netIn sock clients netOutChan tickChan + + netOutThread <- async $ do + netOutChan' <- atomically $ dupTChan netOutChan + netOut sock clients netOutChan' + pure () + + link netOutThread + link2 tickThreads stateThread + link2 stateThread netOutThread + + -- wait for shutdown signal + atomically $ takeTMVar shutdown + + -- stop tickThreads + cancel tickThreads + + -- close socket after killing netThreads + cancel netOutThread + cancel netInThread + close sock - wait tickThreads - wait stateThread + cancel stateThread |