diff --git a/notifications/query.go b/notifications/query.go index 29a0bc0fb2715bf361af87342d63068ac9416f2a..6a9d08da3206484f733115c1a5f6a353693d6b0b 100644 --- a/notifications/query.go +++ b/notifications/query.go @@ -18,6 +18,8 @@ const ( QueryError Provider Value + AddingPeer + DialingPeer ) type QueryEvent struct { diff --git a/routing/dht/query.go b/routing/dht/query.go index 5318897ee035c1fb1169dc906740fbe4825777a3..8afaaa7e50f09ea0293d852dfada6ce7bc2f376a 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -79,6 +79,8 @@ type dhtQueryRunner struct { rateLimit chan struct{} // processing semaphore log logging.EventLogger + runCtx context.Context + proc process.Process sync.RWMutex } @@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner { func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) { r.log = log + r.runCtx = ctx if len(peers) == 0 { log.Warning("Running query with no peers!") @@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) { return } + notif.PublishQueryEvent(r.runCtx, ¬if.QueryEvent{ + Type: notif.AddingPeer, + ID: next, + }) + r.peersRemaining.Increment(1) select { case r.peersToQuery.EnqChan <- next: @@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { // make sure we're connected to the peer. // FIXME abstract away into the network layer if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { - log.Infof("not connected. dialing.") + log.Error("not connected. dialing.") + + notif.PublishQueryEvent(r.runCtx, ¬if.QueryEvent{ + Type: notif.DialingPeer, + ID: p, + }) // while we dial, we do not take up a rate limit. this is to allow // forward progress during potentially very high latency dials. r.rateLimit <- struct{}{} @@ -231,9 +244,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { if err := r.query.dht.host.Connect(ctx, pi); err != nil { log.Debugf("Error connecting: %s", err) - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + notif.PublishQueryEvent(r.runCtx, ¬if.QueryEvent{ Type: notif.QueryError, Extra: err.Error(), + ID: p, }) r.Lock()