about summary refs log tree commit diff
diff options
context:
space:
mode:
authorsternenseemann <0rpkxez4ksa01gb3typccl0i@systemli.org>2020-12-27 00:48:52 +0100
committersternenseemann <0rpkxez4ksa01gb3typccl0i@systemli.org>2020-12-27 00:48:52 +0100
commitd00295594936aa3ff3e5a08e39de54f6ec4e5305 (patch)
treec3809068b83533cb222f815cd24acf420c8232c3
parent9629feffd76ec19e215ba556304b5514229156dc (diff)
feat(server): implement basic client server communication
* server will receive packets from clients and assign them a modifier
* server will send clients the world and subsequent updates

Missing:
* implement disconnect mechanism, timeout mechanism, server will keep
  trying to reach clients
* test actual input from client
* give client something to control
-rw-r--r--debug-client/Main.hs39
-rw-r--r--grav2ty.cabal13
-rw-r--r--lib/Grav2ty/Control.hs13
-rw-r--r--lib/Grav2ty/Protocol.hs94
-rw-r--r--server/Main.hs176
5 files changed, 277 insertions, 58 deletions
diff --git a/debug-client/Main.hs b/debug-client/Main.hs
new file mode 100644
index 0000000..b548eba
--- /dev/null
+++ b/debug-client/Main.hs
@@ -0,0 +1,39 @@
+module Main where
+
+import Control.Monad (forever)
+import Data.Attoparsec.ByteString (parseOnly)
+import Network.Socket
+import Network.Socket.ByteString
+import System.Environment
+import System.Posix.Signals
+
+import Grav2ty.Protocol
+
+grav2tyConnect :: String -> String -> IO ()
+grav2tyConnect host port = do
+  let hints = defaultHints { addrFlags = [AI_NUMERICSERV], addrSocketType = Datagram }
+  addr:_ <- getAddrInfo (Just hints) (Just host) (Just port)
+  print addr
+  sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
+
+  installHandler sigTERM (Catch (close sock)) Nothing
+  installHandler sigINT  (Catch (close sock)) Nothing
+
+  connect sock (addrAddress addr)
+
+  sendAll sock . renderMessage $ (ProtocolVersion protocolVersion :: Message Double)
+
+  forever $ do
+    bytes <- recv sock (1024^2 * 100)
+
+    case parseOnly messageParser bytes of
+      Left e -> putStrLn $ "Parse error: " ++ e
+      Right m -> print (m :: Message Double)
+
+main :: IO ()
+main = do
+  args <- getArgs
+
+  case args of
+    [ host, port ] -> grav2tyConnect host port
+    _ -> error "wrong usage"
diff --git a/grav2ty.cabal b/grav2ty.cabal
index 8db1402..3a4f1e4 100644
--- a/grav2ty.cabal
+++ b/grav2ty.cabal
@@ -66,9 +66,22 @@ executable grav2ty-server
                      , clock^>=0.8
                      , stm^>=2.5
                      , async^>=2.2
+                     , unix^>=2.7
+                     , attoparsec^>=0.13
+                     , hashable^>=1.3
   hs-source-dirs:      server
   default-language:    Haskell2010
 
+executable grav2ty-debug-client
+  main-is:             Main.hs
+  build-depends:       base >=4.12 && <5
+                     , grav2ty-lib
+                     , attoparsec^>=0.13
+                     , unix^>=2.7
+                     , network^>=3.1
+  hs-source-dirs:      debug-client
+  default-language:    Haskell2010
+
 test-suite lib-test
   type:                exitcode-stdio-1.0
   main-is:             test/Main.hs
diff --git a/lib/Grav2ty/Control.hs b/lib/Grav2ty/Control.hs
index d6f4e4e..f7484d7 100644
--- a/lib/Grav2ty/Control.hs
+++ b/lib/Grav2ty/Control.hs
@@ -15,10 +15,15 @@ import Linear.Vector
 import qualified Data.Map.Strict as M
 
 data Grav2tyUpdate a
-  = DeleteObject Id              -- ^ Delete an object
-  | NewObject (Object a)         -- ^ Add an object
-  | UpdateObject Id (Object a)   -- ^ Change object with given id to new value
---  | TickDone Tick                -- ^ Last update of a tick signifying it's fully computed
+  -- emitted by tickUpdates and communication
+  = DeleteObject Id                     -- ^ Delete an object
+  | NewObject (Object a)                -- ^ Add an object
+  | UpdateObject Id (Object a)          -- ^ add or change object with given id
+  -- emitted by protocol communication only
+  | UpdateMod Modifier (Modification a) -- ^ set or add given 'Modifier'
+  | SetWorld (World a)
+  | SetTick Tick
+  | SetTimePerTick Int               -- ^ new tick length in microseconds
   deriving (Show, Eq, Ord)
 
 projectile :: RealFloat a => (V2 a, V2 a) -> Integer -> Object a -> Object a
diff --git a/lib/Grav2ty/Protocol.hs b/lib/Grav2ty/Protocol.hs
index 9c9d071..f6b737b 100644
--- a/lib/Grav2ty/Protocol.hs
+++ b/lib/Grav2ty/Protocol.hs
@@ -12,15 +12,16 @@ module Grav2ty.Protocol
   , ErrorType (..)
   , renderMessage
   , messageParser
-  , Flat (..)
-  -- * Protocol Logic
-  , serverUpdateState
-  , clientUpdateState
+    -- * Mappings between 'Message's and 'Grav2tyUpdate's
+  , messageUpdateClient
+  , messageUpdateServer
+  , updateMessageServer
   ) where
 
 import Prelude hiding (take)
 
 import Grav2ty.Core
+import qualified Grav2ty.Control as GC (Grav2tyUpdate (..))
 import Grav2ty.Util.Serialization
 
 import Control.Lens ((%=), (.=), use)
@@ -64,8 +65,10 @@ data Message a
   | AssignMods [Modifier]
   | NewWorld Tick (World a)
   | NewObject Tick Id (Object a)
-  | UpdateMod (ModMap a)
+  | UpdateMod Tick Modifier (Modification a)
   | TimePerTick Int
+  | DeleteObject Tick Id
+-- TODO readd RequestMods Int for spectator support plus multiple users connections
   deriving (Show, Eq, Ord, Generic, Flat)
 
 toMaybe :: Bool -> a -> Maybe a
@@ -82,19 +85,27 @@ instance Flat a => ToPacket (Message a) where
   toPacket (AssignMods ids) = Packet 2 . flat . map (\(Mod i) -> i) . filter doesModify $ ids
   toPacket (NewWorld tick world) = Packet 3 $ flat (tick, world)
   toPacket (NewObject tick id obj) = Packet 4 $ flat (tick, id, obj)
-  toPacket (UpdateMod modmap) = Packet 5 (flat modmap)
+  toPacket (UpdateMod t m mf) = Packet 5 (flat (t, m, mf))
   toPacket (TimePerTick t) = Packet 6 (flat t)
+  toPacket (DeleteObject t i) = Packet 7 $ flat (t, i)
+
   fromPacket (Packet 0 v) = toMaybe (BS.length v == 1) (ProtocolVersion $ BS.head v)
   fromPacket (Packet 1 e) = Error <$> rightToMaybe (unflat e)
   fromPacket (Packet 2 m) = AssignMods . map Mod <$> rightToMaybe (unflat m)
-  fromPacket (Packet 3 w) = case unflat w of
-                              Left _ -> Nothing
-                              Right (tick, world) -> Just $ NewWorld tick world
-  fromPacket (Packet 4 o) = case unflat o of
-                              Left _ -> Nothing
-                              Right (tick, id, obj) -> Just $ NewObject tick id obj
-  fromPacket (Packet 5 m) = UpdateMod <$> rightToMaybe (unflat m)
+  fromPacket (Packet 3 w) =
+    case unflat w of
+      Left _ -> Nothing
+      Right (tick, world) -> Just $ NewWorld tick world
+  fromPacket (Packet 4 o) =
+    case unflat o of
+      Left _ -> Nothing
+      Right (tick, id, obj) -> Just $ NewObject tick id obj
+  fromPacket (Packet 5 m) =
+    case unflat m of
+      Left _ -> Nothing
+      Right (t, m, mf) -> Just $ UpdateMod t m mf
   fromPacket (Packet 6 t) = TimePerTick <$> rightToMaybe (unflat t)
+  fromPacket (Packet 7 m) = uncurry DeleteObject <$> rightToMaybe (unflat m)
   fromPacket (Packet _ _) = Nothing
 
 bytes :: Int64 -> [Word8]
@@ -129,28 +140,35 @@ messageParser :: Flat a => Parser (Message a)
 messageParser = packetParser >>=
   maybe (fail "Packet is no valid message") pure . fromPacket
 
--- | Process messages from a client connected via an already established connection.
---   Updates the State and ensures that the client won't set anything it shouldn't
---   be allowed to (like 'Modifier's it isn't assigned)
-serverUpdateState :: Monad m => [Modifier] -> Message p -> Grav2ty p g m ()
-serverUpdateState _ (ProtocolVersion _) = pure () -- protocol version does not change
-serverUpdateState _ (AssignMods _) = pure ()      -- can only be done by the server
-serverUpdateState _ (NewWorld _ _) = pure ()      -- can only be sent by the server
-serverUpdateState _ (NewObject _ _ _) = pure ()   -- can only be sent by the server
-serverUpdateState _ (TimePerTick _) = pure ()     -- only updated by the server
-serverUpdateState _ (Error _) = pure ()           -- TODO Error Handling, Client Errors
-serverUpdateState mods (UpdateMod modmap) = inputs %= insertMods (`elem` mods) modmap
-
-clientUpdateState :: Monad m => [Modifier] -> Message p -> Grav2ty p g m ()
-clientUpdateState _ (ProtocolVersion _) = pure () -- protocol version does not change
-clientUpdateState _ (Error _) = pure ()           -- TODO error handling here?
-clientUpdateState _ (TimePerTick t) = timePerTick .= t
-clientUpdateState mods (UpdateMod modmap) = inputs %= insertMods (not . (`elem` mods)) modmap
-clientUpdateState _ (NewWorld t w) = tick .= t >> world .= w
-clientUpdateState _ (NewObject t i o) = tick .= t >> (() <$ setObject (Just i) o)
-
-insertMods :: (Modifier -> Bool) -> ModMap a -> ModMap a -> ModMap a
-insertMods test from into = M.foldlWithKey' (\into mod content ->
-  if test mod
-     then M.insert mod content into
-     else into) into from
+clientTickUpdate :: Tick -> Tick -> [GC.Grav2tyUpdate a]
+clientTickUpdate current tick =
+  if tick > current
+    then [GC.SetTick tick]
+    else []
+
+messageUpdateClient :: Tick -> Message a -> [GC.Grav2tyUpdate a]
+messageUpdateClient _ (NewWorld t w) = [GC.SetTick t, GC.SetWorld w]
+messageUpdateClient current (NewObject t i o) =
+ if t < current
+   then []
+   else GC.UpdateObject t o : clientTickUpdate current t
+messageUpdateClient current (UpdateMod t m mf) =
+  if t < current
+    then []
+    else GC.UpdateMod m mf : clientTickUpdate current t
+messageUpdateClient _ (TimePerTick tm) = [GC.SetTimePerTick tm]
+messageUpdateClient _ _ = []
+
+messageUpdateServer :: [Modifier] -> Message a -> [GC.Grav2tyUpdate a]
+-- TODO factor in tick
+messageUpdateServer allowed (UpdateMod _ m mf) =
+  if m `elem` allowed then [GC.UpdateMod m mf] else []
+messageUpdateServer _ _ = []
+
+updateMessageServer :: Tick -> GC.Grav2tyUpdate a -> Maybe (Message a)
+updateMessageServer current (GC.DeleteObject i) = Just $ DeleteObject current i
+updateMessageServer current (GC.UpdateObject i o) = Just $ NewObject current i o
+updateMessageServer current (GC.UpdateMod m mf) = Just $ UpdateMod current m mf
+updateMessageServer current (GC.SetWorld w) = Just $ NewWorld current w
+updateMessageServer current (GC.SetTimePerTick tm) = Just $ TimePerTick tm
+updateMessageServer current _ = Nothing
diff --git a/server/Main.hs b/server/Main.hs
index 94b0673..6c6a39c 100644
--- a/server/Main.hs
+++ b/server/Main.hs
@@ -2,21 +2,44 @@ module Main where
 
 import Grav2ty.Core
 import Grav2ty.Control
+import qualified Grav2ty.Protocol as GP
 
 import Control.Concurrent (threadDelay)
 import Control.Concurrent.Async
 import Control.Concurrent.STM
+import Control.Concurrent.STM.TVar
 import Control.Concurrent.STM.TMVar
 import Control.Concurrent.STM.TChan
-import Control.Lens ((%=))
-import Control.Monad (forever, forM_)
-import Control.Monad.IO.Class (liftIO)
+import Control.Lens ((%=), use)
+import Control.Monad (forM_, unless, forever)
+import Control.Monad.IO.Class (liftIO, MonadIO ())
 import Control.Monad.Trans.State.Lazy
+import Data.Attoparsec.ByteString (parseOnly)
+import Data.Hashable
 import qualified Data.Map as M
+import Data.Maybe (isJust, maybeToList)
 import Linear.V2
+import Network.Socket
+import Network.Socket.ByteString (recvFrom, sendManyTo)
 import System.Clock
+import System.Posix.Signals (installHandler, Handler (..), sigTERM, sigINT)
 import System.Ticked
 
+instance Hashable SockAddr where
+  hashWithSalt s (SockAddrUnix str) = s
+    `hashWithSalt` str `hashWithSalt` (1 :: Int)
+  hashWithSalt s (SockAddrInet p h) = s
+    `hashWithSalt` (2 :: Int)
+    `hashWithSalt` (fromIntegral p :: Int)
+    `hashWithSalt` h
+  hashWithSalt s (SockAddrInet6 p f h scope) = s
+    `hashWithSalt` (3 :: Int)
+    `hashWithSalt` (fromIntegral p :: Int)
+    `hashWithSalt` f
+    `hashWithSalt` h
+    `hashWithSalt` scope
+
+
 initialState :: Grav2tyState Double ()
 initialState = Grav2tyState 0 (10^6) mempty ()
   (M.fromList
@@ -27,6 +50,12 @@ initialState = Grav2tyState 0 (10^6) mempty ()
 data TickThreadMsg a
   = TickThreadUpdates [Grav2tyUpdate a]
   | TickThreadDone Tick TimeSpec
+  | TickThreadSendWorld SockAddr
+  deriving (Show, Eq, Ord)
+
+data NetThreadMsg a
+  = NetThreadBroadcast [GP.Message a]
+  | NetThreadSendTo SockAddr [GP.Message a]
   deriving (Show, Eq, Ord)
 
 processTick :: TMVar (Grav2tyState Double ()) -> TChan (TickThreadMsg Double) -> IO ()
@@ -42,19 +71,37 @@ processTick svar chan = do
     . writeTChan chan
     $ TickThreadDone (_tick state) (diffTimeSpec before after)
 
-processUpdates :: TMVar (Grav2tyState Double ()) -> TChan (TickThreadMsg Double)
+processUpdates :: TMVar (Grav2tyState Double ())
+               -> TChan (TickThreadMsg Double)
+               -> TChan (NetThreadMsg Double)
                -> Grav2ty Double () IO ()
-processUpdates svar chan = forever $ do
-  msg <- liftIO . atomically $ readTChan chan
-  liftIO $ print msg
+processUpdates svar tickChan netChan = forever $ do
+  msg <- liftIO . atomically $ readTChan tickChan
 
   case msg of
     TickThreadUpdates updates -> forM_ updates $ \u -> do
-      liftIO $ print u
-      case u of
-        DeleteObject i -> delObject i
-        UpdateObject i o -> setObject (Just i) o >> pure ()
-        NewObject o -> addObject o >> pure ()
+      update' <-
+        case u of
+          DeleteObject i -> delObject i >> pure u
+          UpdateObject i o -> setObject (Just i) o >> pure u
+          NewObject o -> do
+            i <- addObject o
+            pure $ UpdateObject i o
+          _ -> error "Update type not implemented" -- TODO
+
+      tick <- use tick
+      case GP.updateMessageServer tick update' of
+        Just m -> liftIO . atomically
+          . writeTChan netChan $ NetThreadBroadcast [m]
+        Nothing -> pure ()
+
+    -- TODO this is a bit ugly actually and a lot of indirection
+    TickThreadSendWorld addr -> do
+      world <- use world
+      tick <- use tick
+
+      liftIO . atomically . writeTChan netChan
+        $ NetThreadSendTo addr [GP.NewWorld tick world]
     TickThreadDone t timespec -> do
       tick %= (+ 1)
       state <- get
@@ -62,10 +109,85 @@ processUpdates svar chan = forever $ do
         ++ show (toNanoSecs timespec) ++ "ns"
       liftIO . atomically $ putTMVar svar state
 
+netOut :: Socket
+       -> TVar (M.Map SockAddr Modifier)
+       -> TChan (NetThreadMsg Double)
+       -> IO ()
+netOut s clients netChan = forever $ do
+  cmd <- atomically $ readTChan netChan
+
+  case cmd of
+    NetThreadBroadcast msgs -> readTVarIO clients >>= \addrs -> do
+      flip M.traverseWithKey addrs $ \addr _ ->
+        sendManyTo s (map GP.renderMessage msgs) addr
+      pure ()
+    NetThreadSendTo addr msgs ->
+      sendManyTo s (map GP.renderMessage msgs) addr
+
+-- 100 MB
+maxPacketLen :: Int
+maxPacketLen = 100 * 1024^2
+
+modifierForAddr :: SockAddr -> Modifier
+modifierForAddr = Mod . fromIntegral . hash
+
+netIn :: Socket
+      -> TVar (M.Map SockAddr Modifier)
+      -> TChan (NetThreadMsg Double)
+      -> TChan (TickThreadMsg Double)
+      -> IO ()
+netIn s clients netChan tickChan = forever $ do
+  (bytes, addr) <- recvFrom s maxPacketLen
+
+  case parseOnly GP.messageParser bytes of
+    Left err -> do
+      putStrLn $ "Could not parse message from " ++ show addr ++ ": " ++ err
+      atomically . writeTChan netChan
+        $ NetThreadSendTo addr [GP.Error GP.ErrorNoParse]
+    Right (GP.ProtocolVersion v) ->
+      if v == GP.protocolVersion
+        then let modifier = modifierForAddr addr
+              in atomically $ do
+                modifyTVar clients $ M.insert addr modifier
+                writeTChan tickChan $ TickThreadSendWorld addr
+                writeTChan netChan
+                  $ NetThreadSendTo addr [GP.AssignMods [modifier]]
+        else do
+          putStrLn $ "Incompatible protocol version from " ++ show addr
+          atomically . writeTChan netChan
+            $ NetThreadSendTo addr [GP.Error GP.ErrorVersionMismatch]
+    Right x -> do
+      clientMod <- maybeToList . M.lookup addr <$> readTVarIO clients
+      atomically . writeTChan tickChan . TickThreadUpdates
+        $ GP.messageUpdateServer clientMod x
+
+getGrav2tyAddr :: IO AddrInfo
+getGrav2tyAddr = head <$> getAddrInfo hints (Just "::") (Just "2001")
+  where hints = Just $ defaultHints
+          { addrFlags = [ AI_NUMERICSERV ]
+          , addrFamily = AF_INET6
+          , addrSocketType = Datagram
+          }
+
 main :: IO ()
 main = do
+  addr <- getGrav2tyAddr
+  sock <- socket (addrFamily addr) (addrSocketType addr) $ addrProtocol addr
+  setSocketOption sock IPv6Only 0
+
+  bind sock (addrAddress addr)
+  putStrLn $ "Listening on " ++ show (addrAddress addr)
+
+  shutdown <- newEmptyTMVarIO
+
+  _ <- installHandler sigTERM (Catch (atomically $ putTMVar shutdown ())) Nothing
+  _ <- installHandler sigINT  (Catch (atomically $ putTMVar shutdown ())) Nothing
+
   stateForTick <- newTMVarIO initialState
-  tickChan <- newTChanIO
+  clients <- newTVarIO mempty
+
+  tickChan <- atomically $ newBroadcastTChan
+  netOutChan <- atomically $ newBroadcastTChan
 
   tickThreads <- async
     . runTicked (_timePerTick initialState)
@@ -74,11 +196,33 @@ main = do
   link tickThreads
 
   stateThread <- async $ do
-    execStateT (processUpdates stateForTick tickChan) initialState
+    tickChan' <- atomically $ dupTChan tickChan
+    execStateT (processUpdates stateForTick tickChan' netOutChan) initialState
     pure ()
 
   link stateThread
+
+  netInThread <- async $ netIn sock clients netOutChan tickChan
+
+  netOutThread <- async $ do
+    netOutChan' <- atomically $ dupTChan netOutChan
+    netOut sock clients netOutChan'
+    pure ()
+
+  link netOutThread
+
   link2 tickThreads stateThread
+  link2 stateThread netOutThread
+
+  -- wait for shutdown signal
+  atomically $ takeTMVar shutdown
+
+  -- stop tickThreads
+  cancel tickThreads
+
+  -- close socket after killing netThreads
+  cancel netOutThread
+  cancel netInThread
+  close sock
 
-  wait tickThreads
-  wait stateThread
+  cancel stateThread