From 41cde99ec6189dbecca6803a5aa4f6f18142e8ba Mon Sep 17 00:00:00 2001 From: Haskell Guy Date: Tue, 26 May 2020 13:07:50 +0200 Subject: initial commit --- src/Ssb/Peer/RPC/Gossip.hs | 139 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 src/Ssb/Peer/RPC/Gossip.hs (limited to 'src/Ssb/Peer/RPC/Gossip.hs') diff --git a/src/Ssb/Peer/RPC/Gossip.hs b/src/Ssb/Peer/RPC/Gossip.hs new file mode 100644 index 0000000..b97e4a4 --- /dev/null +++ b/src/Ssb/Peer/RPC/Gossip.hs @@ -0,0 +1,139 @@ +-- | This module implements Scuttlebutt's Remote Procedure Call for +-- CreateHistoryStream. +-- +-- For more information kindly refer the to protocol guide +-- https://ssbc.github.io/scuttlebutt-protocol-guide + +module Ssb.Peer.RPC.Gossip where + +import Protolude hiding ( Identity ) + +import Control.Concurrent.STM +import qualified Data.Aeson as Aeson +import Data.Aeson ( FromJSON + , ToJSON + ) +import Data.Default +import qualified Data.Map.Strict as Map + +import Ssb.Aux +import qualified Ssb.Feed as Feed +import qualified Ssb.Peer.RPC as RPC + + +-- | TODO: Comment +-- | TODO: Naming +-- | TODO: Keyed Message support +-- | TODO: Proper Request default values setting +data Request = Request + { id :: Feed.FeedID + , sequence :: Maybe Int + , limit :: Maybe Int + , live :: Maybe Bool + , old :: Maybe Bool + , keys :: Bool + } deriving (Generic,Show) + +newRequest :: Feed.FeedID -> Request +newRequest id = Request { id = id + , sequence = Nothing + , limit = Nothing + , live = Just False + , old = Just False + , keys = True + } + + +instance FromJSON Request + +instance ToJSON Request where + toJSON = Aeson.genericToJSON (Aeson.defaultOptions {Aeson.omitNothingFields = True}) + +-- TODO: reduce friction for introducing RPC requests + +createHistoryStreamRequest :: Request -> RPC.Request [Request] +createHistoryStreamRequest req = RPC.Request + { RPC.name = ["createHistoryStream"] + , RPC.typ = RPC.Source + , RPC.args = [req] + } + +createHistoryStream + :: FromJSON b + => RPC.ConnState + -> Request + -> a + -> (a -> Feed.VerifiableMessage b -> IO (Either Text a)) + -> IO (Either Text a) +createHistoryStream conn req init cmd = RPC.request + conn + (createHistoryStreamRequest req) + (RPC.foldStream cmd' init) + where + cmd' a payload = case payload of + RPC.JSONPayload buf -> do + msg <- Feed.decodeJSONVerifiableMessage buf + either (return . error) (cmd a) msg + v@otherwise -> return $ error "expected JSON but got something else" + +data KeyedMessage a = KeyedMessage + { key :: Feed.MessageID + , timestamp :: Int + , value :: Feed.Message a + } deriving (Generic,Show) + +instance (FromJSON a) => FromJSON (KeyedMessage a) + +instance (ToJSON a) => ToJSON (KeyedMessage a) + +--createKeyedHistoryStream +-- :: RPC.ConnState +-- -> Request +-- -> (KeyedMessage -> IO (Either Text a)) +-- -> IO (Either Text a) +--createHistoryStream conn req = +-- RPC.request conn (createHistoryStreamRequest req) +-- + +newtype Gossiper a = Gossiper (TMVar (Feed.Feeds a)) + +newGossiper :: ToJSON a => IO (Gossiper a) +newGossiper = do + mVar <- newTMVarIO Feed.emptyFeeds + return $ Gossiper mVar + +addFeed :: ToJSON a => Gossiper a -> Feed.Feed a -> IO () +addFeed (Gossiper (mFeeds)) feed = do + atomically $ do + feeds <- takeTMVar mFeeds + putTMVar mFeeds (Feed.insert feed feeds) + +writeFeed :: ToJSON a => RPC.Stream -> Feed.Feed a -> IO (Either Text ()) +writeFeed stream (Feed.Feed _ msgs) = do + return <$> forM_ + msgs + (\msg -> do + let msg' = Feed.encodeJSONVerifiableMessage msg + err <- RPC.writeStream stream def (RPC.JSONPayload msg') + either (\err -> print err) (\_ -> return ()) err + ) + +instance ToJSON a => RPC.Handler (Gossiper a) where + endpoints h = [RPC.Endpoint ["createHistoryStream"] RPC.Source] + + serve (Gossiper mFeeds) (RPC.Endpoint ["createHistoryStream"] RPC.Source) arg stream + = do + feeds <- atomically $ readTMVar mFeeds + let req = decodeJSON (encodeJSON arg) :: Either Text [Request] + case req of + Left err -> do + return . return $ () + Right [] -> return . return $ () + Right [arg] -> do + case Feed.lookup (id arg) feeds of + Just feed -> writeFeed stream feed + Nothing -> return . return $ () + + notifyConnect _ _ = return . return $ () + + notifyDisconnect _ _ = return . return $ () -- cgit v1.2.3