aboutsummaryrefslogtreecommitdiff
path: root/src/Ssb/Peer/RPC
diff options
context:
space:
mode:
authorHaskell Guy <haskell.guy@localhost>2020-05-26 13:07:50 +0200
committerHaskell Guy <haskell.guy@localhost>2020-05-26 13:37:29 +0200
commit41cde99ec6189dbecca6803a5aa4f6f18142e8ba (patch)
tree7a0ceab0d516b8c3b7b49313100ae50c97e875c3 /src/Ssb/Peer/RPC
downloadssb-haskell-41cde99ec6189dbecca6803a5aa4f6f18142e8ba.tar.xz
initial commit
Diffstat (limited to 'src/Ssb/Peer/RPC')
-rw-r--r--src/Ssb/Peer/RPC/Gossip.hs139
-rw-r--r--src/Ssb/Peer/RPC/Room.hs330
-rw-r--r--src/Ssb/Peer/RPC/WhoAmI.hs49
3 files changed, 518 insertions, 0 deletions
diff --git a/src/Ssb/Peer/RPC/Gossip.hs b/src/Ssb/Peer/RPC/Gossip.hs
new file mode 100644
index 0000000..b97e4a4
--- /dev/null
+++ b/src/Ssb/Peer/RPC/Gossip.hs
@@ -0,0 +1,139 @@
+-- | This module implements Scuttlebutt's Remote Procedure Call for
+-- CreateHistoryStream.
+--
+-- For more information kindly refer the to protocol guide
+-- https://ssbc.github.io/scuttlebutt-protocol-guide
+
+module Ssb.Peer.RPC.Gossip where
+
+import Protolude hiding ( Identity )
+
+import Control.Concurrent.STM
+import qualified Data.Aeson as Aeson
+import Data.Aeson ( FromJSON
+ , ToJSON
+ )
+import Data.Default
+import qualified Data.Map.Strict as Map
+
+import Ssb.Aux
+import qualified Ssb.Feed as Feed
+import qualified Ssb.Peer.RPC as RPC
+
+
+-- | TODO: Comment
+-- | TODO: Naming
+-- | TODO: Keyed Message support
+-- | TODO: Proper Request default values setting
+data Request = Request
+ { id :: Feed.FeedID
+ , sequence :: Maybe Int
+ , limit :: Maybe Int
+ , live :: Maybe Bool
+ , old :: Maybe Bool
+ , keys :: Bool
+ } deriving (Generic,Show)
+
+newRequest :: Feed.FeedID -> Request
+newRequest id = Request { id = id
+ , sequence = Nothing
+ , limit = Nothing
+ , live = Just False
+ , old = Just False
+ , keys = True
+ }
+
+
+instance FromJSON Request
+
+instance ToJSON Request where
+ toJSON = Aeson.genericToJSON (Aeson.defaultOptions {Aeson.omitNothingFields = True})
+
+-- TODO: reduce friction for introducing RPC requests
+
+createHistoryStreamRequest :: Request -> RPC.Request [Request]
+createHistoryStreamRequest req = RPC.Request
+ { RPC.name = ["createHistoryStream"]
+ , RPC.typ = RPC.Source
+ , RPC.args = [req]
+ }
+
+createHistoryStream
+ :: FromJSON b
+ => RPC.ConnState
+ -> Request
+ -> a
+ -> (a -> Feed.VerifiableMessage b -> IO (Either Text a))
+ -> IO (Either Text a)
+createHistoryStream conn req init cmd = RPC.request
+ conn
+ (createHistoryStreamRequest req)
+ (RPC.foldStream cmd' init)
+ where
+ cmd' a payload = case payload of
+ RPC.JSONPayload buf -> do
+ msg <- Feed.decodeJSONVerifiableMessage buf
+ either (return . error) (cmd a) msg
+ v@otherwise -> return $ error "expected JSON but got something else"
+
+data KeyedMessage a = KeyedMessage
+ { key :: Feed.MessageID
+ , timestamp :: Int
+ , value :: Feed.Message a
+ } deriving (Generic,Show)
+
+instance (FromJSON a) => FromJSON (KeyedMessage a)
+
+instance (ToJSON a) => ToJSON (KeyedMessage a)
+
+--createKeyedHistoryStream
+-- :: RPC.ConnState
+-- -> Request
+-- -> (KeyedMessage -> IO (Either Text a))
+-- -> IO (Either Text a)
+--createHistoryStream conn req =
+-- RPC.request conn (createHistoryStreamRequest req)
+--
+
+newtype Gossiper a = Gossiper (TMVar (Feed.Feeds a))
+
+newGossiper :: ToJSON a => IO (Gossiper a)
+newGossiper = do
+ mVar <- newTMVarIO Feed.emptyFeeds
+ return $ Gossiper mVar
+
+addFeed :: ToJSON a => Gossiper a -> Feed.Feed a -> IO ()
+addFeed (Gossiper (mFeeds)) feed = do
+ atomically $ do
+ feeds <- takeTMVar mFeeds
+ putTMVar mFeeds (Feed.insert feed feeds)
+
+writeFeed :: ToJSON a => RPC.Stream -> Feed.Feed a -> IO (Either Text ())
+writeFeed stream (Feed.Feed _ msgs) = do
+ return <$> forM_
+ msgs
+ (\msg -> do
+ let msg' = Feed.encodeJSONVerifiableMessage msg
+ err <- RPC.writeStream stream def (RPC.JSONPayload msg')
+ either (\err -> print err) (\_ -> return ()) err
+ )
+
+instance ToJSON a => RPC.Handler (Gossiper a) where
+ endpoints h = [RPC.Endpoint ["createHistoryStream"] RPC.Source]
+
+ serve (Gossiper mFeeds) (RPC.Endpoint ["createHistoryStream"] RPC.Source) arg stream
+ = do
+ feeds <- atomically $ readTMVar mFeeds
+ let req = decodeJSON (encodeJSON arg) :: Either Text [Request]
+ case req of
+ Left err -> do
+ return . return $ ()
+ Right [] -> return . return $ ()
+ Right [arg] -> do
+ case Feed.lookup (id arg) feeds of
+ Just feed -> writeFeed stream feed
+ Nothing -> return . return $ ()
+
+ notifyConnect _ _ = return . return $ ()
+
+ notifyDisconnect _ _ = return . return $ ()
diff --git a/src/Ssb/Peer/RPC/Room.hs b/src/Ssb/Peer/RPC/Room.hs
new file mode 100644
index 0000000..47bc956
--- /dev/null
+++ b/src/Ssb/Peer/RPC/Room.hs
@@ -0,0 +1,330 @@
+-- | This module implements Scuttlebutt's Remote Procedure Call for
+-- Rooms.
+--
+-- For more information kindly refer [WHERE]
+
+-- TODO: Documentation for SSB-Room
+
+module Ssb.Peer.RPC.Room where
+
+
+import Protolude hiding ( Identity )
+import qualified Data.Aeson as Aeson
+import Control.Concurrent.STM
+import Data.Default
+import qualified Data.Map.Strict as Map
+import Data.Time.Clock ( UTCTime
+ , getCurrentTime
+ )
+import qualified Data.Text as Text
+
+import Ssb.Aux
+import qualified Ssb.Identity as Ssb
+import qualified Ssb.Pub as Ssb
+import Ssb.Network
+import qualified Ssb.Feed as Feed
+import qualified Ssb.Peer.RPC as RPC
+
+seed :: Text
+seed = "SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24="
+
+data Invite = Invite
+ { host :: Host
+ , port :: Port
+ , key :: Ssb.PublicKey
+ } deriving (Eq, Show)
+
+formatInvite :: Invite -> Text
+formatInvite arg =
+ "net:"
+ <> host arg
+ <> ":"
+ <> port arg
+ <> "~shs"
+ <> ":"
+ <> formatPublicKey' arg
+ <> ":"
+ <> seed
+ where
+ formatPublicKey' arg =
+ Text.dropEnd 8 $ Text.drop 1 $ Ssb.formatPublicKey $ key arg
+
+-- TODO: Seriously, figure out how to use duplicate field names
+data Room = Room {
+ endpoints :: TMVar (Map Ssb.PublicKey (RPC.ConnState, [Tunnel]))
+ , notifyChange :: TChan Bool
+ , roomName :: Text
+ , roomDesc :: Text
+ , tunnels :: TMVar (Map Tunnel [ThreadId])
+ }
+
+newRoom :: Text -> Text -> IO Room
+newRoom name desc = do
+ endpoints <- newTMVarIO Map.empty
+ notifier <- newBroadcastTChanIO
+ tunnels <- newTMVarIO Map.empty
+ return $ Room endpoints notifier name desc tunnels
+
+getEndpoints :: Room -> IO [Ssb.PublicKey]
+getEndpoints h = do
+ let mVar = endpoints h
+ Map.keys <$> atomically (readTMVar mVar)
+
+lookUpPeer :: Room -> Ssb.PublicKey -> IO (Maybe RPC.ConnState)
+lookUpPeer h peer = do
+ endpoints' <- atomically $ readTMVar (endpoints h)
+ return $ fst <$> Map.lookup peer endpoints'
+
+errConnLimitReached = "peer limit reached"
+
+registerPeer :: Room -> RPC.Stream -> IO (Either Text ())
+registerPeer h stream = do
+ let mVar = endpoints h
+ atomically $ do
+ endpoints' <- takeTMVar mVar
+ if Map.size endpoints' == (maxBound :: Int)
+ then return $ Left errConnLimitReached
+ else do
+ putTMVar mVar
+ $ Map.insert (RPC.peer stream) (RPC.conn stream, []) endpoints'
+ writeTChan (notifyChange h) True
+ return $ Right ()
+
+unregisterPeer :: Room -> Ssb.PublicKey -> IO ()
+unregisterPeer h peer = do
+ let mVar = endpoints h
+ atomically $ do
+ endpoints' <- takeTMVar mVar
+ putTMVar mVar $ Map.delete peer endpoints'
+ writeTChan (notifyChange h) True
+
+data IsRoomResponse = IsRoomResponse {
+ name :: Text
+ , description :: Text
+ } deriving (Generic, Show)
+
+instance Aeson.FromJSON IsRoomResponse
+
+instance Aeson.ToJSON IsRoomResponse
+
+newIsRoomResponse :: Room -> IsRoomResponse
+newIsRoomResponse l = IsRoomResponse (roomName l) (roomDesc l)
+
+announceRequest :: RPC.Request [Text]
+announceRequest =
+ RPC.Request { name = ["tunnel", "announce"], typ = RPC.Async, args = [] }
+
+announce :: RPC.ConnState -> IO (Either Text ())
+announce conn = RPC.requestAsync conn announceRequest
+
+data ConnectRequest = ConnectRequest {
+ target :: Ssb.PublicKey
+ , portal :: Ssb.PublicKey
+ } deriving (Generic, Show)
+
+instance Aeson.FromJSON ConnectRequest
+
+instance Aeson.ToJSON ConnectRequest
+
+leaveRequest :: RPC.Request [Text]
+leaveRequest =
+ RPC.Request { name = ["tunnel", "leave"], typ = RPC.Async, args = [] }
+
+leave :: RPC.ConnState -> IO (Either Text ())
+leave conn = RPC.requestAsync conn leaveRequest
+
+pingRequest :: RPC.Request [Text]
+pingRequest =
+ RPC.Request { name = ["tunnel", "ping"], typ = RPC.Async, args = [] }
+
+ping :: RPC.ConnState -> IO (Either Text UTCTime)
+ping conn = RPC.requestAsync conn pingRequest
+
+-- | fork creates a new thread, incrementing the counter in the given mVar.
+fork mVar action = do
+ fork' <- newEmptyTMVarIO
+ atomically $ do
+ forks <- takeTMVar mVar
+ putTMVar mVar $ fork' : forks
+ forkFinally
+ action
+ (\_ -> do
+ print "exiting fork"
+ atomically $ putTMVar fork' ()
+ )
+
+-- | wait returns when all forks have completed.
+waitForkGroup mVar threads = do
+ cs <- atomically $ takeTMVar mVar
+ case cs of
+ [] -> return ()
+ m : ms -> do
+ atomically $ putTMVar mVar ms
+ atomically $ takeTMVar m
+ forM_ threads killThread
+ waitForkGroup mVar []
+
+forwardMessages :: RPC.Stream -> RPC.Stream -> IO (Either Text ())
+forwardMessages s1 s2 = do
+ mMsg <- RPC.readStream s1
+ case mMsg of
+ Nothing -> return $ Right ()
+ Just msg -> do
+ res <- RPC.writeStream s2 def msg
+ case res of
+ Left err -> return $ Left err
+ Right _ -> forwardMessages s1 s2
+
+type Tunnel = (Ssb.PublicKey, Ssb.PublicKey)
+
+newTunnel arg1 arg2 =
+ -- the tunnel's entries need to be ordered to make pairs unique
+ if arg1 < arg2 then (arg1, arg2) else (arg2, arg1)
+
+createTunnel :: Room -> (RPC.Stream, RPC.Stream) -> IO (Either Text ())
+createTunnel room (stream1, stream2) = do
+ let peer1 = RPC.connPeer $ RPC.conn stream1
+ let peer2 = RPC.connPeer $ RPC.conn stream2
+ --print
+ -- $ "creating tunnel for "
+ -- <> Ssb.formatPublicKey peer1
+ -- <> " <-> "
+ -- <> Ssb.formatPublicKey peer2
+ let tunnel = newTunnel peer1 peer2
+
+ exists <- atomically $ do
+ tunnels' <- readTMVar (tunnels room)
+ return $ Map.member tunnel tunnels'
+ if exists
+ then return $ Left "only one tunnel allowed"
+ else do
+ waiter <- newTMVarIO []
+ thread1 <- fork waiter $ forwardMessages stream1 stream2
+ thread2 <- fork waiter $ forwardMessages stream2 stream1
+ let threads = [thread1, thread2]
+
+ atomically $ do
+ endpoints' <- takeTMVar (endpoints room)
+ let endpoints'' = Map.adjust (addTunnel tunnel) (fst tunnel) endpoints'
+ let endpoints''' =
+ Map.adjust (addTunnel tunnel) (snd tunnel) endpoints''
+ putTMVar (endpoints room) endpoints'''
+
+ tunnels' <- takeTMVar (tunnels room)
+ let tunnels'' = Map.insert tunnel threads tunnels'
+ putTMVar (tunnels room) tunnels''
+
+ waitForkGroup waiter threads
+
+ atomically $ do
+ endpoints' <- takeTMVar (endpoints room)
+ let endpoints'' =
+ Map.adjust (removeTunnel tunnel) (fst tunnel) endpoints'
+ let endpoints''' =
+ Map.adjust (removeTunnel tunnel) (snd tunnel) endpoints''
+ putTMVar (endpoints room) endpoints'''
+
+ tunnels' <- takeTMVar (tunnels room)
+ putTMVar (tunnels room) $ Map.delete tunnel tunnels'
+
+ return . return $ ()
+ where
+ addTunnel arg (conn, tunnels) = (conn, tunnels ++ [arg])
+ removeTunnel arg (conn, tunnels) = (conn, filter (arg /=) tunnels)
+
+connect :: Room -> RPC.Stream -> ConnectRequest -> IO (Either Text ())
+connect room stream req = do
+ let tunnel = newTunnel (RPC.connPeer $ RPC.conn $ stream) (target req)
+ tunnels' <- atomically $ readTMVar (tunnels room)
+ mPeer <- lookUpPeer room (target req)
+ let peerConn = do
+ bool (return ()) errSelfNotAllowed (fst tunnel == snd tunnel)
+ bool (return ()) errOnlyUniqueTunnel (Map.member tunnel tunnels')
+ maybeToRight errPeerNotAvailable mPeer
+ case peerConn of
+ Left err -> do
+ print $ "errorr! " <> err
+ return $ Left err
+ Right conn -> RPC.request conn (rpcRequest req)
+ $ \peerStream -> createTunnel room (stream, peerStream)
+ where
+ errOnlyUniqueTunnel = Left $ "only unique tunnels are allowed"
+ errPeerNotAvailable = "peer is not connected" :: Text
+ errSelfNotAllowed = Left $ "connecting to self not allowed"
+ rpcRequest arg =
+ RPC.Request { name = ["tunnel", "connect"], typ = RPC.Duplex, args = [arg] }
+
+leave' :: Room -> Ssb.PublicKey -> IO (Either Text ())
+leave' room peer = do
+ endpoints' <- atomically $ readTMVar (endpoints room)
+ tunnels' <- atomically $ readTMVar (tunnels room)
+
+ let etunnels = fromMaybe mempty $ snd <$> Map.lookup peer endpoints'
+ forM_ etunnels $ \tunnel -> do
+ let threads = fromMaybe mempty $ Map.lookup tunnel tunnels'
+ forM_ threads killThread
+ unregisterPeer room peer
+ return . return $ ()
+
+instance RPC.Handler Room where
+ endpoints _ =
+ [ RPC.Endpoint ["tunnel", "announce"] RPC.Async
+ , RPC.Endpoint ["tunnel", "connect"] RPC.Duplex
+ , RPC.Endpoint ["tunnel", "endpoints"] RPC.Async
+ , RPC.Endpoint ["tunnel", "leave"] RPC.Async
+ , RPC.Endpoint ["tunnel", "isRoom"] RPC.Async
+ , RPC.Endpoint ["tunnel", "ping"] RPC.Async
+ ]
+
+ -- The announce and leave endpoints are defined by the server's JS, but
+ -- never called by the client. The official (NodeJS) project uses
+ -- disconnect notifications from the SSB-server.
+ --
+ serve h (RPC.Endpoint ["tunnel", "announce"] _) args stream =
+ registerPeer h stream
+
+ -- should decode request
+ serve h (RPC.Endpoint ["tunnel", "connect"] RPC.Duplex) args stream = do
+ let args' =
+ decodeJSON (toS $ Aeson.encode args) :: Either
+ Text
+ [ConnectRequest]
+ case args' of
+ Left err -> return $ Left err
+ Right [connReq] -> connect h stream connReq
+ otherwise -> return $ Left "bad target argument"
+
+ serve h (RPC.Endpoint ["tunnel", "endpoints"] _) _ stream = do
+ err <- registerPeer h stream
+ case err of
+ Left msg -> return $ Left msg
+ Right _ -> do
+ change <- atomically $ dupTChan $ notifyChange h
+ while $ do
+ endpoints' <- getEndpoints h
+ let resp = filter (/= RPC.peer stream) endpoints'
+ res <- RPC.writeStreamJSON stream resp
+ if isLeft res then return False else atomically $ readTChan change
+ return $ Right ()
+ where
+ while f = do
+ continue <- f
+ if continue then (while f) else return False
+
+ serve room (RPC.Endpoint ["tunnel", "leave"] _) _ stream =
+ leave' room (RPC.peer stream)
+
+ serve room (RPC.Endpoint ["tunnel", "isRoom"] _) _ stream =
+ RPC.writeStreamJSON stream (newIsRoomResponse room)
+
+ serve room (RPC.Endpoint ["tunnel", "ping"] _) _ stream = do
+ resp <- getCurrentTime
+ RPC.writeStreamJSON stream resp
+
+ serve room endpoint@otherwise arg stream = (RPC.notFoundHandlerFunc endpoint) arg stream
+
+ notifyConnect _ _ = return . return $ ()
+
+ notifyDisconnect room peer = do
+ _ <- leave' room peer
+ return . return $ ()
diff --git a/src/Ssb/Peer/RPC/WhoAmI.hs b/src/Ssb/Peer/RPC/WhoAmI.hs
new file mode 100644
index 0000000..be979fc
--- /dev/null
+++ b/src/Ssb/Peer/RPC/WhoAmI.hs
@@ -0,0 +1,49 @@
+-- | This module implements Scuttlebutt's Remote Procedure Call for
+-- Ping.
+--
+-- For more information kindly refer [WHERE]
+
+-- TODO: Update above documentation
+
+module Ssb.Peer.RPC.WhoAmI where
+
+import Protolude hiding ( Identity )
+import Data.Aeson as Aeson (FromJSON,ToJSON)
+
+import qualified Ssb.Identity as Ssb
+import qualified Ssb.Feed as Feed
+import qualified Ssb.Peer.RPC as RPC
+
+whoAmIRequest :: RPC.Request [Text]
+whoAmIRequest = RPC.Request {
+ name = ["whoami"]
+ , typ = RPC.Async
+ , args = []
+ }
+
+newtype WhoAmIResponse = WhoAmIResponse
+ { id :: Feed.FeedID
+ } deriving (Eq,Generic,Show)
+
+instance FromJSON WhoAmIResponse
+
+instance ToJSON WhoAmIResponse
+
+whoAmI
+ :: RPC.ConnState
+ -> IO (Either Text WhoAmIResponse)
+whoAmI conn = RPC.requestAsync conn whoAmIRequest
+
+newtype Handler = Handler ()
+
+newHandler :: Handler
+newHandler = Handler ()
+
+instance RPC.Handler Handler where
+ endpoints h = [RPC.Endpoint ["whoami"] RPC.Async]
+
+ serve (Handler ssbID) (RPC.Endpoint ["whoami"] RPC.Async) _ stream =
+ RPC.writeStreamJSON stream (WhoAmIResponse $ Feed.FeedID (RPC.peer stream))
+
+ notifyConnect _ _ = return . return $ ()
+ notifyDisconnect _ _ = return . return $ ()