Add in some more notifications to help profile queries

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy 2015-11-04 21:49:20 -08:00
parent 675ed09ad8
commit 39d1e2146d

View File

@ -79,6 +79,8 @@ type dhtQueryRunner struct {
rateLimit chan struct{} // processing semaphore rateLimit chan struct{} // processing semaphore
log logging.EventLogger log logging.EventLogger
runCtx context.Context
proc process.Process proc process.Process
sync.RWMutex sync.RWMutex
} }
@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) { func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
r.log = log r.log = log
r.runCtx = ctx
if len(peers) == 0 { if len(peers) == 0 {
log.Warning("Running query with no peers!") log.Warning("Running query with no peers!")
@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
return return
} }
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.AddingPeer,
ID: next,
})
r.peersRemaining.Increment(1) r.peersRemaining.Increment(1)
select { select {
case r.peersToQuery.EnqChan <- next: case r.peersToQuery.EnqChan <- next:
@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// make sure we're connected to the peer. // make sure we're connected to the peer.
// FIXME abstract away into the network layer // FIXME abstract away into the network layer
if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
log.Infof("not connected. dialing.") log.Error("not connected. dialing.")
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.DialingPeer,
ID: p,
})
// while we dial, we do not take up a rate limit. this is to allow // while we dial, we do not take up a rate limit. this is to allow
// forward progress during potentially very high latency dials. // forward progress during potentially very high latency dials.
r.rateLimit <- struct{}{} r.rateLimit <- struct{}{}
@ -231,9 +244,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
if err := r.query.dht.host.Connect(ctx, pi); err != nil { if err := r.query.dht.host.Connect(ctx, pi); err != nil {
log.Debugf("Error connecting: %s", err) log.Debugf("Error connecting: %s", err)
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
Extra: err.Error(), Extra: err.Error(),
ID: p,
}) })
r.Lock() r.Lock()