use batching datastore for providers storage

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
This commit is contained in:
Jeromy 2016-07-26 10:48:25 -07:00
parent 034ee297f2
commit 1bd98f05f4
6 changed files with 22 additions and 32 deletions

4
dht.go
View File

@ -21,12 +21,12 @@ import (
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
ci "gx/ipfs/QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP/go-libp2p-crypto" ci "gx/ipfs/QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP/go-libp2p-crypto"
host "gx/ipfs/QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms/go-libp2p/p2p/host" host "gx/ipfs/QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms/go-libp2p/p2p/host"
protocol "gx/ipfs/QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms/go-libp2p/p2p/protocol" protocol "gx/ipfs/QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms/go-libp2p/p2p/protocol"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
) )
var log = logging.Logger("dht") var log = logging.Logger("dht")
@ -65,7 +65,7 @@ type IpfsDHT struct {
} }
// NewDHT creates a new DHT object with the given peer as the 'local' host // NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT { func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := new(IpfsDHT) dht := new(IpfsDHT)
dht.datastore = dstore dht.datastore = dstore
dht.self = h.ID() dht.self = h.ID()

View File

@ -14,8 +14,8 @@ import (
record "github.com/ipfs/go-ipfs/routing/record" record "github.com/ipfs/go-ipfs/routing/record"
ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci" ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci"
travisci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis" travisci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore" ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dssync "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync" dssync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore" pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"

View File

@ -10,8 +10,8 @@ import (
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
pb "github.com/ipfs/go-ipfs/routing/dht/pb" pb "github.com/ipfs/go-ipfs/routing/dht/pb"
record "github.com/ipfs/go-ipfs/routing/record" record "github.com/ipfs/go-ipfs/routing/record"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore" ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dssync "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync" dssync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore" pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
inet "gx/ipfs/QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms/go-libp2p/p2p/net" inet "gx/ipfs/QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms/go-libp2p/p2p/net"

View File

@ -8,7 +8,7 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
pb "github.com/ipfs/go-ipfs/routing/dht/pb" pb "github.com/ipfs/go-ipfs/routing/dht/pb"
lgbl "github.com/ipfs/go-ipfs/thirdparty/loggables" lgbl "github.com/ipfs/go-ipfs/thirdparty/loggables"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore" ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore" pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"

View File

@ -10,16 +10,26 @@ import (
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dsq "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/query"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
autobatch "gx/ipfs/QmVvJ27GcLaLSXvcB4auk3Gn3xuWK5ti5ENkZ2pCoJEYW4/autobatch"
base32 "gx/ipfs/Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj/base32" base32 "gx/ipfs/Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj/base32"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
flags "github.com/ipfs/go-ipfs/flags"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )
var batchBufferSize = 256
func init() {
if flags.LowMemMode {
batchBufferSize = 8
}
}
var log = logging.Logger("providers") var log = logging.Logger("providers")
var lruCacheSize = 256 var lruCacheSize = 256
@ -30,11 +40,9 @@ type ProviderManager struct {
// all non channel fields are meant to be accessed only within // all non channel fields are meant to be accessed only within
// the run method // the run method
providers *lru.Cache providers *lru.Cache
local map[key.Key]struct{}
lpeer peer.ID lpeer peer.ID
dstore ds.Datastore dstore ds.Datastore
getlocal chan chan []key.Key
newprovs chan *addProv newprovs chan *addProv
getprovs chan *getProv getprovs chan *getProv
period time.Duration period time.Duration
@ -58,19 +66,17 @@ type getProv struct {
resp chan []peer.ID resp chan []peer.ID
} }
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Datastore) *ProviderManager { func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager {
pm := new(ProviderManager) pm := new(ProviderManager)
pm.getprovs = make(chan *getProv) pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv) pm.newprovs = make(chan *addProv)
pm.dstore = dstore pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.New(lruCacheSize) cache, err := lru.New(lruCacheSize)
if err != nil { if err != nil {
panic(err) //only happens if negative value is passed to lru constructor panic(err) //only happens if negative value is passed to lru constructor
} }
pm.providers = cache pm.providers = cache
pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{})
pm.proc = goprocessctx.WithContext(ctx) pm.proc = goprocessctx.WithContext(ctx)
pm.cleanupInterval = defaultCleanupInterval pm.cleanupInterval = defaultCleanupInterval
pm.proc.Go(func(p goprocess.Process) { pm.run() }) pm.proc.Go(func(p goprocess.Process) { pm.run() })
@ -251,9 +257,6 @@ func (pm *ProviderManager) run() {
for { for {
select { select {
case np := <-pm.newprovs: case np := <-pm.newprovs:
if np.val == pm.lpeer {
pm.local[np.k] = struct{}{}
}
err := pm.addProv(np.k, np.val) err := pm.addProv(np.k, np.val)
if err != nil { if err != nil {
log.Error("error adding new providers: ", err) log.Error("error adding new providers: ", err)
@ -265,13 +268,6 @@ func (pm *ProviderManager) run() {
} }
gp.resp <- provs gp.resp <- provs
case lc := <-pm.getlocal:
var keys []key.Key
for k := range pm.local {
keys = append(keys, k)
}
lc <- keys
case <-tick.C: case <-tick.C:
keys, err := pm.getAllProvKeys() keys, err := pm.getAllProvKeys()
if err != nil { if err != nil {
@ -337,12 +333,6 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.I
} }
} }
func (pm *ProviderManager) GetLocal() []key.Key {
resp := make(chan []key.Key)
pm.getlocal <- resp
return <-resp
}
func newProviderSet() *providerSet { func newProviderSet() *providerSet {
return &providerSet{ return &providerSet{
set: make(map[peer.ID]time.Time), set: make(map[peer.ID]time.Time),

View File

@ -7,7 +7,7 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore" ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )