Commit 68c0f80e authored by Juan Batiz-Benet

go-peerstream update


License: MIT
Signed-off-by: Juan Batiz-Benet <juan@benet.ai>
parent 73d3fa5c
Showing with 259 additions and 49 deletions
@@ -177,7 +177,7 @@
 		},
 		{
 			"ImportPath": "github.com/jbenet/go-peerstream",
-			"Rev": "8d52ed2801410a2af995b4e87660272d11c8a9a4"
+			"Rev": "62fe5ede12f9d9cd9406750160122525b3d6b694"
 		},
 		{
 			"ImportPath": "github.com/jbenet/go-random",
@@ -7,4 +7,5 @@ go:
   - tip

 script:
-  - go test -race -cpu=5 ./...
+  - go test ./...
+# - go test -race -cpu=5 ./...
{
	"ImportPath": "github.com/jbenet/go-peerstream",
	"GoVersion": "go1.4.2",
	"Packages": [
		"./..."
	],
	"Deps": [
		{
			"ImportPath": "github.com/docker/spdystream",
			"Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207"
		},
		{
			"ImportPath": "github.com/hashicorp/yamux",
			"Rev": "b2e55852ddaf823a85c67f798080eb7d08acd71d"
		},
		{
			"ImportPath": "github.com/inconshreveable/muxado",
			"Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
		},
		{
			"ImportPath": "github.com/jbenet/go-temp-err-catcher",
			"Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff"
		},
		{
			"ImportPath": "github.com/whyrusleeping/go-multiplex",
			"Rev": "ce5baa716247510379cb7640a14da857afd3b622"
		},
		{
			"ImportPath": "github.com/whyrusleeping/go-multistream",
			"Rev": "08e8f9c9f5665ed0c63ffde4fa5ef1d5fb3d516d"
		}
	]
}
This directory tree is generated automatically by godep.
Please do not edit.
See https://github.com/tools/godep for more information.
godep:
	go get github.com/tools/godep

vendor: godep
	godep save -r ./...

build:
	go build ./...

test:
	go test ./...

test_race:
	go test -race -cpu 5 ./...
@@ -158,23 +158,45 @@ func ConnInConns(c1 *Conn, conns []*Conn) bool {
 // addConn is the internal version of AddConn. we need the server bool
 // as spdystream requires it.
 func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) {
+	c, err := s.setupConn(netConn, isServer)
+	if err != nil {
+		return nil, err
+	}
+
+	s.ConnHandler()(c)
+
+	// go listen for incoming streams on this connection
+	go c.pstConn.Serve(func(ss pst.Stream) {
+		// log.Printf("accepted stream %d from %s\n", ssS.Identifier(), netConn.RemoteAddr())
+		stream := s.setupStream(ss, c)
+		s.StreamHandler()(stream) // call our handler
+	})
+
+	s.notifyAll(func(n Notifiee) {
+		n.Connected(c)
+	})
+	return c, nil
+}
+
+// setupConn adds the relevant connection to the map, first checking if it
+// was already there.
+func (s *Swarm) setupConn(netConn net.Conn, isServer bool) (*Conn, error) {
 	if netConn == nil {
 		return nil, errors.New("nil conn")
 	}

-	// this function is so we can defer our lock, which needs to be
-	// unlocked **before** the Handler is called (which needs to be
-	// sequential). This was the simplest thing :)
-	setupConn := func() (*Conn, error) {
-		s.connLock.Lock()
-		defer s.connLock.Unlock()
-
-		// first, check if we already have it...
-		for c := range s.conns {
-			if c.netConn == netConn {
-				return c, nil
-			}
-		}
+	// first, check if we already have it, to avoid constructing it
+	// if it is already there
+	s.connLock.Lock()
+	for c := range s.conns {
+		if c.netConn == netConn {
+			s.connLock.Unlock()
+			return c, nil
+		}
+	}
+	s.connLock.Unlock()
+
+	// construct the connection without hanging onto the lock
+	// (as there could be deadlock if so.)

 	// create a new spdystream connection
 	ssConn, err := s.transport.NewConn(netConn, isServer)
@@ -182,29 +204,20 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) {
 		return nil, err
 	}

-	// add the connection
-	c := newConn(netConn, ssConn, s)
-	s.conns[c] = struct{}{}
-	return c, nil
-	}
-
-	c, err := setupConn()
-	if err != nil {
-		return nil, err
-	}
-
-	s.ConnHandler()(c)
-
-	// go listen for incoming streams on this connection
-	go c.pstConn.Serve(func(ss pst.Stream) {
-		// log.Printf("accepted stream %d from %s\n", ssS.Identifier(), netConn.RemoteAddr())
-		stream := s.setupStream(ss, c)
-		s.StreamHandler()(stream) // call our handler
-	})
-
-	s.notifyAll(func(n Notifiee) {
-		n.Connected(c)
-	})
+	// take the lock to add it to the map.
+	s.connLock.Lock()
+	defer s.connLock.Unlock()
+
+	// check for it again as it may have been added already. (TOCTTOU)
+	for c := range s.conns {
+		if c.netConn == netConn {
+			return c, nil
+		}
+	}
+
+	// add the connection
+	c := newConn(netConn, ssConn, s)
+	s.conns[c] = struct{}{}
 	return c, nil
 }
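The setupConn rewrite above is a check-construct-recheck pattern: look the connection up under the lock, release the lock before the potentially slow transport handshake, then re-take the lock and check again before inserting, since another goroutine may have added the same connection in the meantime. A minimal self-contained sketch of the same pattern, with illustrative names rather than the go-peerstream API:

package main

import (
	"fmt"
	"sync"
)

// resource stands in for the expensive object (a muxed connection in swarm.go).
type resource struct{ key string }

// cache demonstrates the check-construct-recheck locking used by setupConn.
type cache struct {
	mu sync.Mutex
	m  map[string]*resource
}

func (c *cache) get(key string) *resource {
	// first check: avoid constructing if it is already there.
	c.mu.Lock()
	if r, ok := c.m[key]; ok {
		c.mu.Unlock()
		return r
	}
	c.mu.Unlock()

	// construct without holding the lock (construction may block on I/O).
	r := &resource{key: key}

	// second check (TOCTTOU): another goroutine may have added it meanwhile.
	c.mu.Lock()
	defer c.mu.Unlock()
	if existing, ok := c.m[key]; ok {
		return existing // keep the winner, discard ours
	}
	c.m[key] = r
	return r
}

func main() {
	c := &cache{m: make(map[string]*resource)}
	fmt.Println(c.get("conn") == c.get("conn")) // true: one instance per key
}

Losing the race costs only a discarded construction, never a duplicate map entry, which is exactly the trade the new setupConn makes.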
@@ -4,12 +4,13 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"sync"

 	tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
 )

 // AcceptConcurrency is how many connections can simultaneously be
-// in process of being accepted. Handshakes can sometimes occurr as
+// in process of being accepted. Handshakes can sometimes occur as
 // part of this process, so it may take some time. It is imporant to
 // rate limit lest a malicious influx of connections would cause our
 // node to consume all its resources accepting new connections.
@@ -73,7 +74,11 @@ func ListenersWithGroup(g Group, ls []*Listener) []*Listener {
 // run in a goroutine.
 // TODO: add rate limiting
 func (l *Listener) accept() {
-	defer l.teardown()
+	var wg sync.WaitGroup
+	defer func() {
+		wg.Wait() // must happen before teardown
+		l.teardown()
+	}()

 	// catching the error here is odd. doing what net/http does:
 	// http://golang.org/src/net/http/server.go?s=51504:51550#L1728
@@ -98,12 +103,15 @@ func (l *Listener) accept() {
 		// do this in a goroutine to avoid blocking the Accept loop.
 		// note that this does not rate limit accepts.
 		limit <- struct{}{} // sema down
+		wg.Add(1)
 		go func(conn net.Conn) {
 			defer func() { <-limit }() // sema up
+			defer wg.Done()

 			conn2, err := l.swarm.addConn(conn, true)
 			if err != nil {
 				l.acceptErr <- err
 				return
 			}
 			conn2.groups.AddSet(&l.groups) // add out groups
 		}(conn)
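The WaitGroup added above guarantees that teardown only runs after every in-flight handshake goroutine has finished, while the buffered channel keeps acting as a counting semaphore on concurrent accepts. A standalone sketch of that accept-loop shape, assuming stand-in names (acceptConcurrency, incoming, setup) rather than the real Listener fields:

package main

import (
	"fmt"
	"sync"
)

const acceptConcurrency = 10 // cap on in-flight setups, like AcceptConcurrency

// acceptLoop drains incoming, running at most acceptConcurrency setups at
// once, and does not tear down until all spawned setups have finished.
func acceptLoop(incoming <-chan int, setup func(int)) {
	limit := make(chan struct{}, acceptConcurrency)
	var wg sync.WaitGroup
	defer func() {
		wg.Wait() // must happen before teardown
		fmt.Println("teardown")
	}()

	for conn := range incoming {
		limit <- struct{}{} // sema down: blocks while too many setups run
		wg.Add(1)
		go func(conn int) {
			defer func() { <-limit }() // sema up
			defer wg.Done()
			setup(conn)
		}(conn)
	}
}

func main() {
	in := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			in <- i
		}
		close(in)
	}()
	acceptLoop(in, func(c int) { fmt.Println("set up conn", c) })
}

Note that wg.Add(1) happens in the accept loop itself, before the goroutine is spawned; adding inside the goroutine would let wg.Wait return while a setup was still about to start.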
package peerstream_multiplex

import (
	"net"

	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"

	mp "github.com/whyrusleeping/go-multiplex"
)

// Conn is a connection to a remote peer.
type conn struct {
	*mp.Multiplex
}

func (c *conn) Close() error {
	return c.Multiplex.Close()
}

func (c *conn) IsClosed() bool {
	return c.Multiplex.IsClosed()
}

// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
	return c.Multiplex.NewStream(), nil
}

// Serve starts listening for incoming requests and handles them
// using given StreamHandler
func (c *conn) Serve(handler pst.StreamHandler) {
	c.Multiplex.Serve(func(s *mp.Stream) {
		handler(s)
	})
}

// Transport is a go-peerstream transport that constructs
// multiplex-backed connections.
type Transport struct{}

// DefaultTransport has default settings for multiplex
var DefaultTransport = &Transport{}

func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
	return &conn{mp.NewMultiplex(nc, isServer)}, nil
}
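To see how a pst.Transport like this is driven end to end, here is a hedged smoke-test sketch using only the interface methods shown above (NewConn, OpenStream, Serve). The net.Pipe wiring and echo handler are illustrative, and pst.Stream is assumed to be readable and writable, as the test helpers in this commit treat it:

package main

import (
	"fmt"
	"net"

	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
	multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex"
)

// echoOnce opens one stream from the client side and has the server
// echo a single byte back over it.
func echoOnce(tr pst.Transport) error {
	nc1, nc2 := net.Pipe()

	server, err := tr.NewConn(nc2, true) // isServer = true
	if err != nil {
		return err
	}
	go server.Serve(func(s pst.Stream) {
		buf := make([]byte, 1)
		if _, err := s.Read(buf); err == nil {
			s.Write(buf) // echo the byte back
		}
		s.Close()
	})

	client, err := tr.NewConn(nc1, false) // isServer = false
	if err != nil {
		return err
	}
	stream, err := client.OpenStream()
	if err != nil {
		return err
	}
	if _, err := stream.Write([]byte{42}); err != nil {
		return err
	}
	buf := make([]byte, 1)
	if _, err := stream.Read(buf); err != nil {
		return err
	}
	fmt.Println("echoed byte:", buf[0])
	return stream.Close()
}

func main() {
	if err := echoOnce(multiplex.DefaultTransport); err != nil {
		fmt.Println("echo failed:", err)
	}
}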
package peerstream_multiplex

import (
	"testing"

	psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)

func TestMultiplexTransport(t *testing.T) {
	psttest.SubtestAll(t, DefaultTransport)
}
// package multistream implements a peerstream transport using
// go-multistream to select the underlying stream muxer
package multistream

import (
	"net"

	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
	multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex"
	spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream"
	yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"

	mss "github.com/whyrusleeping/go-multistream"
)

type transport struct {
	mux *mss.MultistreamMuxer

	tpts map[string]pst.Transport
}

func NewTransport() pst.Transport {
	mux := mss.NewMultistreamMuxer()
	mux.AddHandler("/multiplex", nil)
	mux.AddHandler("/spdystream", nil)
	mux.AddHandler("/yamux", nil)

	tpts := map[string]pst.Transport{
		"/multiplex":  multiplex.DefaultTransport,
		"/spdystream": spdy.Transport,
		"/yamux":      yamux.DefaultTransport,
	}

	return &transport{
		mux:  mux,
		tpts: tpts,
	}
}

func (t *transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
	var proto string
	if isServer {
		selected, _, err := t.mux.Negotiate(nc)
		if err != nil {
			return nil, err
		}
		proto = selected
	} else {
		// prefer yamux
		selected, err := mss.SelectOneOf([]string{"/yamux", "/spdystream", "/multiplex"}, nc)
		if err != nil {
			return nil, err
		}
		proto = selected
	}

	tpt := t.tpts[proto]

	return tpt.NewConn(nc, isServer)
}
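One subtlety in NewConn above: the server side's mux.Negotiate and the client side's mss.SelectOneOf each block until the other end speaks, so the two halves of a connection must be wrapped concurrently. A hedged sketch of exercising the negotiation over an in-memory pipe (the channel plumbing is illustrative):

package main

import (
	"fmt"
	"net"

	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
	multistream "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream"
)

func main() {
	tr := multistream.NewTransport()
	nc1, nc2 := net.Pipe()

	// Wrap the server end in its own goroutine: Negotiate and
	// SelectOneOf must run against each other.
	done := make(chan pst.Conn, 1)
	fail := make(chan error, 1)
	go func() {
		c, err := tr.NewConn(nc2, true) // server side: t.mux.Negotiate
		if err != nil {
			fail <- err
			return
		}
		done <- c
	}()

	client, err := tr.NewConn(nc1, false) // client side: prefers /yamux
	if err != nil {
		fmt.Println("client error:", err)
		return
	}
	select {
	case server := <-done:
		fmt.Printf("negotiated: client=%T server=%T\n", client, server)
	case err := <-fail:
		fmt.Println("server error:", err)
	}
}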
package multistream

import (
	"testing"

	psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)

func TestMultiStreamTransport(t *testing.T) {
	psttest.SubtestAll(t, NewTransport())
}
@@ -4,8 +4,8 @@ import (
 	"net"
 	"net/http"

-	ss "github.com/docker/spdystream"
 	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
+	ss "github.com/jbenet/spdystream"
 )

 // stream implements pst.Stream using a ss.Stream
@@ -55,6 +55,8 @@ func (c *conn) IsClosed() bool {
 	select {
 	case <-c.closed:
 		return true
+	case <-c.sc.CloseChan():
+		return true
 	default:
 		return false
 	}
@@ -62,7 +64,10 @@ func (c *conn) IsClosed() bool {

 // OpenStream creates a new stream.
 func (c *conn) OpenStream() (pst.Stream, error) {
-	s, err := c.spdyConn().CreateStream(http.Header{}, nil, false)
+	s, err := c.spdyConn().CreateStream(http.Header{
+		":method": []string{"GET"}, // this is here for HTTP/SPDY interop
+		":path":   []string{"/"},   // this is here for HTTP/SPDY interop
+	}, nil, false)
 	if err != nil {
 		return nil, err
 	}
@@ -87,10 +92,14 @@ func (c *conn) Serve(handler pst.StreamHandler) {
 		// -- at this moment -- not the solution. Either spdystream must
 		// change, or we must throttle another way. go-peerstream handles
 		// every new stream in its own goroutine.
-		go func() {
-			s.SendReply(http.Header{}, false)
-			handler((*stream)(s))
-		}()
+		err := s.SendReply(http.Header{}, false)
+		if err != nil {
+			// this _could_ error out. not sure how to handle this failure.
+			// don't return, and let the caller handle a broken stream.
+			// better than _hiding_ an error.
+			// return
+		}
+		go handler((*stream)(s))
 	})
 }
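The IsClosed change earlier in this file consults spdystream's CloseChan alongside the conn's own closed channel through a non-blocking select. The same shape, generalized (the channels here are stand-ins for c.closed and c.sc.CloseChan()):

package main

import "fmt"

// isClosed reports whether any of the given done channels has fired,
// without blocking, mirroring the select in conn.IsClosed.
func isClosed(signals ...<-chan struct{}) bool {
	for _, ch := range signals {
		select {
		case <-ch:
			return true
		default:
		}
	}
	return false
}

func main() {
	local := make(chan struct{})  // plays the role of c.closed
	remote := make(chan struct{}) // plays the role of c.sc.CloseChan()
	fmt.Println(isClosed(local, remote)) // false: nothing closed yet

	close(remote) // the remote side goes away
	fmt.Println(isClosed(local, remote)) // true
}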
@@ -7,6 +7,5 @@ import (
 )

 func TestSpdyStreamTransport(t *testing.T) {
-	t.Skip("spdystream is known to be broken")
 	psttest.SubtestAll(t, Transport)
 }
@@ -194,7 +194,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) {
 			bufs <- buf
 			log("writing %d bytes (message %d/%d #%x)", len(buf), i, msgs, buf[:3])
 			if _, err := stream.Write(buf); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("stream.Write(buf): %s", err)
 				continue
 			}
 		}
@@ -212,7 +212,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) {
 			i++
 			if _, err := io.ReadFull(stream, buf2); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("readFull(stream, buf2): %s", err)
 				continue
 			}
 			if !bytes.Equal(buf1, buf2) {
@@ -253,7 +253,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
 			bufs <- buf
 			log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3])
 			if _, err := s.Write(buf); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("s.Write(buf): %s", err)
 				continue
 			}
 		}
@@ -265,11 +265,12 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
 		buf2 := make([]byte, msgsize)
 		i := 0
 		for buf1 := range bufs {
-			log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i, nMsg, buf1[:3])
 			i++
+			log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
 			if _, err := io.ReadFull(s, buf2); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
+				log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
 				continue
 			}
 			if !bytes.Equal(buf1, buf2) {
@@ -307,13 +308,13 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
 		nc, err := net.Dial(nla.Network(), nla.String())
 		if err != nil {
-			errs <- err
+			errs <- fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)
 			return
 		}

 		c, err := a.AddConn(nc)
 		if err != nil {
-			errs <- err
+			errs <- fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)
 			return
 		}