Skip to content
Snippets Groups Projects
Commit 2c4eb609 authored by Jeromy Johnson's avatar Jeromy Johnson
Browse files

allow multistream to have zero rtt stream opening


License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 859de514
No related branches found
No related tags found
1 merge request!1New
......@@ -344,7 +344,7 @@
},
{
"ImportPath": "github.com/whyrusleeping/go-multistream",
"Rev": "c9eea2e3be705b7cfd730351b510cfa12ca038f4"
"Rev": "30c7a81b6c568654147bf6e106870c5d64ccebc8"
},
{
"ImportPath": "github.com/whyrusleeping/multiaddr-filter",
......
package multistream
import (
"fmt"
"io"
"sync"
)
func NewLazyHandshakeConn(c io.ReadWriteCloser, proto string) io.ReadWriteCloser {
return &lazyConn{
proto: proto,
con: c,
}
}
type lazyConn struct {
rhandshake bool // only accessed by 'Read' should not call read async
rhlock sync.Mutex
rhsync bool //protected by mutex
rerr error
whandshake bool
whlock sync.Mutex
whsync bool
werr error
proto string
con io.ReadWriteCloser
}
func (l *lazyConn) Read(b []byte) (int, error) {
if !l.rhandshake {
go l.writeHandshake()
err := l.readHandshake()
if err != nil {
return 0, err
}
l.rhandshake = true
}
if len(b) == 0 {
return 0, nil
}
return l.con.Read(b)
}
func (l *lazyConn) readHandshake() error {
l.rhlock.Lock()
defer l.rhlock.Unlock()
// if we've already done this, exit
if l.rhsync {
return l.rerr
}
l.rhsync = true
// read multistream version
tok, err := ReadNextToken(l.con)
if err != nil {
l.rerr = err
return err
}
if tok != ProtocolID {
l.rerr = fmt.Errorf("multistream protocol mismatch ( %s != %s )", tok, ProtocolID)
return l.rerr
}
// read protocol
tok, err = ReadNextToken(l.con)
if err != nil {
l.rerr = err
return err
}
if tok != l.proto {
l.rerr = fmt.Errorf("protocol mismatch in lazy handshake ( %s != %s )", tok, l.proto)
return l.rerr
}
return nil
}
func (l *lazyConn) writeHandshake() error {
l.whlock.Lock()
defer l.whlock.Unlock()
if l.whsync {
return l.werr
}
l.whsync = true
err := delimWrite(l.con, []byte(ProtocolID))
if err != nil {
l.werr = err
return err
}
err = delimWrite(l.con, []byte(l.proto))
if err != nil {
l.werr = err
return err
}
return nil
}
func (l *lazyConn) Write(b []byte) (int, error) {
if !l.whandshake {
go l.readHandshake()
err := l.writeHandshake()
if err != nil {
return 0, err
}
l.whandshake = true
}
return l.con.Write(b)
}
func (l *lazyConn) Close() error {
return l.con.Close()
}
......@@ -100,17 +100,7 @@ loop:
switch tok {
case "ls":
buf := new(bytes.Buffer)
msm.handlerlock.Lock()
for proto, _ := range msm.handlers {
err := delimWrite(buf, []byte(proto))
if err != nil {
msm.handlerlock.Unlock()
return "", nil, err
}
}
msm.handlerlock.Unlock()
err := delimWrite(rwc, buf.Bytes())
err := msm.Ls(rwc)
if err != nil {
return "", nil, err
}
......@@ -138,6 +128,24 @@ loop:
}
func (msm *MultistreamMuxer) Ls(rwc io.Writer) error {
buf := new(bytes.Buffer)
msm.handlerlock.Lock()
for proto, _ := range msm.handlers {
err := delimWrite(buf, []byte(proto))
if err != nil {
msm.handlerlock.Unlock()
return err
}
}
msm.handlerlock.Unlock()
err := delimWrite(rwc, buf.Bytes())
if err != nil {
return err
}
return nil
}
func (msm *MultistreamMuxer) Handle(rwc io.ReadWriteCloser) error {
_, h, err := msm.Negotiate(rwc)
if err != nil {
......
......@@ -118,6 +118,112 @@ func TestSelectOneAndWrite(t *testing.T) {
verifyPipe(t, a, b)
}
func TestLazyConns(t *testing.T) {
a, b := net.Pipe()
mux := NewMultistreamMuxer()
mux.AddHandler("/a", nil)
mux.AddHandler("/b", nil)
mux.AddHandler("/c", nil)
la := NewLazyHandshakeConn(a, "/c")
lb := NewLazyHandshakeConn(b, "/c")
verifyPipe(t, la, lb)
}
func TestLazyAndMux(t *testing.T) {
a, b := net.Pipe()
mux := NewMultistreamMuxer()
mux.AddHandler("/a", nil)
mux.AddHandler("/b", nil)
mux.AddHandler("/c", nil)
done := make(chan struct{})
go func() {
selected, _, err := mux.Negotiate(a)
if err != nil {
t.Fatal(err)
}
if selected != "/c" {
t.Fatal("incorrect protocol selected")
}
msg := make([]byte, 5)
_, err = a.Read(msg)
if err != nil {
t.Fatal(err)
}
close(done)
}()
lb := NewLazyHandshakeConn(b, "/c")
// do a write to push the handshake through
_, err := lb.Write([]byte("hello"))
if err != nil {
t.Fatal(err)
}
select {
case <-time.After(time.Second):
t.Fatal("failed to complete in time")
case <-done:
}
verifyPipe(t, a, lb)
}
func TestLazyAndMuxWrite(t *testing.T) {
a, b := net.Pipe()
mux := NewMultistreamMuxer()
mux.AddHandler("/a", nil)
mux.AddHandler("/b", nil)
mux.AddHandler("/c", nil)
done := make(chan struct{})
go func() {
selected, _, err := mux.Negotiate(a)
if err != nil {
t.Fatal(err)
}
if selected != "/c" {
t.Fatal("incorrect protocol selected")
}
_, err = a.Write([]byte("hello"))
if err != nil {
t.Fatal(err)
}
close(done)
}()
lb := NewLazyHandshakeConn(b, "/c")
// do a write to push the handshake through
msg := make([]byte, 5)
_, err := lb.Read(msg)
if err != nil {
t.Fatal(err)
}
if string(msg) != "hello" {
t.Fatal("wrong!")
}
select {
case <-time.After(time.Second):
t.Fatal("failed to complete in time")
case <-done:
}
verifyPipe(t, a, lb)
}
func verifyPipe(t *testing.T, a, b io.ReadWriter) {
mes := make([]byte, 1024)
rand.Read(mes)
......
......@@ -170,12 +170,11 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
logStream := mstream.WrapStream(s, pid, h.bwc)
if err := msmux.SelectProtoOrFail(string(pid), logStream); err != nil {
logStream.Close()
return nil, err
}
return logStream, nil
lzcon := msmux.NewLazyHandshakeConn(logStream, string(pid))
return &streamWrapper{
Stream: logStream,
rw: lzcon,
}, nil
}
// Connect ensures there is a connection between this host and the peer with
......@@ -254,3 +253,16 @@ func (h *BasicHost) Close() error {
func (h *BasicHost) GetBandwidthReporter() metrics.Reporter {
return h.bwc
}
type streamWrapper struct {
inet.Stream
rw io.ReadWriter
}
func (s *streamWrapper) Read(b []byte) (int, error) {
return s.rw.Read(b)
}
func (s *streamWrapper) Write(b []byte) (int, error) {
return s.rw.Write(b)
}
......@@ -299,6 +299,12 @@ func TestStBackpressureStreamWrite(t *testing.T) {
}
}
// trigger lazy connection handshaking
_, err = s.Read(nil)
if err != nil {
t.Fatal(err)
}
// 500ms rounds of lockstep write + drain
roundsStart := time.Now()
roundsTotal := 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment