aboutsummaryrefslogtreecommitdiff
path: root/src/Ssb/Peer/RPC/Gossip.hs
diff options
context:
space:
mode:
authorHaskell Guy <haskell.guy@localhost>2020-05-26 13:07:50 +0200
committerHaskell Guy <haskell.guy@localhost>2020-05-26 13:37:29 +0200
commit41cde99ec6189dbecca6803a5aa4f6f18142e8ba (patch)
tree7a0ceab0d516b8c3b7b49313100ae50c97e875c3 /src/Ssb/Peer/RPC/Gossip.hs
downloadssb-haskell-41cde99ec6189dbecca6803a5aa4f6f18142e8ba.tar.xz
initial commit
Diffstat (limited to 'src/Ssb/Peer/RPC/Gossip.hs')
-rw-r--r--src/Ssb/Peer/RPC/Gossip.hs139
1 files changed, 139 insertions, 0 deletions
diff --git a/src/Ssb/Peer/RPC/Gossip.hs b/src/Ssb/Peer/RPC/Gossip.hs
new file mode 100644
index 0000000..b97e4a4
--- /dev/null
+++ b/src/Ssb/Peer/RPC/Gossip.hs
@@ -0,0 +1,139 @@
+-- | This module implements Scuttlebutt's Remote Procedure Call for
+-- CreateHistoryStream.
+--
+-- For more information kindly refer the to protocol guide
+-- https://ssbc.github.io/scuttlebutt-protocol-guide
+
+module Ssb.Peer.RPC.Gossip where
+
+import Protolude hiding ( Identity )
+
+import Control.Concurrent.STM
+import qualified Data.Aeson as Aeson
+import Data.Aeson ( FromJSON
+ , ToJSON
+ )
+import Data.Default
+import qualified Data.Map.Strict as Map
+
+import Ssb.Aux
+import qualified Ssb.Feed as Feed
+import qualified Ssb.Peer.RPC as RPC
+
+
+-- | TODO: Comment
+-- | TODO: Naming
+-- | TODO: Keyed Message support
+-- | TODO: Proper Request default values setting
+data Request = Request
+ { id :: Feed.FeedID
+ , sequence :: Maybe Int
+ , limit :: Maybe Int
+ , live :: Maybe Bool
+ , old :: Maybe Bool
+ , keys :: Bool
+ } deriving (Generic,Show)
+
+newRequest :: Feed.FeedID -> Request
+newRequest id = Request { id = id
+ , sequence = Nothing
+ , limit = Nothing
+ , live = Just False
+ , old = Just False
+ , keys = True
+ }
+
+
+instance FromJSON Request
+
+instance ToJSON Request where
+ toJSON = Aeson.genericToJSON (Aeson.defaultOptions {Aeson.omitNothingFields = True})
+
+-- TODO: reduce friction for introducing RPC requests
+
+createHistoryStreamRequest :: Request -> RPC.Request [Request]
+createHistoryStreamRequest req = RPC.Request
+ { RPC.name = ["createHistoryStream"]
+ , RPC.typ = RPC.Source
+ , RPC.args = [req]
+ }
+
+createHistoryStream
+ :: FromJSON b
+ => RPC.ConnState
+ -> Request
+ -> a
+ -> (a -> Feed.VerifiableMessage b -> IO (Either Text a))
+ -> IO (Either Text a)
+createHistoryStream conn req init cmd = RPC.request
+ conn
+ (createHistoryStreamRequest req)
+ (RPC.foldStream cmd' init)
+ where
+ cmd' a payload = case payload of
+ RPC.JSONPayload buf -> do
+ msg <- Feed.decodeJSONVerifiableMessage buf
+ either (return . error) (cmd a) msg
+ v@otherwise -> return $ error "expected JSON but got something else"
+
+data KeyedMessage a = KeyedMessage
+ { key :: Feed.MessageID
+ , timestamp :: Int
+ , value :: Feed.Message a
+ } deriving (Generic,Show)
+
+instance (FromJSON a) => FromJSON (KeyedMessage a)
+
+instance (ToJSON a) => ToJSON (KeyedMessage a)
+
+--createKeyedHistoryStream
+-- :: RPC.ConnState
+-- -> Request
+-- -> (KeyedMessage -> IO (Either Text a))
+-- -> IO (Either Text a)
+--createHistoryStream conn req =
+-- RPC.request conn (createHistoryStreamRequest req)
+--
+
+newtype Gossiper a = Gossiper (TMVar (Feed.Feeds a))
+
+newGossiper :: ToJSON a => IO (Gossiper a)
+newGossiper = do
+ mVar <- newTMVarIO Feed.emptyFeeds
+ return $ Gossiper mVar
+
+addFeed :: ToJSON a => Gossiper a -> Feed.Feed a -> IO ()
+addFeed (Gossiper (mFeeds)) feed = do
+ atomically $ do
+ feeds <- takeTMVar mFeeds
+ putTMVar mFeeds (Feed.insert feed feeds)
+
+writeFeed :: ToJSON a => RPC.Stream -> Feed.Feed a -> IO (Either Text ())
+writeFeed stream (Feed.Feed _ msgs) = do
+ return <$> forM_
+ msgs
+ (\msg -> do
+ let msg' = Feed.encodeJSONVerifiableMessage msg
+ err <- RPC.writeStream stream def (RPC.JSONPayload msg')
+ either (\err -> print err) (\_ -> return ()) err
+ )
+
+instance ToJSON a => RPC.Handler (Gossiper a) where
+ endpoints h = [RPC.Endpoint ["createHistoryStream"] RPC.Source]
+
+ serve (Gossiper mFeeds) (RPC.Endpoint ["createHistoryStream"] RPC.Source) arg stream
+ = do
+ feeds <- atomically $ readTMVar mFeeds
+ let req = decodeJSON (encodeJSON arg) :: Either Text [Request]
+ case req of
+ Left err -> do
+ return . return $ ()
+ Right [] -> return . return $ ()
+ Right [arg] -> do
+ case Feed.lookup (id arg) feeds of
+ Just feed -> writeFeed stream feed
+ Nothing -> return . return $ ()
+
+ notifyConnect _ _ = return . return $ ()
+
+ notifyDisconnect _ _ = return . return $ ()