aboutsummaryrefslogtreecommitdiff
path: root/src/Ssb/Peer/RPC/Room.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Ssb/Peer/RPC/Room.hs')
-rw-r--r--src/Ssb/Peer/RPC/Room.hs330
1 files changed, 330 insertions, 0 deletions
diff --git a/src/Ssb/Peer/RPC/Room.hs b/src/Ssb/Peer/RPC/Room.hs
new file mode 100644
index 0000000..47bc956
--- /dev/null
+++ b/src/Ssb/Peer/RPC/Room.hs
@@ -0,0 +1,330 @@
+-- | This module implements Scuttlebutt's Remote Procedure Call for
+-- Rooms.
+--
+-- For more information kindly refer [WHERE]
+
+-- TODO: Documentation for SSB-Room
+
+module Ssb.Peer.RPC.Room where
+
+
+import Protolude hiding ( Identity )
+import qualified Data.Aeson as Aeson
+import Control.Concurrent.STM
+import Data.Default
+import qualified Data.Map.Strict as Map
+import Data.Time.Clock ( UTCTime
+ , getCurrentTime
+ )
+import qualified Data.Text as Text
+
+import Ssb.Aux
+import qualified Ssb.Identity as Ssb
+import qualified Ssb.Pub as Ssb
+import Ssb.Network
+import qualified Ssb.Feed as Feed
+import qualified Ssb.Peer.RPC as RPC
+
+seed :: Text
+seed = "SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24="
+
+data Invite = Invite
+ { host :: Host
+ , port :: Port
+ , key :: Ssb.PublicKey
+ } deriving (Eq, Show)
+
+formatInvite :: Invite -> Text
+formatInvite arg =
+ "net:"
+ <> host arg
+ <> ":"
+ <> port arg
+ <> "~shs"
+ <> ":"
+ <> formatPublicKey' arg
+ <> ":"
+ <> seed
+ where
+ formatPublicKey' arg =
+ Text.dropEnd 8 $ Text.drop 1 $ Ssb.formatPublicKey $ key arg
+
+-- TODO: Seriously, figure out how to use duplicate field names
+data Room = Room {
+ endpoints :: TMVar (Map Ssb.PublicKey (RPC.ConnState, [Tunnel]))
+ , notifyChange :: TChan Bool
+ , roomName :: Text
+ , roomDesc :: Text
+ , tunnels :: TMVar (Map Tunnel [ThreadId])
+ }
+
+newRoom :: Text -> Text -> IO Room
+newRoom name desc = do
+ endpoints <- newTMVarIO Map.empty
+ notifier <- newBroadcastTChanIO
+ tunnels <- newTMVarIO Map.empty
+ return $ Room endpoints notifier name desc tunnels
+
+getEndpoints :: Room -> IO [Ssb.PublicKey]
+getEndpoints h = do
+ let mVar = endpoints h
+ Map.keys <$> atomically (readTMVar mVar)
+
+lookUpPeer :: Room -> Ssb.PublicKey -> IO (Maybe RPC.ConnState)
+lookUpPeer h peer = do
+ endpoints' <- atomically $ readTMVar (endpoints h)
+ return $ fst <$> Map.lookup peer endpoints'
+
+errConnLimitReached = "peer limit reached"
+
+registerPeer :: Room -> RPC.Stream -> IO (Either Text ())
+registerPeer h stream = do
+ let mVar = endpoints h
+ atomically $ do
+ endpoints' <- takeTMVar mVar
+ if Map.size endpoints' == (maxBound :: Int)
+ then return $ Left errConnLimitReached
+ else do
+ putTMVar mVar
+ $ Map.insert (RPC.peer stream) (RPC.conn stream, []) endpoints'
+ writeTChan (notifyChange h) True
+ return $ Right ()
+
+unregisterPeer :: Room -> Ssb.PublicKey -> IO ()
+unregisterPeer h peer = do
+ let mVar = endpoints h
+ atomically $ do
+ endpoints' <- takeTMVar mVar
+ putTMVar mVar $ Map.delete peer endpoints'
+ writeTChan (notifyChange h) True
+
+data IsRoomResponse = IsRoomResponse {
+ name :: Text
+ , description :: Text
+ } deriving (Generic, Show)
+
+instance Aeson.FromJSON IsRoomResponse
+
+instance Aeson.ToJSON IsRoomResponse
+
+newIsRoomResponse :: Room -> IsRoomResponse
+newIsRoomResponse l = IsRoomResponse (roomName l) (roomDesc l)
+
+announceRequest :: RPC.Request [Text]
+announceRequest =
+ RPC.Request { name = ["tunnel", "announce"], typ = RPC.Async, args = [] }
+
+announce :: RPC.ConnState -> IO (Either Text ())
+announce conn = RPC.requestAsync conn announceRequest
+
+data ConnectRequest = ConnectRequest {
+ target :: Ssb.PublicKey
+ , portal :: Ssb.PublicKey
+ } deriving (Generic, Show)
+
+instance Aeson.FromJSON ConnectRequest
+
+instance Aeson.ToJSON ConnectRequest
+
+leaveRequest :: RPC.Request [Text]
+leaveRequest =
+ RPC.Request { name = ["tunnel", "leave"], typ = RPC.Async, args = [] }
+
+leave :: RPC.ConnState -> IO (Either Text ())
+leave conn = RPC.requestAsync conn leaveRequest
+
+pingRequest :: RPC.Request [Text]
+pingRequest =
+ RPC.Request { name = ["tunnel", "ping"], typ = RPC.Async, args = [] }
+
+ping :: RPC.ConnState -> IO (Either Text UTCTime)
+ping conn = RPC.requestAsync conn pingRequest
+
+-- | fork creates a new thread, incrementing the counter in the given mVar.
+fork mVar action = do
+ fork' <- newEmptyTMVarIO
+ atomically $ do
+ forks <- takeTMVar mVar
+ putTMVar mVar $ fork' : forks
+ forkFinally
+ action
+ (\_ -> do
+ print "exiting fork"
+ atomically $ putTMVar fork' ()
+ )
+
+-- | wait returns when all forks have completed.
+waitForkGroup mVar threads = do
+ cs <- atomically $ takeTMVar mVar
+ case cs of
+ [] -> return ()
+ m : ms -> do
+ atomically $ putTMVar mVar ms
+ atomically $ takeTMVar m
+ forM_ threads killThread
+ waitForkGroup mVar []
+
+forwardMessages :: RPC.Stream -> RPC.Stream -> IO (Either Text ())
+forwardMessages s1 s2 = do
+ mMsg <- RPC.readStream s1
+ case mMsg of
+ Nothing -> return $ Right ()
+ Just msg -> do
+ res <- RPC.writeStream s2 def msg
+ case res of
+ Left err -> return $ Left err
+ Right _ -> forwardMessages s1 s2
+
+type Tunnel = (Ssb.PublicKey, Ssb.PublicKey)
+
+newTunnel arg1 arg2 =
+ -- the tunnel's entries need to be ordered to make pairs unique
+ if arg1 < arg2 then (arg1, arg2) else (arg2, arg1)
+
+createTunnel :: Room -> (RPC.Stream, RPC.Stream) -> IO (Either Text ())
+createTunnel room (stream1, stream2) = do
+ let peer1 = RPC.connPeer $ RPC.conn stream1
+ let peer2 = RPC.connPeer $ RPC.conn stream2
+ --print
+ -- $ "creating tunnel for "
+ -- <> Ssb.formatPublicKey peer1
+ -- <> " <-> "
+ -- <> Ssb.formatPublicKey peer2
+ let tunnel = newTunnel peer1 peer2
+
+ exists <- atomically $ do
+ tunnels' <- readTMVar (tunnels room)
+ return $ Map.member tunnel tunnels'
+ if exists
+ then return $ Left "only one tunnel allowed"
+ else do
+ waiter <- newTMVarIO []
+ thread1 <- fork waiter $ forwardMessages stream1 stream2
+ thread2 <- fork waiter $ forwardMessages stream2 stream1
+ let threads = [thread1, thread2]
+
+ atomically $ do
+ endpoints' <- takeTMVar (endpoints room)
+ let endpoints'' = Map.adjust (addTunnel tunnel) (fst tunnel) endpoints'
+ let endpoints''' =
+ Map.adjust (addTunnel tunnel) (snd tunnel) endpoints''
+ putTMVar (endpoints room) endpoints'''
+
+ tunnels' <- takeTMVar (tunnels room)
+ let tunnels'' = Map.insert tunnel threads tunnels'
+ putTMVar (tunnels room) tunnels''
+
+ waitForkGroup waiter threads
+
+ atomically $ do
+ endpoints' <- takeTMVar (endpoints room)
+ let endpoints'' =
+ Map.adjust (removeTunnel tunnel) (fst tunnel) endpoints'
+ let endpoints''' =
+ Map.adjust (removeTunnel tunnel) (snd tunnel) endpoints''
+ putTMVar (endpoints room) endpoints'''
+
+ tunnels' <- takeTMVar (tunnels room)
+ putTMVar (tunnels room) $ Map.delete tunnel tunnels'
+
+ return . return $ ()
+ where
+ addTunnel arg (conn, tunnels) = (conn, tunnels ++ [arg])
+ removeTunnel arg (conn, tunnels) = (conn, filter (arg /=) tunnels)
+
+connect :: Room -> RPC.Stream -> ConnectRequest -> IO (Either Text ())
+connect room stream req = do
+ let tunnel = newTunnel (RPC.connPeer $ RPC.conn $ stream) (target req)
+ tunnels' <- atomically $ readTMVar (tunnels room)
+ mPeer <- lookUpPeer room (target req)
+ let peerConn = do
+ bool (return ()) errSelfNotAllowed (fst tunnel == snd tunnel)
+ bool (return ()) errOnlyUniqueTunnel (Map.member tunnel tunnels')
+ maybeToRight errPeerNotAvailable mPeer
+ case peerConn of
+ Left err -> do
+ print $ "errorr! " <> err
+ return $ Left err
+ Right conn -> RPC.request conn (rpcRequest req)
+ $ \peerStream -> createTunnel room (stream, peerStream)
+ where
+ errOnlyUniqueTunnel = Left $ "only unique tunnels are allowed"
+ errPeerNotAvailable = "peer is not connected" :: Text
+ errSelfNotAllowed = Left $ "connecting to self not allowed"
+ rpcRequest arg =
+ RPC.Request { name = ["tunnel", "connect"], typ = RPC.Duplex, args = [arg] }
+
+leave' :: Room -> Ssb.PublicKey -> IO (Either Text ())
+leave' room peer = do
+ endpoints' <- atomically $ readTMVar (endpoints room)
+ tunnels' <- atomically $ readTMVar (tunnels room)
+
+ let etunnels = fromMaybe mempty $ snd <$> Map.lookup peer endpoints'
+ forM_ etunnels $ \tunnel -> do
+ let threads = fromMaybe mempty $ Map.lookup tunnel tunnels'
+ forM_ threads killThread
+ unregisterPeer room peer
+ return . return $ ()
+
+instance RPC.Handler Room where
+ endpoints _ =
+ [ RPC.Endpoint ["tunnel", "announce"] RPC.Async
+ , RPC.Endpoint ["tunnel", "connect"] RPC.Duplex
+ , RPC.Endpoint ["tunnel", "endpoints"] RPC.Async
+ , RPC.Endpoint ["tunnel", "leave"] RPC.Async
+ , RPC.Endpoint ["tunnel", "isRoom"] RPC.Async
+ , RPC.Endpoint ["tunnel", "ping"] RPC.Async
+ ]
+
+ -- The announce and leave endpoints are defined by the server's JS, but
+ -- never called by the client. The official (NodeJS) project uses
+ -- disconnect notifications from the SSB-server.
+ --
+ serve h (RPC.Endpoint ["tunnel", "announce"] _) args stream =
+ registerPeer h stream
+
+ -- should decode request
+ serve h (RPC.Endpoint ["tunnel", "connect"] RPC.Duplex) args stream = do
+ let args' =
+ decodeJSON (toS $ Aeson.encode args) :: Either
+ Text
+ [ConnectRequest]
+ case args' of
+ Left err -> return $ Left err
+ Right [connReq] -> connect h stream connReq
+ otherwise -> return $ Left "bad target argument"
+
+ serve h (RPC.Endpoint ["tunnel", "endpoints"] _) _ stream = do
+ err <- registerPeer h stream
+ case err of
+ Left msg -> return $ Left msg
+ Right _ -> do
+ change <- atomically $ dupTChan $ notifyChange h
+ while $ do
+ endpoints' <- getEndpoints h
+ let resp = filter (/= RPC.peer stream) endpoints'
+ res <- RPC.writeStreamJSON stream resp
+ if isLeft res then return False else atomically $ readTChan change
+ return $ Right ()
+ where
+ while f = do
+ continue <- f
+ if continue then (while f) else return False
+
+ serve room (RPC.Endpoint ["tunnel", "leave"] _) _ stream =
+ leave' room (RPC.peer stream)
+
+ serve room (RPC.Endpoint ["tunnel", "isRoom"] _) _ stream =
+ RPC.writeStreamJSON stream (newIsRoomResponse room)
+
+ serve room (RPC.Endpoint ["tunnel", "ping"] _) _ stream = do
+ resp <- getCurrentTime
+ RPC.writeStreamJSON stream resp
+
+ serve room endpoint@otherwise arg stream = (RPC.notFoundHandlerFunc endpoint) arg stream
+
+ notifyConnect _ _ = return . return $ ()
+
+ notifyDisconnect room peer = do
+ _ <- leave' room peer
+ return . return $ ()