aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorDevon Hudson <devonhudson@librem.one>2023-01-31 16:10:48 -0700
committerDevon Hudson <devonhudson@librem.one>2023-02-01 13:41:37 -0700
commit529feb07eec0b52eee66bc1f4ce6985b83485e73 (patch)
tree196565c2024f045e670f34e443e0cf00f42353f1 /cmd
parentbe43b9c0eae5fd45e38f954c23cfadbdfa77cb32 (diff)
Refactor conduit type from pinecone demo into its own package
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-pinecone/conduit/conduit.go84
-rw-r--r--cmd/dendrite-demo-pinecone/conduit/conduit_test.go121
2 files changed, 205 insertions, 0 deletions
diff --git a/cmd/dendrite-demo-pinecone/conduit/conduit.go b/cmd/dendrite-demo-pinecone/conduit/conduit.go
new file mode 100644
index 00000000..be139c19
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/conduit/conduit.go
@@ -0,0 +1,84 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package conduit
+
+import (
+ "io"
+ "net"
+ "sync"
+
+ "github.com/matrix-org/pinecone/types"
+ "go.uber.org/atomic"
+)
+
+type Conduit struct {
+ closed atomic.Bool
+ conn net.Conn
+ portMutex sync.Mutex
+ port types.SwitchPortID
+}
+
+func NewConduit(conn net.Conn, port int) Conduit {
+ return Conduit{
+ conn: conn,
+ port: types.SwitchPortID(port),
+ }
+}
+
+func (c *Conduit) Port() int {
+ c.portMutex.Lock()
+ defer c.portMutex.Unlock()
+ return int(c.port)
+}
+
+func (c *Conduit) SetPort(port types.SwitchPortID) {
+ c.portMutex.Lock()
+ defer c.portMutex.Unlock()
+ c.port = port
+}
+
+func (c *Conduit) Read(b []byte) (int, error) {
+ if c.closed.Load() {
+ return 0, io.EOF
+ }
+ return c.conn.Read(b)
+}
+
+func (c *Conduit) ReadCopy() ([]byte, error) {
+ if c.closed.Load() {
+ return nil, io.EOF
+ }
+ var buf [65535 * 2]byte
+ n, err := c.conn.Read(buf[:])
+ if err != nil {
+ return nil, err
+ }
+ return buf[:n], nil
+}
+
+func (c *Conduit) Write(b []byte) (int, error) {
+ if c.closed.Load() {
+ return 0, io.EOF
+ }
+ return c.conn.Write(b)
+}
+
+func (c *Conduit) Close() error {
+ if c.closed.Load() {
+ return io.ErrClosedPipe
+ }
+ c.closed.Store(true)
+ return c.conn.Close()
+}
diff --git a/cmd/dendrite-demo-pinecone/conduit/conduit_test.go b/cmd/dendrite-demo-pinecone/conduit/conduit_test.go
new file mode 100644
index 00000000..d8cd3133
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/conduit/conduit_test.go
@@ -0,0 +1,121 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package conduit
+
+import (
+ "fmt"
+ "net"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var TestBuf = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+
+type TestNetConn struct {
+ net.Conn
+ shouldFail bool
+}
+
+func (t *TestNetConn) Read(b []byte) (int, error) {
+ if t.shouldFail {
+ return 0, fmt.Errorf("Failed")
+ } else {
+ n := copy(b, TestBuf)
+ return n, nil
+ }
+}
+
+func (t *TestNetConn) Write(b []byte) (int, error) {
+ if t.shouldFail {
+ return 0, fmt.Errorf("Failed")
+ } else {
+ return len(b), nil
+ }
+}
+
+func (t *TestNetConn) Close() error {
+ if t.shouldFail {
+ return fmt.Errorf("Failed")
+ } else {
+ return nil
+ }
+}
+
+func TestConduitStoresPort(t *testing.T) {
+ conduit := Conduit{port: 7}
+ assert.Equal(t, 7, conduit.Port())
+}
+
+func TestConduitRead(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ b := make([]byte, len(TestBuf))
+ bytes, err := conduit.Read(b)
+ assert.NoError(t, err)
+ assert.Equal(t, len(TestBuf), bytes)
+ assert.Equal(t, TestBuf, b)
+}
+
+func TestConduitReadCopy(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ result, err := conduit.ReadCopy()
+ assert.NoError(t, err)
+ assert.Equal(t, TestBuf, result)
+}
+
+func TestConduitWrite(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ bytes, err := conduit.Write(TestBuf)
+ assert.NoError(t, err)
+ assert.Equal(t, len(TestBuf), bytes)
+}
+
+func TestConduitClose(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ assert.True(t, conduit.closed.Load())
+}
+
+func TestConduitReadClosed(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ b := make([]byte, len(TestBuf))
+ _, err = conduit.Read(b)
+ assert.Error(t, err)
+}
+
+func TestConduitReadCopyClosed(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ _, err = conduit.ReadCopy()
+ assert.Error(t, err)
+}
+
+func TestConduitWriteClosed(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ _, err = conduit.Write(TestBuf)
+ assert.Error(t, err)
+}
+
+func TestConduitReadCopyFails(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{shouldFail: true}}
+ _, err := conduit.ReadCopy()
+ assert.Error(t, err)
+}