go-libp2p-kad-dht/subscriber_notifee.go

package dht

import (
	"fmt"

	"github.com/jbenet/goprocess"
	"github.com/libp2p/go-eventbus"
	"github.com/libp2p/go-libp2p-core/event"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	ma "github.com/multiformats/go-multiaddr"
)
// subscriberNotifee implements network.Notifee and also manages the subscriber to the event bus. We consume peer
// identification events to trigger inclusion in the routing table, and we consume Disconnected events to eject peers
// from it.
type subscriberNotifee struct {
dht *IpfsDHT
subs event.Subscription
}
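
// newSubscriberNotifiee subscribes to the relevant event bus events, registers the returned notifee for
// network notifications, and seeds the routing table with currently connected peers that support the DHT
// protocol.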
func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
bufSize := eventbus.BufSize(256)
evts := []interface{}{
// register for event bus notifications of when peers successfully complete identification in order to update
// the routing table
new(event.EvtPeerIdentificationCompleted),
// register for event bus protocol ID changes in order to update the routing table
new(event.EvtPeerProtocolsUpdated),
		// register for event bus notifications for when our local addresses change so we can
		// advertise those to the network
new(event.EvtLocalAddressesUpdated),
}
	// register for event bus notifications of local reachability changes in order to trigger switching
	// between client and server modes; only register for these events if the DHT is operating in ModeAuto
if dht.auto {
evts = append(evts, new(event.EvtLocalReachabilityChanged))
}
subs, err := dht.host.EventBus().Subscribe(evts, bufSize)
if err != nil {
return nil, fmt.Errorf("dht could not subscribe to eventbus events; err: %s", err)
}
nn := &subscriberNotifee{
dht: dht,
subs: subs,
}
// register for network notifications
dht.host.Network().Notify(nn)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
defer dht.plk.Unlock()
for _, p := range dht.host.Network().Peers() {
valid, err := dht.validRTPeer(p)
if err != nil {
return nil, fmt.Errorf("could not check peerstore for protocol support: err: %s", err)
}
if valid {
dht.peerFound(dht.ctx, p)
}
}
return nn, nil
}
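
// subscribe is the event dispatch loop. It consumes events from the event bus subscription until either the
// subscription is closed or the supplied goprocess starts shutting down, and on exit it deregisters the
// network notifee and closes the subscription.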
func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
dht := nn.dht
defer dht.host.Network().StopNotify(nn)
defer nn.subs.Close()
for {
select {
case e, more := <-nn.subs.Out():
if !more {
return
}
switch evt := e.(type) {
case event.EvtLocalAddressesUpdated:
				// when our address changes, we should proactively tell our closest peers about it so
				// we become discoverable quickly. The Identify protocol will push a signed peer record
				// with our new address to all peers we are connected to. However, we are not necessarily
				// connected to our closest peers, so in the true spirit of Zen, searching for ourselves
				// in the network really is the best way to forge connections with those that matter.
select {
case dht.triggerSelfLookup <- nil:
default:
}
case event.EvtPeerIdentificationCompleted:
handlePeerIdentificationCompletedEvent(dht, evt)
case event.EvtPeerProtocolsUpdated:
handlePeerProtocolsUpdatedEvent(dht, evt)
case event.EvtLocalReachabilityChanged:
if dht.auto {
handleLocalReachabilityChangedEvent(dht, evt)
} else {
// something has gone really wrong if we get an event we did not subscribe to
logger.Errorf("received LocalReachabilityChanged event that was not subscribed to")
}
default:
// something has gone really wrong if we get an event for another type
logger.Errorf("got wrong type from subscription: %T", e)
}
case <-proc.Closing():
return
}
}
}
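
// handlePeerIdentificationCompletedEvent adds a freshly identified peer to the routing table, provided we are
// still connected to it and it supports the DHT protocol. Holding dht.plk guards against processing this
// concurrently with a disconnect notification for the same peer.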
func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, e event.EvtPeerIdentificationCompleted) {
dht.plk.Lock()
defer dht.plk.Unlock()
if dht.host.Network().Connectedness(e.Peer) != network.Connected {
return
}
// if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed
valid, err := dht.validRTPeer(e.Peer)
if err != nil {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
}
if valid {
dht.peerFound(dht.ctx, e.Peer)
fixLowPeers(dht)
}
}
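
// handlePeerProtocolsUpdatedEvent re-evaluates a peer whose advertised protocols have changed: if it still
// supports the DHT protocol it is added to the routing table, otherwise it is treated as having stopped
// serving the DHT; in either case we top up the routing table if it is running low.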
func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, e event.EvtPeerProtocolsUpdated) {
valid, err := dht.validRTPeer(e.Peer)
if err != nil {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
}
if valid {
dht.peerFound(dht.ctx, e.Peer)
} else {
dht.peerStoppedDHT(dht.ctx, e.Peer)
}
fixLowPeers(dht)
}
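
// handleLocalReachabilityChangedEvent switches the DHT between client and server mode based on our local
// reachability: publicly reachable nodes act as servers, while private or unknown reachability puts us in
// client mode.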
func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabilityChanged) {
var target mode
switch e.Reachability {
case network.ReachabilityPrivate, network.ReachabilityUnknown:
target = modeClient
case network.ReachabilityPublic:
target = modeServer
}
logger.Infof("processed event %T; performing dht mode switch", e)
err := dht.setMode(target)
	// NOTE: %d below logs the mode as its numeric value.
if err == nil {
logger.Infof("switched DHT mode successfully; new mode: %d", target)
} else {
logger.Warningf("switching DHT mode failed; new mode: %d, err: %s", target, err)
}
}
// validRTPeer returns true if the peer supports the DHT protocol, and false otherwise. Supporting the DHT
// protocol means supporting the primary protocols; we do not want to add peers that only speak obsolete
// secondary protocols to our routing table.
func (dht *IpfsDHT) validRTPeer(p peer.ID) (bool, error) {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
pstrs[idx] = string(proto)
}
protos, err := dht.peerstore.SupportsProtocols(p, pstrs...)
if err != nil {
return false, err
}
return len(protos) > 0, nil
}
// fixLowPeers tries to get more peers into the routing table if we're below the threshold
func fixLowPeers(dht *IpfsDHT) {
if dht.routingTable.Size() > minRTRefreshThreshold {
return
}
// Passively add peers we already know about
for _, p := range dht.host.Network().Peers() {
// Don't bother probing, we do that on connect.
valid, _ := dht.validRTPeer(p)
if valid {
dht.peerFound(dht.Context(), p)
}
}
if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
default:
}
}
}
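
// Disconnected is called by the network when a connection to a peer closes. Once no connections to the peer
// remain, the peer is removed from the routing table, the table is topped up if needed, and any cached
// message sender (and its stream) for that peer is invalidated.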
func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
dht := nn.dht
select {
case <-dht.Process().Closing():
return
default:
}
p := v.RemotePeer()
// Lock and check to see if we're still connected. We lock to make sure
// we don't concurrently process a connect event.
dht.plk.Lock()
defer dht.plk.Unlock()
if dht.host.Network().Connectedness(p) == network.Connected {
// We're still connected.
return
}
dht.peerDisconnected(dht.ctx, p)
fixLowPeers(dht)
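	// Remove this peer's message sender from the stream map and invalidate it.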
dht.smlk.Lock()
defer dht.smlk.Unlock()
ms, ok := dht.strmap[p]
if !ok {
return
}
delete(dht.strmap, p)
// Do this asynchronously as ms.lk can block for a while.
go func() {
if err := ms.lk.Lock(dht.Context()); err != nil {
return
}
defer ms.lk.Unlock()
ms.invalidate()
}()
}
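
// The remaining methods are no-ops; they exist only to satisfy the network.Notifee interface.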
func (nn *subscriberNotifee) Connected(n network.Network, v network.Conn) {}
func (nn *subscriberNotifee) OpenedStream(n network.Network, v network.Stream) {}
func (nn *subscriberNotifee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *subscriberNotifee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *subscriberNotifee) ListenClose(n network.Network, a ma.Multiaddr) {}