aboutsummaryrefslogtreecommitdiff
path: root/src/Ssb/Peer/RPC/Gossip.hs
blob: 80e1aace3d73f14907141ee949c4c9c20c827a49 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
-- | This module implements Scuttlebutt's Remote Procedure Call for
-- CreateHistoryStream.
--
-- For more information, kindly refer to the protocol guide:
-- https://ssbc.github.io/scuttlebutt-protocol-guide

module Ssb.Peer.RPC.Gossip where

import           Protolude               hiding ( Identity )

import           Control.Concurrent.STM
import qualified Data.Aeson                     as Aeson
import           Data.Aeson                     ( FromJSON
                                                , ToJSON
                                                )
import           Data.Default
import qualified Data.Map.Strict               as Map

import           Ssb.Aux
import qualified Ssb.Feed                      as Feed
import qualified Ssb.Peer.RPC                  as RPC


-- | Arguments of a @createHistoryStream@ request.
--
-- The JSON instances of this type are derived generically, so the field
-- names below are also the keys used on the wire; renaming a field changes
-- the encoding.
--
-- TODO: Comment
-- TODO: Naming
-- TODO: Keyed Message support
-- TODO: Proper Request default values setting
data Request = Request
    { id        :: Feed.FeedID
    -- ^ Feed being requested; the 'Gossiper' handler looks feeds up by this value.
    , sequence  :: Maybe Int
    -- ^ NOTE(review): presumably the sequence number to start from — confirm against the protocol guide.
    , limit     :: Maybe Int
    -- ^ NOTE(review): presumably the maximum number of messages to return — confirm.
    , live      :: Maybe Bool
    -- ^ NOTE(review): presumably whether to keep the stream open for new messages — confirm.
    , old       :: Maybe Bool
    -- ^ NOTE(review): presumably whether already stored messages are wanted — confirm.
    , keys      :: Bool
    -- ^ NOTE(review): presumably selects the keyed message format (see 'KeyedMessage') — confirm.
    -- Unlike the other options this field is not a 'Maybe'.
    } deriving (Generic,Show)

-- | Build a 'Request' for the given feed using this module's defaults:
-- no @sequence@, no @limit@, @live@ and @old@ explicitly set to 'False',
-- and @keys@ set to 'True'.
newRequest :: Feed.FeedID -> Request
newRequest feedId = Request
  { id       = feedId
  , sequence = Nothing
  , limit    = Nothing
  , live     = Just False
  , old      = Just False
  , keys     = True
  }


-- | Decoding relies on the generic default implementation, so the JSON keys
-- are expected to match the record field names of 'Request'.
instance FromJSON Request

-- | Generic encoding that leaves out record fields whose value is 'Nothing'.
instance ToJSON Request where
  toJSON = Aeson.genericToJSON encodingOptions
   where
    -- Default options, except that 'Nothing' fields are dropped from the object.
    encodingOptions = Aeson.defaultOptions { Aeson.omitNothingFields = True }

-- TODO: reduce friction for introducing RPC requests

-- | Wrap a 'Request' into the RPC call description for
-- @createHistoryStream@: a 'RPC.Source' call whose argument list contains
-- exactly the given request.
createHistoryStreamRequest :: Request -> RPC.Request [Request]
createHistoryStreamRequest historyReq = RPC.Request
  { RPC.name = ["createHistoryStream"]
  , RPC.typ  = RPC.Source
  , RPC.args = pure historyReq
  }

-- | Send a @createHistoryStream@ request over the connection and fold the
-- messages of the response stream.
--
-- Arguments:
--
-- * @conn@: connection the request is issued on.
-- * @req@:  request options, wrapped with 'createHistoryStreamRequest'.
-- * @init@: initial accumulator of the fold.
-- * @cmd@:  step function, called with the current accumulator and each
--           decoded message; a 'Left' result is its way of reporting failure.
--
-- Failures detected here (a payload that is not JSON, or a JSON payload that
-- cannot be decoded into a verifiable message) are reported as 'Left' in the
-- result instead of being raised from pure code.
createHistoryStream
  :: FromJSON b
  => RPC.ConnState
  -> Request
  -> a
  -> (a -> Feed.VerifiableMessage b -> IO (Either Text a))
  -> IO (Either Text a)
createHistoryStream conn req init cmd = RPC.request
  conn
  (createHistoryStreamRequest req)
  (RPC.foldStream cmd' init)
 where
  -- Fold step: decode one stream payload and hand it to the caller's step
  -- function.
  cmd' a payload = case payload of
    RPC.JSONPayload buf -> do
      msg <- Feed.decodeJSONVerifiableMessage buf
      -- A decoding error becomes the step's 'Left' result.
      either (return . Left) (cmd a) msg
    _ -> return $ Left "expected JSON but got something else"

-- | A feed message bundled with its message identifier and a timestamp.
--
-- NOTE(review): presumably this mirrors the keyed message format of the
-- protocol guide; nothing in the visible part of this module uses it yet
-- (see the TODO on 'Request' and the commented-out stream function below).
data KeyedMessage a = KeyedMessage
  { key       :: Feed.MessageID
  -- ^ Identifier of the wrapped message.
  , timestamp :: Int
  -- ^ NOTE(review): unit and reference clock are not visible here — confirm.
  , value     :: Feed.Message a
  -- ^ The message itself.
  } deriving (Generic,Show)

-- | Generic decoding; JSON keys follow the record field names.
instance (FromJSON a) => FromJSON (KeyedMessage a)

-- | Generic encoding; JSON keys follow the record field names.
instance (ToJSON a) => ToJSON (KeyedMessage a)

--createKeyedHistoryStream
--  :: RPC.ConnState
--  -> Request
--  -> (KeyedMessage -> IO (Either Text a))
--  -> IO (Either Text a)
--createHistoryStream conn req =
--  RPC.request conn (createHistoryStreamRequest req)
--

-- | RPC handler state: the collection of feeds that can be served to peers,
-- kept in a 'TMVar' so it can be updated while the handler is in use.
newtype Gossiper a = Gossiper (TMVar (Feed.Feeds a))

-- | Create a 'Gossiper' whose feed collection starts out as
-- 'Feed.emptyFeeds'.
newGossiper :: ToJSON a => IO (Gossiper a)
newGossiper = Gossiper <$> newTMVarIO Feed.emptyFeeds

-- | Insert a feed into the gossiper's feed collection.
--
-- The collection is taken out of the 'TMVar' and the updated one is put back
-- within a single STM transaction.
addFeed :: ToJSON a => Gossiper a -> Feed.Feed a -> IO ()
addFeed (Gossiper store) newFeed = atomically $
  takeTMVar store >>= putTMVar store . Feed.insert newFeed

-- | Write every message of a feed to the stream as a JSON payload.
--
-- A failed write is printed with 'putStrLn' and the remaining messages are
-- still attempted; the result is therefore always @Right ()@.
writeFeed :: ToJSON a => RPC.Stream -> Feed.Feed a -> IO (Either Text ())
writeFeed stream (Feed.Feed _ msgs) = Right <$> mapM_ send msgs
 where
  -- Encode and write a single message, logging (but not propagating) a
  -- write error.
  send msg = do
    outcome <- RPC.writeStream
      stream
      def
      (RPC.JSONPayload (Feed.encodeJSONVerifiableMessage msg))
    case outcome of
      Left failure -> putStrLn failure
      Right _      -> return ()

-- | Serves the gossiper's stored feeds to peers through the
-- @createHistoryStream@ source endpoint.
instance ToJSON a => RPC.Handler (Gossiper a) where
  -- The only endpoint offered by this handler.
  endpoints _ = [RPC.Endpoint ["createHistoryStream"] RPC.Source]

  -- Answer a @createHistoryStream@ call by writing the requested feed to the
  -- stream.  Requests that cannot be served (arguments that do not decode to
  -- a list of 'Request', an argument list that does not hold exactly one
  -- request, or a feed that is not stored) are answered with success and no
  -- output, so a single odd request never fails the handler.
  serve (Gossiper mFeeds) (RPC.Endpoint ["createHistoryStream"] RPC.Source) arg stream
    = do
      feeds <- atomically $ readTMVar mFeeds
      -- Round-trip through JSON to turn the raw argument into typed requests.
      let req = decodeJSON (encodeJSON arg) :: Either Text [Request]
      case req of
        Right [request] -> case Feed.lookup (id request) feeds of
          Just feed -> writeFeed stream feed
          Nothing   -> return . return $ ()
        -- Decode failure, empty list, or more than one request: nothing to do.
        _ -> return . return $ ()
  -- Any other endpoint is not provided by this handler; report it instead of
  -- failing with a pattern-match error.
  serve _ _ _ _ = return $ Left "Gossip: unsupported endpoint"

  notifyConnect _ _ = return . return $ ()

  notifyDisconnect _ _ = return . return $ ()