aboutsummaryrefslogtreecommitdiff
path: root/src/Ssb/Peer/RPC/Room.hs
blob: 0b854147bb9b64b02666c8f4b46dc76953f0f06f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
-- | This module implements Scuttlebutt's Remote Procedure Call for
-- Rooms.
--
-- For more information kindly refer [WHERE]

-- TODO: Documentation for SSB-Room

module Ssb.Peer.RPC.Room where


import           Protolude               hiding ( Identity )
import qualified Data.Aeson                    as Aeson
import           Control.Concurrent.STM
import           Data.Default
import qualified Data.Map.Strict               as Map
import           Data.Time.Clock                ( UTCTime
                                                , getCurrentTime
                                                )
import qualified Data.Text                     as Text

import  Ssb.Aux
import qualified Ssb.Identity                  as Ssb
import qualified Ssb.Pub                       as Ssb
import           Ssb.Network
import qualified Ssb.Feed                      as Feed
import qualified Ssb.Peer.RPC                  as RPC

seed :: Text
seed = "SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24="

data Invite = Invite
    { host :: Host
    , port :: Port
    , key  :: Ssb.PublicKey
    } deriving (Eq, Show)

formatInvite :: Invite -> Text
formatInvite arg =
  "net:"
    <> host arg
    <> ":"
    <> port arg
    <> "~shs"
    <> ":"
    <> formatPublicKey' arg
    <> ":"
    <> seed
 where
  formatPublicKey' arg =
    Text.dropEnd 8 $ Text.drop 1 $ Ssb.formatPublicKey $ key arg

-- TODO: Seriously, figure out how to use duplicate field names
data Room = Room {
      endpoints     :: TMVar (Map Ssb.PublicKey (RPC.ConnState, [Tunnel]))
    , notifyChange  :: TChan Bool
    , roomName      :: Text
    , roomDesc      :: Text
    , tunnels       :: TMVar (Map Tunnel [ThreadId])
    }

newRoom :: Text -> Text -> IO Room
newRoom name desc = do
  endpoints <- newTMVarIO Map.empty
  notifier  <- newBroadcastTChanIO
  tunnels   <- newTMVarIO Map.empty
  return $ Room endpoints notifier name desc tunnels

getEndpoints :: Room -> IO [Ssb.PublicKey]
getEndpoints h = do
  let mVar = endpoints h
  Map.keys <$> atomically (readTMVar mVar)

lookUpPeer :: Room -> Ssb.PublicKey -> IO (Maybe RPC.ConnState)
lookUpPeer h peer = do
  endpoints' <- atomically $ readTMVar (endpoints h)
  return $ fst <$> Map.lookup peer endpoints'

errConnLimitReached = "peer limit reached"

registerPeer :: Room -> RPC.Stream -> IO (Either Text ())
registerPeer h stream = do
  let mVar = endpoints h
  atomically $ do
    endpoints' <- takeTMVar mVar
    if Map.size endpoints' == (maxBound :: Int)
      then return $ Left errConnLimitReached
      else do
        putTMVar mVar
          $ Map.insert (RPC.peer stream) (RPC.conn stream, []) endpoints'
        writeTChan (notifyChange h) True
        return $ Right ()

unregisterPeer :: Room -> Ssb.PublicKey -> IO ()
unregisterPeer h peer = do
  let mVar = endpoints h
  atomically $ do
    endpoints' <- takeTMVar mVar
    putTMVar mVar $ Map.delete peer endpoints'
    writeTChan (notifyChange h) True

data IsRoomResponse = IsRoomResponse {
      name          :: Text
    , description   :: Text
    } deriving (Generic, Show)

instance Aeson.FromJSON IsRoomResponse

instance Aeson.ToJSON IsRoomResponse

newIsRoomResponse :: Room -> IsRoomResponse
newIsRoomResponse l = IsRoomResponse (roomName l) (roomDesc l)

announceRequest :: RPC.Request [Text]
announceRequest =
  RPC.Request { name = ["tunnel", "announce"], typ = RPC.Async, args = [] }

announce :: RPC.ConnState -> IO (Either Text ())
announce conn = RPC.requestAsync conn announceRequest

data ConnectRequest = ConnectRequest {
      target   :: Ssb.PublicKey
    , portal   :: Ssb.PublicKey
    } deriving (Generic, Show)

instance Aeson.FromJSON ConnectRequest

instance Aeson.ToJSON ConnectRequest

leaveRequest :: RPC.Request [Text]
leaveRequest =
  RPC.Request { name = ["tunnel", "leave"], typ = RPC.Async, args = [] }

leave :: RPC.ConnState -> IO (Either Text ())
leave conn = RPC.requestAsync conn leaveRequest

pingRequest :: RPC.Request [Text]
pingRequest =
  RPC.Request { name = ["tunnel", "ping"], typ = RPC.Async, args = [] }

ping :: RPC.ConnState -> IO (Either Text UTCTime)
ping conn = RPC.requestAsync conn pingRequest

-- | fork creates a new thread, incrementing the counter in the given mVar.
fork mVar action = do
  fork' <- newEmptyTMVarIO
  atomically $ do
    forks <- takeTMVar mVar
    putTMVar mVar $ fork' : forks
  forkFinally
    action
    (\_ -> do
      atomically $ putTMVar fork' ()
    )

-- | wait returns when all forks have completed.
waitForkGroup mVar threads = do
  cs <- atomically $ takeTMVar mVar
  case cs of
    []     -> return ()
    m : ms -> do
      atomically $ putTMVar mVar ms
      atomically $ takeTMVar m
      forM_ threads killThread
      waitForkGroup mVar []

forwardMessages :: RPC.Stream -> RPC.Stream -> IO (Either Text ())
forwardMessages s1 s2 = do
  mMsg <- RPC.readStream s1
  case mMsg of
    Nothing  -> return $ Right ()
    Just msg -> do
      res <- RPC.writeStream s2 def msg
      case res of
        Left  err -> return $ Left err
        Right _   -> forwardMessages s1 s2

type Tunnel = (Ssb.PublicKey, Ssb.PublicKey)

newTunnel arg1 arg2 =
  -- the tunnel's entries need to be ordered to make pairs unique
  if arg1 < arg2 then (arg1, arg2) else (arg2, arg1)

createTunnel :: Room -> (RPC.Stream, RPC.Stream) -> IO (Either Text ())
createTunnel room (stream1, stream2) = do
  let peer1  = RPC.connPeer $ RPC.conn stream1
  let peer2  = RPC.connPeer $ RPC.conn stream2
  let tunnel = newTunnel peer1 peer2

  exists <- atomically $ do
    tunnels' <- readTMVar (tunnels room)
    return $ Map.member tunnel tunnels'
  if exists
    then return $ Left "only one tunnel allowed"
    else do
      waiter  <- newTMVarIO []
      thread1 <- fork waiter $ forwardMessages stream1 stream2
      thread2 <- fork waiter $ forwardMessages stream2 stream1
      let threads = [thread1, thread2]

      atomically $ do
        endpoints' <- takeTMVar (endpoints room)
        let endpoints'' = Map.adjust (addTunnel tunnel) (fst tunnel) endpoints'
        let endpoints''' =
              Map.adjust (addTunnel tunnel) (snd tunnel) endpoints''
        putTMVar (endpoints room) endpoints'''

        tunnels' <- takeTMVar (tunnels room)
        let tunnels'' = Map.insert tunnel threads tunnels'
        putTMVar (tunnels room) tunnels''

      waitForkGroup waiter threads

      atomically $ do
        endpoints' <- takeTMVar (endpoints room)
        let endpoints'' =
              Map.adjust (removeTunnel tunnel) (fst tunnel) endpoints'
        let endpoints''' =
              Map.adjust (removeTunnel tunnel) (snd tunnel) endpoints''
        putTMVar (endpoints room) endpoints'''

        tunnels' <- takeTMVar (tunnels room)
        putTMVar (tunnels room) $ Map.delete tunnel tunnels'

      return . return $ ()
 where
  addTunnel arg (conn, tunnels) = (conn, tunnels ++ [arg])
  removeTunnel arg (conn, tunnels) = (conn, filter (arg /=) tunnels)

connect :: Room -> RPC.Stream -> ConnectRequest -> IO (Either Text ())
connect room stream req = do
  let tunnel = newTunnel (RPC.connPeer $ RPC.conn $ stream) (target req)
  tunnels' <- atomically $ readTMVar (tunnels room)
  mPeer    <- lookUpPeer room (target req)
  let peerConn = do
        bool (return ()) errSelfNotAllowed   (fst tunnel == snd tunnel)
        bool (return ()) errOnlyUniqueTunnel (Map.member tunnel tunnels')
        maybeToRight errPeerNotAvailable mPeer
  case peerConn of
    Left err -> do
      putStrLn $ "room error:  " <> err
      return $ Left err
    Right conn -> RPC.request conn (rpcRequest req)
      $ \peerStream -> createTunnel room (stream, peerStream)
 where
  errOnlyUniqueTunnel = Left $ "only unique tunnels are allowed"
  errPeerNotAvailable = "peer is not connected" :: Text
  errSelfNotAllowed   = Left $ "connecting to self not allowed"
  rpcRequest arg =
    RPC.Request { name = ["tunnel", "connect"], typ = RPC.Duplex, args = [arg] }

leave' :: Room -> Ssb.PublicKey -> IO (Either Text ())
leave' room peer = do
  endpoints' <- atomically $ readTMVar (endpoints room)
  tunnels'   <- atomically $ readTMVar (tunnels room)

  let etunnels = fromMaybe mempty $ snd <$> Map.lookup peer endpoints'
  forM_ etunnels $ \tunnel -> do
    let threads = fromMaybe mempty $ Map.lookup tunnel tunnels'
    forM_ threads killThread
  unregisterPeer room peer
  return . return $ ()

instance RPC.Handler Room where
  endpoints _ =
    [ RPC.Endpoint ["tunnel", "announce"] RPC.Async
    , RPC.Endpoint ["tunnel", "connect"] RPC.Duplex
    , RPC.Endpoint ["tunnel", "endpoints"] RPC.Async
    , RPC.Endpoint ["tunnel", "leave"] RPC.Async
    , RPC.Endpoint ["tunnel", "isRoom"] RPC.Async
    , RPC.Endpoint ["tunnel", "ping"] RPC.Async
    ]

    -- The announce and leave endpoints are defined by the server's JS, but
    -- never called by the client.  The official (NodeJS) project uses
    -- disconnect notifications from the SSB-server.
    --
  serve h (RPC.Endpoint ["tunnel", "announce"] _) args stream =
    registerPeer h stream

  -- should decode request
  serve h (RPC.Endpoint ["tunnel", "connect"] RPC.Duplex) args stream = do
    let args' =
          decodeJSON (toS $ Aeson.encode args) :: Either
              Text
              [ConnectRequest]
    case args' of
      Left  err       -> return $ Left err
      Right [connReq] -> connect h stream connReq
      otherwise       -> return $ Left "bad target argument"

  serve h (RPC.Endpoint ["tunnel", "endpoints"] _) _ stream = do
    err <- registerPeer h stream
    case err of
      Left  msg -> return $ Left msg
      Right _   -> do
        change <- atomically $ dupTChan $ notifyChange h
        while $ do
          endpoints' <- getEndpoints h
          let resp = filter (/= RPC.peer stream) endpoints'
          res <- RPC.writeStreamJSON stream resp
          if isLeft res then return False else atomically $ readTChan change
        return $ Right ()
   where
    while f = do
      continue <- f
      if continue then (while f) else return False

  serve room (RPC.Endpoint ["tunnel", "leave"] _) _ stream =
    leave' room (RPC.peer stream)

  serve room (RPC.Endpoint ["tunnel", "isRoom"] _) _ stream =
    RPC.writeStreamJSON stream (newIsRoomResponse room)

  serve room (RPC.Endpoint ["tunnel", "ping"] _) _ stream = do
    resp <- getCurrentTime
    RPC.writeStreamJSON stream resp

  serve room endpoint@otherwise arg stream = (RPC.notFoundHandlerFunc endpoint) arg stream

  notifyConnect _ _ = return . return $ ()

  notifyDisconnect room peer = do
    _ <- leave' room peer
    return . return $ ()