Skip to content
Snippets Groups Projects
Commit c3280ce8 authored by Jeromy Johnson's avatar Jeromy Johnson
Browse files

use new methods from goprocess/context, remove thirdparty/waitable


License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent d5cfb01f
Branches
No related tags found
1 merge request!1New
...@@ -199,7 +199,7 @@ ...@@ -199,7 +199,7 @@
}, },
{ {
"ImportPath": "github.com/jbenet/goprocess", "ImportPath": "github.com/jbenet/goprocess",
"Rev": "4562d0c5780b8f060df2b84a8945bb8678bfc023" "Rev": "64a8220330a485070813201cc05b0c6777f6a516"
}, },
{ {
"ImportPath": "github.com/kardianos/osext", "ImportPath": "github.com/kardianos/osext",
......
package goprocessctx
import (
"errors"
"time"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
const (
closing = iota
closed
)
type procContext struct {
done <-chan struct{}
which int
}
// OnClosingContext derives a context from a given goprocess that will
// be 'Done' when the process is closing
func OnClosingContext(p goprocess.Process) context.Context {
return &procContext{
done: p.Closing(),
which: closing,
}
}
// OnClosedContext derives a context from a given goprocess that will
// be 'Done' when the process is closed
func OnClosedContext(p goprocess.Process) context.Context {
return &procContext{
done: p.Closed(),
which: closed,
}
}
func (c *procContext) Done() <-chan struct{} {
return c.done
}
func (c *procContext) Deadline() (time.Time, bool) {
return time.Time{}, false
}
func (c *procContext) Err() error {
if c.which == closing {
return errors.New("process closing")
} else if c.which == closed {
return errors.New("process closed")
} else {
panic("unrecognized process context type")
}
}
func (c *procContext) Value(key interface{}) interface{} {
return nil
}
...@@ -139,6 +139,10 @@ type Process interface { ...@@ -139,6 +139,10 @@ type Process interface {
// _after_ Close has completed; teardown has finished. The primary use case // _after_ Close has completed; teardown has finished. The primary use case
// of Closed is waiting for a Process to Close without _causing_ the Close. // of Closed is waiting for a Process to Close without _causing_ the Close.
Closed() <-chan struct{} Closed() <-chan struct{}
// Err waits until the process is closed, and then returns any error that
// occurred during shutdown.
Err() error
} }
// TeardownFunc is a function used to cleanup state at the end of the // TeardownFunc is a function used to cleanup state at the end of the
......
...@@ -163,6 +163,11 @@ func (p *process) Closed() <-chan struct{} { ...@@ -163,6 +163,11 @@ func (p *process) Closed() <-chan struct{} {
return p.closed return p.closed
} }
func (p *process) Err() error {
<-p.Closed()
return p.closeErr
}
// the _actual_ close process. // the _actual_ close process.
func (p *process) doClose() { func (p *process) doClose() {
// this function is only be called once (protected by p.Lock()). // this function is only be called once (protected by p.Lock()).
......
...@@ -7,11 +7,11 @@ import ( ...@@ -7,11 +7,11 @@ import (
"time" "time"
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
exchange "github.com/ipfs/go-ipfs/exchange" exchange "github.com/ipfs/go-ipfs/exchange"
waitable "github.com/ipfs/go-ipfs/thirdparty/waitable"
util "github.com/ipfs/go-ipfs/util" util "github.com/ipfs/go-ipfs/util"
) )
...@@ -121,7 +121,7 @@ func (w *Worker) start(c Config) { ...@@ -121,7 +121,7 @@ func (w *Worker) start(c Config) {
// reads from |workerChan| until w.process closes // reads from |workerChan| until w.process closes
limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers) limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
limiter.Go(func(proc process.Process) { limiter.Go(func(proc process.Process) {
ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die ctx := procctx.OnClosingContext(proc) // shut down in-progress HasBlock when time to die
for { for {
select { select {
case <-proc.Closing(): case <-proc.Closing():
......
...@@ -79,7 +79,7 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { ...@@ -79,7 +79,7 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
// the periodic bootstrap function -- the connection supervisor // the periodic bootstrap function -- the connection supervisor
periodic := func(worker goprocess.Process) { periodic := func(worker goprocess.Process) {
ctx := procctx.WithProcessClosing(context.Background(), worker) ctx := procctx.OnClosingContext(worker)
defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done()
if err := bootstrapRound(ctx, n.PeerHost, cfg); err != nil { if err := bootstrapRound(ctx, n.PeerHost, cfg); err != nil {
...@@ -96,7 +96,7 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { ...@@ -96,7 +96,7 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
// kick off Routing.Bootstrap // kick off Routing.Bootstrap
if n.Routing != nil { if n.Routing != nil {
ctx := procctx.WithProcessClosing(context.Background(), proc) ctx := procctx.OnClosingContext(proc)
if err := n.Routing.Bootstrap(ctx); err != nil { if err := n.Routing.Bootstrap(ctx); err != nil {
proc.Close() proc.Close()
return nil, err return nil, err
......
...@@ -4,9 +4,9 @@ import ( ...@@ -4,9 +4,9 @@ import (
"time" "time"
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
waitable "github.com/ipfs/go-ipfs/thirdparty/waitable"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
...@@ -80,7 +80,7 @@ func (bs *Bitswap) provideWorker(px process.Process) { ...@@ -80,7 +80,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
ev := eventlog.LoggableMap{"ID": wid} ev := eventlog.LoggableMap{"ID": wid}
limiter.LimitedGo(func(px process.Process) { limiter.LimitedGo(func(px process.Process) {
ctx := waitable.Context(px) // derive ctx from px ctx := procctx.OnClosingContext(px) // derive ctx from px
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
...@@ -97,7 +97,7 @@ func (bs *Bitswap) provideWorker(px process.Process) { ...@@ -97,7 +97,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
limiter.Go(func(px process.Process) { limiter.Go(func(px process.Process) {
for wid := 2; ; wid++ { for wid := 2; ; wid++ {
ev := eventlog.LoggableMap{"ID": 1} ev := eventlog.LoggableMap{"ID": 1}
log.Event(waitable.Context(px), "Bitswap.ProvideWorker.Loop", ev) log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
select { select {
case <-px.Closing(): case <-px.Closing():
......
...@@ -51,7 +51,7 @@ func TestGetFailures(t *testing.T) { ...@@ -51,7 +51,7 @@ func TestGetFailures(t *testing.T) {
err = merr[0] err = merr[0]
} }
if err != context.DeadlineExceeded && err != context.Canceled { if err.Error() != "process closing" {
t.Fatal("Got different error than we expected", err) t.Fatal("Got different error than we expected", err)
} }
} else { } else {
......
...@@ -85,7 +85,7 @@ type dhtQueryRunner struct { ...@@ -85,7 +85,7 @@ type dhtQueryRunner struct {
func newQueryRunner(q *dhtQuery) *dhtQueryRunner { func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
proc := process.WithParent(process.Background()) proc := process.WithParent(process.Background())
ctx := ctxproc.WithProcessClosing(context.Background(), proc) ctx := ctxproc.OnClosingContext(proc)
return &dhtQueryRunner{ return &dhtQueryRunner{
query: q, query: q,
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)), peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
...@@ -210,7 +210,7 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { ...@@ -210,7 +210,7 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// ok let's do this! // ok let's do this!
// create a context from our proc. // create a context from our proc.
ctx := ctxproc.WithProcessClosing(context.Background(), proc) ctx := ctxproc.OnClosingContext(proc)
// make sure we do this when we exit // make sure we do this when we exit
defer func() { defer func() {
......
package waitable
import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
type Waitable interface {
Closing() <-chan struct{}
}
// Context returns a context that cancels when the waitable is closing.
func Context(w Waitable) context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-w.Closing()
cancel()
}()
return ctx
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment