diff options
author | Haskell Guy <haskell.guy@localhost> | 2020-05-26 13:07:50 +0200 |
---|---|---|
committer | Haskell Guy <haskell.guy@localhost> | 2020-05-26 13:37:29 +0200 |
commit | 41cde99ec6189dbecca6803a5aa4f6f18142e8ba (patch) | |
tree | 7a0ceab0d516b8c3b7b49313100ae50c97e875c3 /src/Ssb/Peer/RPC | |
download | ssb-haskell-41cde99ec6189dbecca6803a5aa4f6f18142e8ba.tar.xz |
initial commit
Diffstat (limited to 'src/Ssb/Peer/RPC')
-rw-r--r-- | src/Ssb/Peer/RPC/Gossip.hs | 139 | ||||
-rw-r--r-- | src/Ssb/Peer/RPC/Room.hs | 330 | ||||
-rw-r--r-- | src/Ssb/Peer/RPC/WhoAmI.hs | 49 |
3 files changed, 518 insertions, 0 deletions
diff --git a/src/Ssb/Peer/RPC/Gossip.hs b/src/Ssb/Peer/RPC/Gossip.hs new file mode 100644 index 0000000..b97e4a4 --- /dev/null +++ b/src/Ssb/Peer/RPC/Gossip.hs @@ -0,0 +1,139 @@ +-- | This module implements Scuttlebutt's Remote Procedure Call for +-- CreateHistoryStream. +-- +-- For more information kindly refer the to protocol guide +-- https://ssbc.github.io/scuttlebutt-protocol-guide + +module Ssb.Peer.RPC.Gossip where + +import Protolude hiding ( Identity ) + +import Control.Concurrent.STM +import qualified Data.Aeson as Aeson +import Data.Aeson ( FromJSON + , ToJSON + ) +import Data.Default +import qualified Data.Map.Strict as Map + +import Ssb.Aux +import qualified Ssb.Feed as Feed +import qualified Ssb.Peer.RPC as RPC + + +-- | TODO: Comment +-- | TODO: Naming +-- | TODO: Keyed Message support +-- | TODO: Proper Request default values setting +data Request = Request + { id :: Feed.FeedID + , sequence :: Maybe Int + , limit :: Maybe Int + , live :: Maybe Bool + , old :: Maybe Bool + , keys :: Bool + } deriving (Generic,Show) + +newRequest :: Feed.FeedID -> Request +newRequest id = Request { id = id + , sequence = Nothing + , limit = Nothing + , live = Just False + , old = Just False + , keys = True + } + + +instance FromJSON Request + +instance ToJSON Request where + toJSON = Aeson.genericToJSON (Aeson.defaultOptions {Aeson.omitNothingFields = True}) + +-- TODO: reduce friction for introducing RPC requests + +createHistoryStreamRequest :: Request -> RPC.Request [Request] +createHistoryStreamRequest req = RPC.Request + { RPC.name = ["createHistoryStream"] + , RPC.typ = RPC.Source + , RPC.args = [req] + } + +createHistoryStream + :: FromJSON b + => RPC.ConnState + -> Request + -> a + -> (a -> Feed.VerifiableMessage b -> IO (Either Text a)) + -> IO (Either Text a) +createHistoryStream conn req init cmd = RPC.request + conn + (createHistoryStreamRequest req) + (RPC.foldStream cmd' init) + where + cmd' a payload = case payload of + RPC.JSONPayload buf -> do + msg <- Feed.decodeJSONVerifiableMessage buf + either (return . error) (cmd a) msg + v@otherwise -> return $ error "expected JSON but got something else" + +data KeyedMessage a = KeyedMessage + { key :: Feed.MessageID + , timestamp :: Int + , value :: Feed.Message a + } deriving (Generic,Show) + +instance (FromJSON a) => FromJSON (KeyedMessage a) + +instance (ToJSON a) => ToJSON (KeyedMessage a) + +--createKeyedHistoryStream +-- :: RPC.ConnState +-- -> Request +-- -> (KeyedMessage -> IO (Either Text a)) +-- -> IO (Either Text a) +--createHistoryStream conn req = +-- RPC.request conn (createHistoryStreamRequest req) +-- + +newtype Gossiper a = Gossiper (TMVar (Feed.Feeds a)) + +newGossiper :: ToJSON a => IO (Gossiper a) +newGossiper = do + mVar <- newTMVarIO Feed.emptyFeeds + return $ Gossiper mVar + +addFeed :: ToJSON a => Gossiper a -> Feed.Feed a -> IO () +addFeed (Gossiper (mFeeds)) feed = do + atomically $ do + feeds <- takeTMVar mFeeds + putTMVar mFeeds (Feed.insert feed feeds) + +writeFeed :: ToJSON a => RPC.Stream -> Feed.Feed a -> IO (Either Text ()) +writeFeed stream (Feed.Feed _ msgs) = do + return <$> forM_ + msgs + (\msg -> do + let msg' = Feed.encodeJSONVerifiableMessage msg + err <- RPC.writeStream stream def (RPC.JSONPayload msg') + either (\err -> print err) (\_ -> return ()) err + ) + +instance ToJSON a => RPC.Handler (Gossiper a) where + endpoints h = [RPC.Endpoint ["createHistoryStream"] RPC.Source] + + serve (Gossiper mFeeds) (RPC.Endpoint ["createHistoryStream"] RPC.Source) arg stream + = do + feeds <- atomically $ readTMVar mFeeds + let req = decodeJSON (encodeJSON arg) :: Either Text [Request] + case req of + Left err -> do + return . return $ () + Right [] -> return . return $ () + Right [arg] -> do + case Feed.lookup (id arg) feeds of + Just feed -> writeFeed stream feed + Nothing -> return . return $ () + + notifyConnect _ _ = return . return $ () + + notifyDisconnect _ _ = return . return $ () diff --git a/src/Ssb/Peer/RPC/Room.hs b/src/Ssb/Peer/RPC/Room.hs new file mode 100644 index 0000000..47bc956 --- /dev/null +++ b/src/Ssb/Peer/RPC/Room.hs @@ -0,0 +1,330 @@ +-- | This module implements Scuttlebutt's Remote Procedure Call for +-- Rooms. +-- +-- For more information kindly refer [WHERE] + +-- TODO: Documentation for SSB-Room + +module Ssb.Peer.RPC.Room where + + +import Protolude hiding ( Identity ) +import qualified Data.Aeson as Aeson +import Control.Concurrent.STM +import Data.Default +import qualified Data.Map.Strict as Map +import Data.Time.Clock ( UTCTime + , getCurrentTime + ) +import qualified Data.Text as Text + +import Ssb.Aux +import qualified Ssb.Identity as Ssb +import qualified Ssb.Pub as Ssb +import Ssb.Network +import qualified Ssb.Feed as Feed +import qualified Ssb.Peer.RPC as RPC + +seed :: Text +seed = "SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24=" + +data Invite = Invite + { host :: Host + , port :: Port + , key :: Ssb.PublicKey + } deriving (Eq, Show) + +formatInvite :: Invite -> Text +formatInvite arg = + "net:" + <> host arg + <> ":" + <> port arg + <> "~shs" + <> ":" + <> formatPublicKey' arg + <> ":" + <> seed + where + formatPublicKey' arg = + Text.dropEnd 8 $ Text.drop 1 $ Ssb.formatPublicKey $ key arg + +-- TODO: Seriously, figure out how to use duplicate field names +data Room = Room { + endpoints :: TMVar (Map Ssb.PublicKey (RPC.ConnState, [Tunnel])) + , notifyChange :: TChan Bool + , roomName :: Text + , roomDesc :: Text + , tunnels :: TMVar (Map Tunnel [ThreadId]) + } + +newRoom :: Text -> Text -> IO Room +newRoom name desc = do + endpoints <- newTMVarIO Map.empty + notifier <- newBroadcastTChanIO + tunnels <- newTMVarIO Map.empty + return $ Room endpoints notifier name desc tunnels + +getEndpoints :: Room -> IO [Ssb.PublicKey] +getEndpoints h = do + let mVar = endpoints h + Map.keys <$> atomically (readTMVar mVar) + +lookUpPeer :: Room -> Ssb.PublicKey -> IO (Maybe RPC.ConnState) +lookUpPeer h peer = do + endpoints' <- atomically $ readTMVar (endpoints h) + return $ fst <$> Map.lookup peer endpoints' + +errConnLimitReached = "peer limit reached" + +registerPeer :: Room -> RPC.Stream -> IO (Either Text ()) +registerPeer h stream = do + let mVar = endpoints h + atomically $ do + endpoints' <- takeTMVar mVar + if Map.size endpoints' == (maxBound :: Int) + then return $ Left errConnLimitReached + else do + putTMVar mVar + $ Map.insert (RPC.peer stream) (RPC.conn stream, []) endpoints' + writeTChan (notifyChange h) True + return $ Right () + +unregisterPeer :: Room -> Ssb.PublicKey -> IO () +unregisterPeer h peer = do + let mVar = endpoints h + atomically $ do + endpoints' <- takeTMVar mVar + putTMVar mVar $ Map.delete peer endpoints' + writeTChan (notifyChange h) True + +data IsRoomResponse = IsRoomResponse { + name :: Text + , description :: Text + } deriving (Generic, Show) + +instance Aeson.FromJSON IsRoomResponse + +instance Aeson.ToJSON IsRoomResponse + +newIsRoomResponse :: Room -> IsRoomResponse +newIsRoomResponse l = IsRoomResponse (roomName l) (roomDesc l) + +announceRequest :: RPC.Request [Text] +announceRequest = + RPC.Request { name = ["tunnel", "announce"], typ = RPC.Async, args = [] } + +announce :: RPC.ConnState -> IO (Either Text ()) +announce conn = RPC.requestAsync conn announceRequest + +data ConnectRequest = ConnectRequest { + target :: Ssb.PublicKey + , portal :: Ssb.PublicKey + } deriving (Generic, Show) + +instance Aeson.FromJSON ConnectRequest + +instance Aeson.ToJSON ConnectRequest + +leaveRequest :: RPC.Request [Text] +leaveRequest = + RPC.Request { name = ["tunnel", "leave"], typ = RPC.Async, args = [] } + +leave :: RPC.ConnState -> IO (Either Text ()) +leave conn = RPC.requestAsync conn leaveRequest + +pingRequest :: RPC.Request [Text] +pingRequest = + RPC.Request { name = ["tunnel", "ping"], typ = RPC.Async, args = [] } + +ping :: RPC.ConnState -> IO (Either Text UTCTime) +ping conn = RPC.requestAsync conn pingRequest + +-- | fork creates a new thread, incrementing the counter in the given mVar. +fork mVar action = do + fork' <- newEmptyTMVarIO + atomically $ do + forks <- takeTMVar mVar + putTMVar mVar $ fork' : forks + forkFinally + action + (\_ -> do + print "exiting fork" + atomically $ putTMVar fork' () + ) + +-- | wait returns when all forks have completed. +waitForkGroup mVar threads = do + cs <- atomically $ takeTMVar mVar + case cs of + [] -> return () + m : ms -> do + atomically $ putTMVar mVar ms + atomically $ takeTMVar m + forM_ threads killThread + waitForkGroup mVar [] + +forwardMessages :: RPC.Stream -> RPC.Stream -> IO (Either Text ()) +forwardMessages s1 s2 = do + mMsg <- RPC.readStream s1 + case mMsg of + Nothing -> return $ Right () + Just msg -> do + res <- RPC.writeStream s2 def msg + case res of + Left err -> return $ Left err + Right _ -> forwardMessages s1 s2 + +type Tunnel = (Ssb.PublicKey, Ssb.PublicKey) + +newTunnel arg1 arg2 = + -- the tunnel's entries need to be ordered to make pairs unique + if arg1 < arg2 then (arg1, arg2) else (arg2, arg1) + +createTunnel :: Room -> (RPC.Stream, RPC.Stream) -> IO (Either Text ()) +createTunnel room (stream1, stream2) = do + let peer1 = RPC.connPeer $ RPC.conn stream1 + let peer2 = RPC.connPeer $ RPC.conn stream2 + --print + -- $ "creating tunnel for " + -- <> Ssb.formatPublicKey peer1 + -- <> " <-> " + -- <> Ssb.formatPublicKey peer2 + let tunnel = newTunnel peer1 peer2 + + exists <- atomically $ do + tunnels' <- readTMVar (tunnels room) + return $ Map.member tunnel tunnels' + if exists + then return $ Left "only one tunnel allowed" + else do + waiter <- newTMVarIO [] + thread1 <- fork waiter $ forwardMessages stream1 stream2 + thread2 <- fork waiter $ forwardMessages stream2 stream1 + let threads = [thread1, thread2] + + atomically $ do + endpoints' <- takeTMVar (endpoints room) + let endpoints'' = Map.adjust (addTunnel tunnel) (fst tunnel) endpoints' + let endpoints''' = + Map.adjust (addTunnel tunnel) (snd tunnel) endpoints'' + putTMVar (endpoints room) endpoints''' + + tunnels' <- takeTMVar (tunnels room) + let tunnels'' = Map.insert tunnel threads tunnels' + putTMVar (tunnels room) tunnels'' + + waitForkGroup waiter threads + + atomically $ do + endpoints' <- takeTMVar (endpoints room) + let endpoints'' = + Map.adjust (removeTunnel tunnel) (fst tunnel) endpoints' + let endpoints''' = + Map.adjust (removeTunnel tunnel) (snd tunnel) endpoints'' + putTMVar (endpoints room) endpoints''' + + tunnels' <- takeTMVar (tunnels room) + putTMVar (tunnels room) $ Map.delete tunnel tunnels' + + return . return $ () + where + addTunnel arg (conn, tunnels) = (conn, tunnels ++ [arg]) + removeTunnel arg (conn, tunnels) = (conn, filter (arg /=) tunnels) + +connect :: Room -> RPC.Stream -> ConnectRequest -> IO (Either Text ()) +connect room stream req = do + let tunnel = newTunnel (RPC.connPeer $ RPC.conn $ stream) (target req) + tunnels' <- atomically $ readTMVar (tunnels room) + mPeer <- lookUpPeer room (target req) + let peerConn = do + bool (return ()) errSelfNotAllowed (fst tunnel == snd tunnel) + bool (return ()) errOnlyUniqueTunnel (Map.member tunnel tunnels') + maybeToRight errPeerNotAvailable mPeer + case peerConn of + Left err -> do + print $ "errorr! " <> err + return $ Left err + Right conn -> RPC.request conn (rpcRequest req) + $ \peerStream -> createTunnel room (stream, peerStream) + where + errOnlyUniqueTunnel = Left $ "only unique tunnels are allowed" + errPeerNotAvailable = "peer is not connected" :: Text + errSelfNotAllowed = Left $ "connecting to self not allowed" + rpcRequest arg = + RPC.Request { name = ["tunnel", "connect"], typ = RPC.Duplex, args = [arg] } + +leave' :: Room -> Ssb.PublicKey -> IO (Either Text ()) +leave' room peer = do + endpoints' <- atomically $ readTMVar (endpoints room) + tunnels' <- atomically $ readTMVar (tunnels room) + + let etunnels = fromMaybe mempty $ snd <$> Map.lookup peer endpoints' + forM_ etunnels $ \tunnel -> do + let threads = fromMaybe mempty $ Map.lookup tunnel tunnels' + forM_ threads killThread + unregisterPeer room peer + return . return $ () + +instance RPC.Handler Room where + endpoints _ = + [ RPC.Endpoint ["tunnel", "announce"] RPC.Async + , RPC.Endpoint ["tunnel", "connect"] RPC.Duplex + , RPC.Endpoint ["tunnel", "endpoints"] RPC.Async + , RPC.Endpoint ["tunnel", "leave"] RPC.Async + , RPC.Endpoint ["tunnel", "isRoom"] RPC.Async + , RPC.Endpoint ["tunnel", "ping"] RPC.Async + ] + + -- The announce and leave endpoints are defined by the server's JS, but + -- never called by the client. The official (NodeJS) project uses + -- disconnect notifications from the SSB-server. + -- + serve h (RPC.Endpoint ["tunnel", "announce"] _) args stream = + registerPeer h stream + + -- should decode request + serve h (RPC.Endpoint ["tunnel", "connect"] RPC.Duplex) args stream = do + let args' = + decodeJSON (toS $ Aeson.encode args) :: Either + Text + [ConnectRequest] + case args' of + Left err -> return $ Left err + Right [connReq] -> connect h stream connReq + otherwise -> return $ Left "bad target argument" + + serve h (RPC.Endpoint ["tunnel", "endpoints"] _) _ stream = do + err <- registerPeer h stream + case err of + Left msg -> return $ Left msg + Right _ -> do + change <- atomically $ dupTChan $ notifyChange h + while $ do + endpoints' <- getEndpoints h + let resp = filter (/= RPC.peer stream) endpoints' + res <- RPC.writeStreamJSON stream resp + if isLeft res then return False else atomically $ readTChan change + return $ Right () + where + while f = do + continue <- f + if continue then (while f) else return False + + serve room (RPC.Endpoint ["tunnel", "leave"] _) _ stream = + leave' room (RPC.peer stream) + + serve room (RPC.Endpoint ["tunnel", "isRoom"] _) _ stream = + RPC.writeStreamJSON stream (newIsRoomResponse room) + + serve room (RPC.Endpoint ["tunnel", "ping"] _) _ stream = do + resp <- getCurrentTime + RPC.writeStreamJSON stream resp + + serve room endpoint@otherwise arg stream = (RPC.notFoundHandlerFunc endpoint) arg stream + + notifyConnect _ _ = return . return $ () + + notifyDisconnect room peer = do + _ <- leave' room peer + return . return $ () diff --git a/src/Ssb/Peer/RPC/WhoAmI.hs b/src/Ssb/Peer/RPC/WhoAmI.hs new file mode 100644 index 0000000..be979fc --- /dev/null +++ b/src/Ssb/Peer/RPC/WhoAmI.hs @@ -0,0 +1,49 @@ +-- | This module implements Scuttlebutt's Remote Procedure Call for +-- Ping. +-- +-- For more information kindly refer [WHERE] + +-- TODO: Update above documentation + +module Ssb.Peer.RPC.WhoAmI where + +import Protolude hiding ( Identity ) +import Data.Aeson as Aeson (FromJSON,ToJSON) + +import qualified Ssb.Identity as Ssb +import qualified Ssb.Feed as Feed +import qualified Ssb.Peer.RPC as RPC + +whoAmIRequest :: RPC.Request [Text] +whoAmIRequest = RPC.Request { + name = ["whoami"] + , typ = RPC.Async + , args = [] + } + +newtype WhoAmIResponse = WhoAmIResponse + { id :: Feed.FeedID + } deriving (Eq,Generic,Show) + +instance FromJSON WhoAmIResponse + +instance ToJSON WhoAmIResponse + +whoAmI + :: RPC.ConnState + -> IO (Either Text WhoAmIResponse) +whoAmI conn = RPC.requestAsync conn whoAmIRequest + +newtype Handler = Handler () + +newHandler :: Handler +newHandler = Handler () + +instance RPC.Handler Handler where + endpoints h = [RPC.Endpoint ["whoami"] RPC.Async] + + serve (Handler ssbID) (RPC.Endpoint ["whoami"] RPC.Async) _ stream = + RPC.writeStreamJSON stream (WhoAmIResponse $ Feed.FeedID (RPC.peer stream)) + + notifyConnect _ _ = return . return $ () + notifyDisconnect _ _ = return . return $ () |