newMessage and more impl.

Juan Batiz-Benet 2014-09-16 07:17:55 -07:00 committed by Brian Tiger Chow
parent e85011ab43
commit fa2b736bbe
4 changed files with 59 additions and 104 deletions


@@ -6,6 +6,15 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
func newMessage(typ Message_MessageType, key string, level int) *Message {
m := &Message{
Type: &typ,
Key: &key,
}
m.SetClusterLevel(level)
return m
}
func peerToPBPeer(p *peer.Peer) *Message_Peer {
pbp := new(Message_Peer)
if len(p.Addresses) == 0 || p.Addresses[0] == nil {
@@ -24,7 +33,7 @@ func peerToPBPeer(p *peer.Peer) *Message_Peer {
}
func peersToPBPeers(peers []*peer.Peer) []*Message_Peer {
pbpeers = make([]*Message_Peer, len(peers))
pbpeers := make([]*Message_Peer, len(peers))
for i, p := range peers {
pbpeers[i] = peerToPBPeer(p)
}
@@ -34,18 +43,19 @@ func peersToPBPeers(peers []*peer.Peer) []*Message_Peer {
// GetClusterLevel gets and adjusts the cluster level on the message.
// a +/- 1 adjustment is needed to distinguish a valid first level (1) and
// default "no value" protobuf behavior (0)
func (m *Message) GetClusterLevel() int32 {
func (m *Message) GetClusterLevel() int {
level := m.GetClusterLevelRaw() - 1
if level < 0 {
u.PErr("handleGetValue: no routing level specified, assuming 0\n")
level = 0
}
return level
return int(level)
}
// SetClusterLevel adjusts and sets the cluster level on the message.
// a +/- 1 adjustment is needed to distinguish a valid first level (1) and
// default "no value" protobuf behavior (0)
func (m *Message) SetClusterLevel(level int32) {
m.ClusterLevelRaw = &level
func (m *Message) SetClusterLevel(level int) {
lvl := int32(level)
m.ClusterLevelRaw = &lvl
}

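The cluster-level accessors above lean on protobuf's zero-value behavior: an unset ClusterLevelRaw reads back as 0, so the getter subtracts one and clamps at zero, letting a raw value of 1 stand for an explicit lowest level. A minimal standalone sketch of that decoding, using a stand-in struct rather than the generated Message type:

```go
package main

import "fmt"

// message stands in for the generated protobuf struct; only the raw
// cluster level matters for this sketch.
type message struct {
	clusterLevelRaw *int32 // nil means the field was never set on the wire
}

// getClusterLevelRaw mimics the protobuf getter: an unset field reads as 0.
func (m *message) getClusterLevelRaw() int32 {
	if m.clusterLevelRaw == nil {
		return 0
	}
	return *m.clusterLevelRaw
}

// getClusterLevel mirrors the decoding in the hunk above: subtract one and
// clamp, so both "field absent" and a raw value of 1 decode to level 0.
func (m *message) getClusterLevel() int {
	level := m.getClusterLevelRaw() - 1
	if level < 0 {
		level = 0
	}
	return int(level)
}

func main() {
	one, three := int32(1), int32(3)
	fmt.Println((&message{}).getClusterLevel())                        // 0: field absent
	fmt.Println((&message{clusterLevelRaw: &one}).getClusterLevel())   // 0: raw 1 is the lowest explicit level
	fmt.Println((&message{clusterLevelRaw: &three}).getClusterLevel()) // 2: raw 3 decodes to level 2
}
```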
dht.go (107 changed lines)

@@ -246,11 +246,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
key u.Key, level int) (*Message, error) {
typ := Message_GET_VALUE
skey := string(key)
pmes := &Message{Type: &typ, Key: &skey}
pmes.SetClusterLevel(int32(level))
pmes := newMessage(Message_GET_VALUE, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}
@@ -262,7 +258,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*Message_Peer, level int) ([]byte, error) {
for _, pinfo := range peerlist {
p, err := dht.peerFromInfo(pinfo)
p, err := dht.ensureConnectedToPeer(pinfo)
if err != nil {
u.DErr("getFromPeers error: %s\n", err)
continue
@@ -334,34 +330,9 @@ func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
return nil, nil
}
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*Message, error) {
pmes := Message{
Type: Message_FIND_NODE,
Key: string(id),
ID: swarm.GenerateMessageID(),
Value: []byte{byte(level)},
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
t := time.Now()
dht.netChan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
dht.listener.Unlisten(pmes.ID)
return nil, u.ErrTimeout
case resp := <-listenChan:
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmesOut := new(Message)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
return nil, err
}
return pmesOut, nil
}
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) {
pmes := newMessage(Message_FIND_NODE, string(id), level)
return dht.sendRequest(ctx, p, pmes)
}
func (dht *IpfsDHT) printTables() {
@@ -370,54 +341,27 @@ func (dht *IpfsDHT) printTables() {
}
}
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*Message, error) {
pmes := Message{
Type: Message_GET_PROVIDERS,
Key: string(key),
ID: swarm.GenerateMessageID(),
Value: []byte{byte(level)},
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.netChan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
dht.listener.Unlisten(pmes.ID)
return nil, u.ErrTimeout
case resp := <-listenChan:
u.DOut("FindProviders: got response.\n")
pmesOut := new(Message)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
return nil, err
}
return pmesOut, nil
}
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}
// TODO: Could be done async
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*Message_PBPeer) []*peer.Peer {
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer {
var provArr []*peer.Peer
for _, prov := range peers {
// Don't add ourselves to the list
if peer.ID(prov.GetId()).Equal(dht.self.ID) {
p, err := dht.peerFromInfo(prov)
if err != nil {
u.PErr("error getting peer from info: %v\n", err)
continue
}
// Don't add someone who is already on the list
p := dht.network.GetPeer(u.Key(prov.GetId()))
if p == nil {
u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
var err error
p, err = dht.peerFromInfo(prov)
if err != nil {
u.PErr("error connecting to new peer: %s\n", err)
continue
}
// Don't add ourselves to the list
if p.ID.Equal(dht.self.ID) {
continue
}
// TODO(jbenet) ensure providers is idempotent
dht.providers.AddProvider(key, p)
provArr = append(provArr, p)
}
@@ -450,6 +394,7 @@ func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
}
// self is closer? nil
key := u.Key(pmes.GetKey())
if kb.Closer(dht.self.ID, closer.ID, key) {
return nil
}
@@ -478,11 +423,19 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
// create new Peer
p := &peer.Peer{ID: id}
p.AddAddress(maddr)
dht.peerstore.Put(pr)
dht.peerstore.Put(p)
}
return p, nil
}
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) {
p, err := dht.peerFromInfo(pbp)
if err != nil {
return nil, err
}
// dial connection
err = dht.network.Dial(p)
err = dht.network.DialPeer(p)
return p, err
}
@@ -497,7 +450,7 @@ func (dht *IpfsDHT) loadProvidableKeys() error {
return nil
}
// Builds up list of peers by requesting random peer IDs
// Bootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap() {
id := make([]byte, 16)
rand.Read(id)

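The dht.go hunks above replace the hand-rolled request flow in findPeerSingle and findProvidersSingle (generate a message ID, register a listener, select on time.After) with newMessage plus sendRequest under a caller-supplied context. A self-contained sketch of that timeout pattern, with illustrative names rather than the real dht/swarm API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// reply stands in for a decoded *Message; the real code unmarshals
// protobuf bytes received from the remote peer.
type reply struct{ value []byte }

var errTimeout = errors.New("request timed out") // u.ErrTimeout in the real code

// sendRequest waits for a reply or for the caller's context to expire,
// instead of each call site managing its own listener and time.After.
func sendRequest(ctx context.Context, replies <-chan reply) (*reply, error) {
	select {
	case <-ctx.Done():
		return nil, errTimeout
	case r := <-replies:
		return &r, nil
	}
}

func main() {
	replies := make(chan reply) // nobody sends, so the deadline wins

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	if _, err := sendRequest(ctx, replies); err != nil {
		fmt.Println("no reply before deadline:", err)
	}
}
```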

@@ -40,11 +40,8 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
typ := Message_PUT_VALUE
pmes := &Message{
Type: &typ,
Key: &key,
Value: value,
}
pmes := newMessage(Message_PUT_VALUE, string(key), 0)
pmes.Value = value
mes, err := msg.FromObject(p, pmes)
if err != nil {
@@ -57,10 +54,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
u.DOut("handleGetValue for key: %s\n", pmes.GetKey())
// setup response
resp := &Message{
Type: pmes.Type,
Key: pmes.Key,
}
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// first, is the key even a key?
key := pmes.GetKey()
@@ -113,24 +107,22 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
}
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) {
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
dskey := ds.NewKey(pmes.GetKey())
err := dht.datastore.Put(dskey, pmes.GetValue())
if err != nil {
// For now, just panic, handle this better later maybe
panic(err)
}
return nil, err
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
return &Message{Type: pmes.Type}, nil
return newMessage(pmes.GetType(), "", int(pmes.GetClusterLevel())), nil
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
resp := &Message{Type: pmes.Type}
resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
var closest *peer.Peer
// if looking for self... special case where we send it on CloserPeers.
@@ -156,10 +148,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
resp := &Message{
Type: pmes.Type,
Key: pmes.Key,
}
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// check if we have this value, to add ourselves as provider.
has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
@@ -193,11 +182,14 @@ type providerInfo struct {
Value *peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) {
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) {
key := u.Key(pmes.GetKey())
u.DOut("[%s] Adding [%s] as a provider for '%s'\n",
dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty())
dht.providers.AddProvider(key, p)
return nil, nil
}
// Halt stops all communications from this peer and shut down

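The handlers in this file converge on a single signature, taking the remote peer and the request and returning an optional response plus an error, so handlerForMsgType can hand back a uniform dhtHandler and the dispatcher can decide whether to reply and how to report failures. A simplified sketch of that dispatch shape, with stand-in types instead of the package's own:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the real peer.Peer and Message types.
type peerStub struct{ id string }
type msgStub struct {
	typ string
	key string
}

// handler mirrors the uniform signature the handlers converge on: take the
// remote peer and the request, return an optional response plus an error.
type handler func(p *peerStub, req *msgStub) (*msgStub, error)

func handlePing(p *peerStub, req *msgStub) (*msgStub, error) {
	return &msgStub{typ: req.typ}, nil // echo the request type back
}

func handlePutValue(p *peerStub, req *msgStub) (*msgStub, error) {
	// a real handler would write req.key into the datastore and return
	// the error to the dispatcher instead of panicking
	return nil, nil
}

// handlerForType picks a handler by message type, as handlerForMsgType does.
func handlerForType(t string) (handler, error) {
	switch t {
	case "PING":
		return handlePing, nil
	case "PUT_VALUE":
		return handlePutValue, nil
	default:
		return nil, errors.New("unknown message type: " + t)
	}
}

func main() {
	h, err := handlerForType("PING")
	if err != nil {
		panic(err)
	}
	resp, _ := h(&peerStub{id: "remote"}, &msgStub{typ: "PING"})
	fmt.Println("response type:", resp.typ)
}
```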

@@ -261,7 +261,7 @@ func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Pee
}
if pmes.GetSuccess() {
u.DOut("Got providers back from findProviders call!\n")
provs := dht.addPeerList(key, pmes.GetPeers())
provs := dht.addProviders(key, pmes.GetPeers())
ll.Success = true
return provs, nil
}
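
FindProviders now hands the returned Message_Peer records to addProviders, which (per the dht.go hunk earlier) resolves each record to a peer, skips the local node, and registers the rest as providers for the key. A rough sketch of that filtering loop over stand-in types:

```go
package main

import (
	"errors"
	"fmt"
)

// record stands in for a *Message_Peer entry returned by a GET_PROVIDERS call.
type record struct{ id string }

// addProviders sketches the filtering in the real addProviders: resolve each
// record, drop records we can't use and our own ID, and collect the rest.
func addProviders(self string, records []record,
	resolve func(record) (string, error)) []string {

	var provs []string
	for _, rec := range records {
		id, err := resolve(rec)
		if err != nil {
			continue // couldn't turn the record into a usable peer
		}
		if id == self {
			continue // don't add ourselves to the list
		}
		provs = append(provs, id)
	}
	return provs
}

func main() {
	records := []record{{"QmSelf"}, {"QmA"}, {""}, {"QmB"}}
	resolve := func(r record) (string, error) {
		if r.id == "" {
			return "", errors.New("record has no ID")
		}
		return r.id, nil
	}
	fmt.Println(addProviders("QmSelf", records, resolve)) // [QmA QmB]
}
```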