dht ctxcloserify

This commit is contained in:
Juan Batiz-Benet 2014-10-25 07:12:01 -07:00
parent 6998a8448e
commit be52e35ed7
5 changed files with 41 additions and 31 deletions

15
dht.go
View File

@ -14,6 +14,7 @@ import (
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@ -56,7 +57,7 @@ type IpfsDHT struct {
//lock to make diagnostics work better
diaglock sync.Mutex
ctx context.Context
ctxc.ContextCloser
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
@ -67,9 +68,10 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.datastore = dstore
dht.self = p
dht.peerstore = ps
dht.ctx = ctx
dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)
dht.providers = NewProviderManager(p.ID())
dht.providers = NewProviderManager(dht.Context(), p.ID())
dht.AddCloserChild(dht.providers)
dht.routingTables = make([]*kb.RoutingTable, 3)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
@ -78,6 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.birth = time.Now()
if doPinging {
dht.Children().Add(1)
go dht.PingRoutine(time.Second * 10)
}
return dht
@ -516,6 +519,8 @@ func (dht *IpfsDHT) loadProvidableKeys() error {
// PingRoutine periodically pings nearest neighbors.
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
defer dht.Children().Done()
tick := time.Tick(t)
for {
select {
@ -524,13 +529,13 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers {
ctx, _ := context.WithTimeout(dht.ctx, time.Second*5)
ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
err := dht.Ping(ctx, p)
if err != nil {
log.Errorf("Ping error: %s", err)
}
}
case <-dht.ctx.Done():
case <-dht.Closing():
return
}
}

View File

@ -92,8 +92,8 @@ func TestPing(t *testing.T) {
dhtA := setupDHT(ctx, t, peerA)
dhtB := setupDHT(ctx, t, peerB)
defer dhtA.Halt()
defer dhtB.Halt()
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()
@ -136,8 +136,8 @@ func TestValueGetSet(t *testing.T) {
dhtA := setupDHT(ctx, t, peerA)
dhtB := setupDHT(ctx, t, peerB)
defer dhtA.Halt()
defer dhtB.Halt()
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()
@ -179,7 +179,7 @@ func TestProvides(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
}
}()
@ -239,7 +239,7 @@ func TestProvidesAsync(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
}
}()
@ -302,7 +302,7 @@ func TestLayeredGet(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
}
}()
@ -355,7 +355,7 @@ func TestFindPeer(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
dhts[i].dialer.(inet.Network).Close()
}
}()
@ -443,8 +443,8 @@ func TestConnectCollision(t *testing.T) {
t.Fatal("Timeout received!")
}
dhtA.Halt()
dhtB.Halt()
dhtA.Close()
dhtB.Close()
dhtA.dialer.(inet.Network).Close()
dhtB.dialer.(inet.Network).Close()

View File

@ -205,9 +205,3 @@ func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Messag
return pmes, nil // send back same msg as confirmation.
}
// Halt stops all communications from this peer and shut down
// TODO -- remove this in favor of context
func (dht *IpfsDHT) Halt() {
dht.providers.Halt()
}

View File

@ -5,6 +5,9 @@ import (
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
type ProviderManager struct {
@ -14,8 +17,8 @@ type ProviderManager struct {
getlocal chan chan []u.Key
newprovs chan *addProv
getprovs chan *getProv
halt chan struct{}
period time.Duration
ctxc.ContextCloser
}
type addProv struct {
@ -28,19 +31,24 @@ type getProv struct {
resp chan []peer.Peer
}
func NewProviderManager(local peer.ID) *ProviderManager {
func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm := new(ProviderManager)
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.providers = make(map[u.Key][]*providerInfo)
pm.getlocal = make(chan chan []u.Key)
pm.local = make(map[u.Key]struct{})
pm.halt = make(chan struct{})
pm.ContextCloser = ctxc.NewContextCloser(ctx, nil)
pm.Children().Add(1)
go pm.run()
return pm
}
func (pm *ProviderManager) run() {
defer pm.Children().Done()
tick := time.NewTicker(time.Hour)
for {
select {
@ -53,6 +61,7 @@ func (pm *ProviderManager) run() {
pi.Value = np.val
arr := pm.providers[np.k]
pm.providers[np.k] = append(arr, pi)
case gp := <-pm.getprovs:
var parr []peer.Peer
provs := pm.providers[gp.k]
@ -60,12 +69,14 @@ func (pm *ProviderManager) run() {
parr = append(parr, p.Value)
}
gp.resp <- parr
case lc := <-pm.getlocal:
var keys []u.Key
for k, _ := range pm.local {
keys = append(keys, k)
}
lc <- keys
case <-tick.C:
for k, provs := range pm.providers {
var filtered []*providerInfo
@ -76,7 +87,8 @@ func (pm *ProviderManager) run() {
}
pm.providers[k] = filtered
}
case <-pm.halt:
case <-pm.Closing():
return
}
}
@ -102,7 +114,3 @@ func (pm *ProviderManager) GetLocal() []u.Key {
pm.getlocal <- resp
return <-resp
}
func (pm *ProviderManager) Halt() {
pm.halt <- struct{}{}
}

View File

@ -5,16 +5,19 @@ import (
"github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func TestProviderManager(t *testing.T) {
ctx := context.Background()
mid := peer.ID("testing")
p := NewProviderManager(mid)
p := NewProviderManager(ctx, mid)
a := u.Key("test")
p.AddProvider(a, peer.WithIDString("testingprovider"))
resp := p.GetProviders(a)
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")
}
p.Halt()
p.Close()
}