diff options
Diffstat (limited to 'src/Ssb/Peer/RPC/Room.hs')
-rw-r--r-- | src/Ssb/Peer/RPC/Room.hs | 330 |
1 files changed, 330 insertions, 0 deletions
diff --git a/src/Ssb/Peer/RPC/Room.hs b/src/Ssb/Peer/RPC/Room.hs new file mode 100644 index 0000000..47bc956 --- /dev/null +++ b/src/Ssb/Peer/RPC/Room.hs @@ -0,0 +1,330 @@ +-- | This module implements Scuttlebutt's Remote Procedure Call for +-- Rooms. +-- +-- For more information kindly refer [WHERE] + +-- TODO: Documentation for SSB-Room + +module Ssb.Peer.RPC.Room where + + +import Protolude hiding ( Identity ) +import qualified Data.Aeson as Aeson +import Control.Concurrent.STM +import Data.Default +import qualified Data.Map.Strict as Map +import Data.Time.Clock ( UTCTime + , getCurrentTime + ) +import qualified Data.Text as Text + +import Ssb.Aux +import qualified Ssb.Identity as Ssb +import qualified Ssb.Pub as Ssb +import Ssb.Network +import qualified Ssb.Feed as Feed +import qualified Ssb.Peer.RPC as RPC + +seed :: Text +seed = "SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24=" + +data Invite = Invite + { host :: Host + , port :: Port + , key :: Ssb.PublicKey + } deriving (Eq, Show) + +formatInvite :: Invite -> Text +formatInvite arg = + "net:" + <> host arg + <> ":" + <> port arg + <> "~shs" + <> ":" + <> formatPublicKey' arg + <> ":" + <> seed + where + formatPublicKey' arg = + Text.dropEnd 8 $ Text.drop 1 $ Ssb.formatPublicKey $ key arg + +-- TODO: Seriously, figure out how to use duplicate field names +data Room = Room { + endpoints :: TMVar (Map Ssb.PublicKey (RPC.ConnState, [Tunnel])) + , notifyChange :: TChan Bool + , roomName :: Text + , roomDesc :: Text + , tunnels :: TMVar (Map Tunnel [ThreadId]) + } + +newRoom :: Text -> Text -> IO Room +newRoom name desc = do + endpoints <- newTMVarIO Map.empty + notifier <- newBroadcastTChanIO + tunnels <- newTMVarIO Map.empty + return $ Room endpoints notifier name desc tunnels + +getEndpoints :: Room -> IO [Ssb.PublicKey] +getEndpoints h = do + let mVar = endpoints h + Map.keys <$> atomically (readTMVar mVar) + +lookUpPeer :: Room -> Ssb.PublicKey -> IO (Maybe RPC.ConnState) +lookUpPeer h peer = do + endpoints' <- atomically $ readTMVar (endpoints h) + return $ fst <$> Map.lookup peer endpoints' + +errConnLimitReached = "peer limit reached" + +registerPeer :: Room -> RPC.Stream -> IO (Either Text ()) +registerPeer h stream = do + let mVar = endpoints h + atomically $ do + endpoints' <- takeTMVar mVar + if Map.size endpoints' == (maxBound :: Int) + then return $ Left errConnLimitReached + else do + putTMVar mVar + $ Map.insert (RPC.peer stream) (RPC.conn stream, []) endpoints' + writeTChan (notifyChange h) True + return $ Right () + +unregisterPeer :: Room -> Ssb.PublicKey -> IO () +unregisterPeer h peer = do + let mVar = endpoints h + atomically $ do + endpoints' <- takeTMVar mVar + putTMVar mVar $ Map.delete peer endpoints' + writeTChan (notifyChange h) True + +data IsRoomResponse = IsRoomResponse { + name :: Text + , description :: Text + } deriving (Generic, Show) + +instance Aeson.FromJSON IsRoomResponse + +instance Aeson.ToJSON IsRoomResponse + +newIsRoomResponse :: Room -> IsRoomResponse +newIsRoomResponse l = IsRoomResponse (roomName l) (roomDesc l) + +announceRequest :: RPC.Request [Text] +announceRequest = + RPC.Request { name = ["tunnel", "announce"], typ = RPC.Async, args = [] } + +announce :: RPC.ConnState -> IO (Either Text ()) +announce conn = RPC.requestAsync conn announceRequest + +data ConnectRequest = ConnectRequest { + target :: Ssb.PublicKey + , portal :: Ssb.PublicKey + } deriving (Generic, Show) + +instance Aeson.FromJSON ConnectRequest + +instance Aeson.ToJSON ConnectRequest + +leaveRequest :: RPC.Request [Text] +leaveRequest = + RPC.Request { name = ["tunnel", "leave"], typ = RPC.Async, args = [] } + +leave :: RPC.ConnState -> IO (Either Text ()) +leave conn = RPC.requestAsync conn leaveRequest + +pingRequest :: RPC.Request [Text] +pingRequest = + RPC.Request { name = ["tunnel", "ping"], typ = RPC.Async, args = [] } + +ping :: RPC.ConnState -> IO (Either Text UTCTime) +ping conn = RPC.requestAsync conn pingRequest + +-- | fork creates a new thread, incrementing the counter in the given mVar. +fork mVar action = do + fork' <- newEmptyTMVarIO + atomically $ do + forks <- takeTMVar mVar + putTMVar mVar $ fork' : forks + forkFinally + action + (\_ -> do + print "exiting fork" + atomically $ putTMVar fork' () + ) + +-- | wait returns when all forks have completed. +waitForkGroup mVar threads = do + cs <- atomically $ takeTMVar mVar + case cs of + [] -> return () + m : ms -> do + atomically $ putTMVar mVar ms + atomically $ takeTMVar m + forM_ threads killThread + waitForkGroup mVar [] + +forwardMessages :: RPC.Stream -> RPC.Stream -> IO (Either Text ()) +forwardMessages s1 s2 = do + mMsg <- RPC.readStream s1 + case mMsg of + Nothing -> return $ Right () + Just msg -> do + res <- RPC.writeStream s2 def msg + case res of + Left err -> return $ Left err + Right _ -> forwardMessages s1 s2 + +type Tunnel = (Ssb.PublicKey, Ssb.PublicKey) + +newTunnel arg1 arg2 = + -- the tunnel's entries need to be ordered to make pairs unique + if arg1 < arg2 then (arg1, arg2) else (arg2, arg1) + +createTunnel :: Room -> (RPC.Stream, RPC.Stream) -> IO (Either Text ()) +createTunnel room (stream1, stream2) = do + let peer1 = RPC.connPeer $ RPC.conn stream1 + let peer2 = RPC.connPeer $ RPC.conn stream2 + --print + -- $ "creating tunnel for " + -- <> Ssb.formatPublicKey peer1 + -- <> " <-> " + -- <> Ssb.formatPublicKey peer2 + let tunnel = newTunnel peer1 peer2 + + exists <- atomically $ do + tunnels' <- readTMVar (tunnels room) + return $ Map.member tunnel tunnels' + if exists + then return $ Left "only one tunnel allowed" + else do + waiter <- newTMVarIO [] + thread1 <- fork waiter $ forwardMessages stream1 stream2 + thread2 <- fork waiter $ forwardMessages stream2 stream1 + let threads = [thread1, thread2] + + atomically $ do + endpoints' <- takeTMVar (endpoints room) + let endpoints'' = Map.adjust (addTunnel tunnel) (fst tunnel) endpoints' + let endpoints''' = + Map.adjust (addTunnel tunnel) (snd tunnel) endpoints'' + putTMVar (endpoints room) endpoints''' + + tunnels' <- takeTMVar (tunnels room) + let tunnels'' = Map.insert tunnel threads tunnels' + putTMVar (tunnels room) tunnels'' + + waitForkGroup waiter threads + + atomically $ do + endpoints' <- takeTMVar (endpoints room) + let endpoints'' = + Map.adjust (removeTunnel tunnel) (fst tunnel) endpoints' + let endpoints''' = + Map.adjust (removeTunnel tunnel) (snd tunnel) endpoints'' + putTMVar (endpoints room) endpoints''' + + tunnels' <- takeTMVar (tunnels room) + putTMVar (tunnels room) $ Map.delete tunnel tunnels' + + return . return $ () + where + addTunnel arg (conn, tunnels) = (conn, tunnels ++ [arg]) + removeTunnel arg (conn, tunnels) = (conn, filter (arg /=) tunnels) + +connect :: Room -> RPC.Stream -> ConnectRequest -> IO (Either Text ()) +connect room stream req = do + let tunnel = newTunnel (RPC.connPeer $ RPC.conn $ stream) (target req) + tunnels' <- atomically $ readTMVar (tunnels room) + mPeer <- lookUpPeer room (target req) + let peerConn = do + bool (return ()) errSelfNotAllowed (fst tunnel == snd tunnel) + bool (return ()) errOnlyUniqueTunnel (Map.member tunnel tunnels') + maybeToRight errPeerNotAvailable mPeer + case peerConn of + Left err -> do + print $ "errorr! " <> err + return $ Left err + Right conn -> RPC.request conn (rpcRequest req) + $ \peerStream -> createTunnel room (stream, peerStream) + where + errOnlyUniqueTunnel = Left $ "only unique tunnels are allowed" + errPeerNotAvailable = "peer is not connected" :: Text + errSelfNotAllowed = Left $ "connecting to self not allowed" + rpcRequest arg = + RPC.Request { name = ["tunnel", "connect"], typ = RPC.Duplex, args = [arg] } + +leave' :: Room -> Ssb.PublicKey -> IO (Either Text ()) +leave' room peer = do + endpoints' <- atomically $ readTMVar (endpoints room) + tunnels' <- atomically $ readTMVar (tunnels room) + + let etunnels = fromMaybe mempty $ snd <$> Map.lookup peer endpoints' + forM_ etunnels $ \tunnel -> do + let threads = fromMaybe mempty $ Map.lookup tunnel tunnels' + forM_ threads killThread + unregisterPeer room peer + return . return $ () + +instance RPC.Handler Room where + endpoints _ = + [ RPC.Endpoint ["tunnel", "announce"] RPC.Async + , RPC.Endpoint ["tunnel", "connect"] RPC.Duplex + , RPC.Endpoint ["tunnel", "endpoints"] RPC.Async + , RPC.Endpoint ["tunnel", "leave"] RPC.Async + , RPC.Endpoint ["tunnel", "isRoom"] RPC.Async + , RPC.Endpoint ["tunnel", "ping"] RPC.Async + ] + + -- The announce and leave endpoints are defined by the server's JS, but + -- never called by the client. The official (NodeJS) project uses + -- disconnect notifications from the SSB-server. + -- + serve h (RPC.Endpoint ["tunnel", "announce"] _) args stream = + registerPeer h stream + + -- should decode request + serve h (RPC.Endpoint ["tunnel", "connect"] RPC.Duplex) args stream = do + let args' = + decodeJSON (toS $ Aeson.encode args) :: Either + Text + [ConnectRequest] + case args' of + Left err -> return $ Left err + Right [connReq] -> connect h stream connReq + otherwise -> return $ Left "bad target argument" + + serve h (RPC.Endpoint ["tunnel", "endpoints"] _) _ stream = do + err <- registerPeer h stream + case err of + Left msg -> return $ Left msg + Right _ -> do + change <- atomically $ dupTChan $ notifyChange h + while $ do + endpoints' <- getEndpoints h + let resp = filter (/= RPC.peer stream) endpoints' + res <- RPC.writeStreamJSON stream resp + if isLeft res then return False else atomically $ readTChan change + return $ Right () + where + while f = do + continue <- f + if continue then (while f) else return False + + serve room (RPC.Endpoint ["tunnel", "leave"] _) _ stream = + leave' room (RPC.peer stream) + + serve room (RPC.Endpoint ["tunnel", "isRoom"] _) _ stream = + RPC.writeStreamJSON stream (newIsRoomResponse room) + + serve room (RPC.Endpoint ["tunnel", "ping"] _) _ stream = do + resp <- getCurrentTime + RPC.writeStreamJSON stream resp + + serve room endpoint@otherwise arg stream = (RPC.notFoundHandlerFunc endpoint) arg stream + + notifyConnect _ _ = return . return $ () + + notifyDisconnect room peer = do + _ <- leave' room peer + return . return $ () |