1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
-- | This module implements Scuttlebutt's Remote Procedure Call for
-- CreateHistoryStream.
--
-- For more information kindly refer to the protocol guide
-- https://ssbc.github.io/scuttlebutt-protocol-guide
module Ssb.Peer.RPC.Gossip where
import Protolude hiding ( Identity )
import Control.Concurrent.STM
import qualified Data.Aeson as Aeson
import Data.Aeson ( FromJSON
, ToJSON
)
import Data.Default
import qualified Data.Map.Strict as Map
import Ssb.Aux
import qualified Ssb.Feed as Feed
import qualified Ssb.Peer.RPC as RPC
-- | Arguments of a @createHistoryStream@ call.
--
-- The JSON instances are derived generically, so the JSON keys are exactly
-- the field names below. 'id' names the feed whose history is requested.
-- The remaining fields are stream options; their precise meaning is not
-- defined in this module (see the protocol guide -- TODO confirm).
--
-- TODO: Comment
-- TODO: Naming
-- TODO: Keyed Message support
-- TODO: Proper Request default values setting
data Request = Request
  { id       :: Feed.FeedID
  , sequence :: Maybe Int
  , limit    :: Maybe Int
  , live     :: Maybe Bool
  , old      :: Maybe Bool
  , keys     :: Bool
  }
  deriving (Generic, Show)
-- | Build a 'Request' for the given feed using this module's defaults:
-- no @sequence@ and no @limit@, @live@ and @old@ both explicitly @False@,
-- and @keys@ set to @True@.
--
-- NOTE(review): with both @live@ and @old@ false the request appears to ask
-- for neither stored nor future messages -- confirm this default against the
-- protocol guide.
newRequest :: Feed.FeedID -> Request
newRequest feedId = Request
  { id       = feedId
  , keys     = True
  , old      = Just False
  , live     = Just False
  , limit    = Nothing
  , sequence = Nothing
  }
-- | Decoded generically with Aeson's default options.
instance FromJSON Request

-- | Encoded generically; fields holding 'Nothing' are left out of the
-- resulting JSON object.
instance ToJSON Request where
  toJSON = Aeson.genericToJSON encodingOptions
   where
    encodingOptions = Aeson.defaultOptions { Aeson.omitNothingFields = True }
-- TODO: reduce friction for introducing RPC requests
-- | Wrap a 'Request' as the sole argument of a @createHistoryStream@
-- RPC request of type 'RPC.Source'.
createHistoryStreamRequest :: Request -> RPC.Request [Request]
createHistoryStreamRequest req =
  RPC.Request
    { RPC.args = pure req
    , RPC.typ  = RPC.Source
    , RPC.name = ["createHistoryStream"]
    }
-- | Issue a @createHistoryStream@ request and fold over the messages the
-- peer streams back.
--
-- * @conn@ - connection the request is sent on.
-- * @req@  - arguments of the call, see 'Request'.
-- * @init@ - initial accumulator.
-- * @cmd@  - step function, called with the current accumulator and each
--   decoded message; its 'Left' / 'Right' result is handed back to
--   'RPC.foldStream'.
--
-- A payload that is not JSON, or JSON that fails to decode as a verifiable
-- message, yields a 'Left' carrying the error text. (Previously these cases
-- went through 'error', which under Protolude produces a lazily thrown
-- exception rather than the 'Left' promised by the result type.)
createHistoryStream
  :: FromJSON b
  => RPC.ConnState
  -> Request
  -> a
  -> (a -> Feed.VerifiableMessage b -> IO (Either Text a))
  -> IO (Either Text a)
createHistoryStream conn req init cmd = RPC.request
  conn
  (createHistoryStreamRequest req)
  (RPC.foldStream step init)
 where
  -- Decode one streamed payload and feed it to the caller's step function.
  step acc (RPC.JSONPayload buf) =
    Feed.decodeJSONVerifiableMessage buf >>= either (return . Left) (cmd acc)
  -- Any other payload kind is a protocol violation for this call.
  step _ _ = return (Left "expected JSON but got something else")
-- | A feed message bundled with a message ID ('key') and a 'timestamp'.
--
-- NOTE(review): the unit and origin of 'timestamp' are not visible in this
-- module -- presumably the receive time; confirm against the protocol guide.
data KeyedMessage a = KeyedMessage
  { key       :: Feed.MessageID
  , timestamp :: Int
  , value     :: Feed.Message a
  }
  deriving (Generic, Show)

-- | Decoded generically; JSON keys match the field names.
instance FromJSON a => FromJSON (KeyedMessage a)

-- | Encoded generically; JSON keys match the field names.
instance ToJSON a => ToJSON (KeyedMessage a)
--createKeyedHistoryStream
-- :: RPC.ConnState
-- -> Request
-- -> (KeyedMessage -> IO (Either Text a))
-- -> IO (Either Text a)
--createHistoryStream conn req =
-- RPC.request conn (createHistoryStreamRequest req)
--
-- | Server-side state used to answer @createHistoryStream@ requests: the
-- collection of feeds that can be served, held in a 'TMVar'.
newtype Gossiper a = Gossiper (TMVar (Feed.Feeds a))
-- | Create a 'Gossiper' whose feed collection starts out as
-- 'Feed.emptyFeeds'.
newGossiper :: ToJSON a => IO (Gossiper a)
newGossiper = Gossiper <$> newTMVarIO Feed.emptyFeeds
-- | Add a feed to the gossiper's collection via 'Feed.insert'. The
-- take/insert/put sequence runs as one STM transaction.
addFeed :: ToJSON a => Gossiper a -> Feed.Feed a -> IO ()
addFeed (Gossiper feedsVar) feed =
  atomically $ takeTMVar feedsVar >>= putTMVar feedsVar . Feed.insert feed
-- | Write every message of a feed to the stream as a JSON payload, in the
-- order the feed's message container yields them.
--
-- Returns @Right ()@ once all messages have been written. Stops at the
-- first failed write and returns that error as 'Left'. (Previously write
-- errors were only printed, the remaining messages were still attempted,
-- and the result was always @Right ()@.)
writeFeed :: ToJSON a => RPC.Stream -> Feed.Feed a -> IO (Either Text ())
writeFeed stream (Feed.Feed _ msgs) = foldr writeThen (return (Right ())) msgs
 where
  -- Write one message; only run the rest of the fold if the write succeeded.
  -- The lazy right fold is what gives the early exit.
  writeThen msg continue = do
    let payload = RPC.JSONPayload (Feed.encodeJSONVerifiableMessage msg)
    result <- RPC.writeStream stream def payload
    either (return . Left) (const continue) result
-- | Serves the @createHistoryStream@ source endpoint from the feeds held by
-- a 'Gossiper'.
instance ToJSON a => RPC.Handler (Gossiper a) where
  -- The only endpoint this handler announces.
  endpoints _ = [RPC.Endpoint ["createHistoryStream"] RPC.Source]

  -- Decode the call arguments as a list of 'Request's and stream the
  -- requested feed, if it is known.
  --
  -- Undecodable arguments, an empty argument list and an unknown feed all
  -- end the call successfully without writing anything (unchanged
  -- behaviour). More than one argument, or any other endpoint, used to hit
  -- a pattern-match failure; both now return a 'Left'.
  serve (Gossiper mFeeds) (RPC.Endpoint ["createHistoryStream"] RPC.Source) arg stream
    = case (decodeJSON (encodeJSON arg) :: Either Text [Request]) of
      -- NOTE(review): decode errors are deliberately kept silent here, as
      -- in the original; consider reporting them to the peer.
      Left  _     -> return (Right ())
      Right []    -> return (Right ())
      Right [req] -> do
        feeds <- atomically $ readTMVar mFeeds
        maybe (return (Right ())) (writeFeed stream) (Feed.lookup (id req) feeds)
      Right _ ->
        return (Left "createHistoryStream: expected exactly one argument")
  serve _ _ _ _ = return (Left "Gossiper: unsupported endpoint")

  -- Connection lifecycle events need no bookkeeping for this handler.
  notifyConnect _ _ = return . return $ ()
  notifyDisconnect _ _ = return . return $ ()
|