-- | This module implements Scuttlebutt's Remote Procedure Call for
-- CreateHistoryStream.
--
-- For more information kindly refer to the protocol guide
-- https://ssbc.github.io/scuttlebutt-protocol-guide

module Ssb.Peer.RPC.Gossip where

import           Protolude               hiding ( Identity )
import           Control.Concurrent.STM
import qualified Data.Aeson                    as Aeson
import           Data.Aeson                     ( FromJSON
                                                , ToJSON
                                                )
import           Data.Default
import qualified Data.Map.Strict               as Map

import           Ssb.Aux
import qualified Ssb.Feed                      as Feed
import qualified Ssb.Peer.RPC                  as RPC

-- | Argument record of a @createHistoryStream@ call.
--
-- The JSON instances are derived generically, so the record field names
-- are the JSON keys and must not be renamed.
--
-- TODO: Naming
-- TODO: Keyed Message support
-- TODO: Proper Request default values setting
data Request = Request
  { id       :: Feed.FeedID
    -- ^ Feed that 'serve' looks up to answer the request.
  , sequence :: Maybe Int
  , limit    :: Maybe Int
  , live     :: Maybe Bool
  , old      :: Maybe Bool
  , keys     :: Bool
  } deriving (Generic,Show)

-- | Build a 'Request' for the given feed with this module's defaults:
-- no @sequence@, no @limit@, @live@ and @old@ set to @Just False@ and
-- @keys@ set to @True@.
newRequest :: Feed.FeedID -> Request
newRequest feedId = Request { id       = feedId
                            , sequence = Nothing
                            , limit    = Nothing
                            , live     = Just False
                            , old      = Just False
                            , keys     = True
                            }

instance FromJSON Request

-- | Fields holding 'Nothing' are left out of the encoded object.
instance ToJSON Request where
  toJSON = Aeson.genericToJSON
    (Aeson.defaultOptions { Aeson.omitNothingFields = True })

-- TODO: reduce friction for introducing RPC requests

-- | Wrap a 'Request' into the @createHistoryStream@ source RPC request,
-- passing it as the single element of the argument list.
createHistoryStreamRequest :: Request -> RPC.Request [Request]
createHistoryStreamRequest req = RPC.Request
  { RPC.name = ["createHistoryStream"]
  , RPC.typ  = RPC.Source
  , RPC.args = [req]
  }

-- | Issue a @createHistoryStream@ request on the connection and fold the
-- callback over every message of the response stream, starting from the
-- given initial accumulator.
--
-- A payload that is not JSON, or a message that fails to decode, yields
-- a 'Left' describing the problem. Previously these cases returned a
-- value built with 'error', which only blew up once the caller inspected
-- the result.
createHistoryStream
  :: FromJSON b
  => RPC.ConnState
  -> Request
  -> a
  -> (a -> Feed.VerifiableMessage b -> IO (Either Text a))
  -> IO (Either Text a)
createHistoryStream conn req initial cmd = RPC.request
  conn
  (createHistoryStreamRequest req)
  (RPC.foldStream step initial)
 where
  -- Decode one payload and hand the message to the user callback.
  step acc (RPC.JSONPayload buf) =
    Feed.decodeJSONVerifiableMessage buf >>= either (return . Left) (cmd acc)
  step _ _ = return $ Left "expected JSON but got something else"

-- | A message together with its key and a timestamp.
--
-- Not used by any function in this module yet; see the commented-out
-- sketch below.
data KeyedMessage a = KeyedMessage
  { key       :: Feed.MessageID
  , timestamp :: Int
  , value     :: Feed.Message a
  } deriving (Generic,Show)

instance (FromJSON a) => FromJSON (KeyedMessage a)

instance (ToJSON a) => ToJSON (KeyedMessage a)

--createKeyedHistoryStream
--  :: RPC.ConnState
--  -> Request
--  -> (KeyedMessage -> IO (Either Text a))
--  -> IO (Either Text a)
--createHistoryStream conn req =
--  RPC.request conn (createHistoryStreamRequest req)
--

-- | RPC handler state: the collection of feeds that can be served,
-- held in a 'TMVar'.
newtype Gossiper a = Gossiper (TMVar (Feed.Feeds a))

-- | Create a 'Gossiper' holding 'Feed.emptyFeeds'.
newGossiper :: ToJSON a => IO (Gossiper a)
newGossiper = Gossiper <$> newTMVarIO Feed.emptyFeeds

-- | Insert a feed into the gossiper's collection.
--
-- The take and the put happen inside one STM transaction, so the update
-- is atomic with respect to other transactions on the same 'TMVar'.
addFeed :: ToJSON a => Gossiper a -> Feed.Feed a -> IO ()
addFeed (Gossiper mFeeds) feed = atomically $ do
  feeds <- takeTMVar mFeeds
  putTMVar mFeeds (Feed.insert feed feeds)

-- | Write every message of the feed to the stream as a JSON payload, in
-- the order the feed stores them.
--
-- Stops at the first failed write and returns that error. Previously
-- write errors were only printed and the function always reported
-- success.
writeFeed :: ToJSON a => RPC.Stream -> Feed.Feed a -> IO (Either Text ())
writeFeed stream (Feed.Feed _ msgs) = runExceptT $ forM_ msgs $ \msg ->
  ExceptT $ void <$> RPC.writeStream
    stream
    def
    (RPC.JSONPayload (Feed.encodeJSONVerifiableMessage msg))

-- | Serves @createHistoryStream@ from the feeds stored in the 'Gossiper'.
instance ToJSON a => RPC.Handler (Gossiper a) where
  endpoints _ = [RPC.Endpoint ["createHistoryStream"] RPC.Source]

  -- Answer a request by writing the requested feed, if it is known, to
  -- the stream. An unknown feed or an empty argument list is a no-op.
  serve (Gossiper mFeeds) (RPC.Endpoint ["createHistoryStream"] RPC.Source) arg stream
    = do
      feeds <- atomically $ readTMVar mFeeds
      -- Round-trip through JSON to read the arguments as [Request].
      case decodeJSON (encodeJSON arg) :: Either Text [Request] of
        Left err ->
          return . Left $ "createHistoryStream: invalid arguments: " <> err
        Right []    -> return (Right ())
        Right [req] -> maybe (return (Right ()))
                             (writeFeed stream)
                             (Feed.lookup (id req) feeds)
        -- More than one argument used to be a pattern-match failure.
        Right _ ->
          return $ Left "createHistoryStream: expected exactly one argument"
  -- Any other endpoint used to be a pattern-match failure as well.
  serve _ _ _ _ = return $ Left "Gossiper: unsupported endpoint"

  notifyConnect _ _ = return . return $ ()

  notifyDisconnect _ _ = return . return $ ()