Skip to content

Commit b84fa2b

Browse files
committed May 12, 2015
net/p2p + secio: parallelize crypto handshake
We had a very nasty problem: handshakes were serial, so incoming dials would wait for each other to finish handshaking. This was particularly problematic when handshakes hung: nodes would not recover quickly. This led to gateways not bootstrapping peers fast enough.

The approach taken here is to do what crypto/tls does: defer the handshake until Read/Write [0]. There are a number of reasons why this is _the right thing to do_:

- it delays handshaking until it is known to be necessary (i.e., until doing I/O).
- it "accepts" before the handshake, getting the handshake out of the critical path entirely.
- it defers to the user's parallelization of conn handling. Users must implement this in some way already, so use that instead of picking constants that are surely wrong (how many handshakes should run in parallel?).

[0] http://golang.org/src/crypto/tls/conn.go#L886
1 parent 08ea56c commit b84fa2b

File tree

5 files changed

+157
-55
lines changed

5 files changed

+157
-55
lines changed
 

‎p2p/crypto/secio/interface.go

+52-19
Original file line number | Diff line number | Diff line change
@@ -17,28 +17,13 @@ type SessionGenerator struct {
1717
PrivateKey ci.PrivKey
1818
}
1919

20-
// NewSession takes an insecure io.ReadWriter, performs a TLS-like
20+
// NewSession takes an insecure io.ReadWriter, sets up a TLS-like
2121
// handshake with the other side, and returns a secure session.
22+
// The handshake isn't run until the connection is read or written to.
2223
// See the source for the protocol details and security implementation.
2324
// The provided Context is only needed for the duration of this function.
24-
func (sg *SessionGenerator) NewSession(ctx context.Context,
25-
insecure io.ReadWriter) (Session, error) {
26-
27-
ss, err := newSecureSession(sg.LocalID, sg.PrivateKey)
28-
if err != nil {
29-
return nil, err
30-
}
31-
32-
if ctx == nil {
33-
ctx = context.Background()
34-
}
35-
ctx, cancel := context.WithCancel(ctx)
36-
if err := ss.handshake(ctx, insecure); err != nil {
37-
cancel()
38-
return nil, err
39-
}
40-
41-
return ss, nil
25+
func (sg *SessionGenerator) NewSession(ctx context.Context, insecure io.ReadWriteCloser) (Session, error) {
26+
return newSecureSession(ctx, sg.LocalID, sg.PrivateKey, insecure)
4227
}
4328

4429
type Session interface {
@@ -64,6 +49,9 @@ type Session interface {
6449

6550
// SecureReadWriter returns the encrypted communication channel
6651
func (s *secureSession) ReadWriter() msgio.ReadWriteCloser {
52+
if err := s.Handshake(); err != nil {
53+
return &closedRW{err}
54+
}
6755
return s.secure
6856
}
6957

@@ -79,15 +67,60 @@ func (s *secureSession) LocalPrivateKey() ci.PrivKey {
7967

8068
// RemotePeer retrieves the remote peer.
8169
func (s *secureSession) RemotePeer() peer.ID {
70+
if err := s.Handshake(); err != nil {
71+
return ""
72+
}
8273
return s.remotePeer
8374
}
8475

8576
// RemotePeer retrieves the remote peer.
8677
func (s *secureSession) RemotePublicKey() ci.PubKey {
78+
if err := s.Handshake(); err != nil {
79+
return nil
80+
}
8781
return s.remote.permanentPubKey
8882
}
8983

9084
// Close closes the secure session
9185
func (s *secureSession) Close() error {
86+
s.cancel()
87+
s.handshakeMu.Lock()
88+
defer s.handshakeMu.Unlock()
89+
if s.secure == nil {
90+
return s.insecure.Close() // hadn't secured yet.
91+
}
9292
return s.secure.Close()
9393
}
94+
95+
// closedRW implements a stub msgio interface that's already
96+
// closed and errored.
97+
type closedRW struct {
98+
err error
99+
}
100+
101+
func (c *closedRW) Read(buf []byte) (int, error) {
102+
return 0, c.err
103+
}
104+
105+
func (c *closedRW) Write(buf []byte) (int, error) {
106+
return 0, c.err
107+
}
108+
109+
func (c *closedRW) NextMsgLen() (int, error) {
110+
return 0, c.err
111+
}
112+
113+
func (c *closedRW) ReadMsg() ([]byte, error) {
114+
return nil, c.err
115+
}
116+
117+
func (c *closedRW) WriteMsg(buf []byte) error {
118+
return c.err
119+
}
120+
121+
func (c *closedRW) Close() error {
122+
return c.err
123+
}
124+
125+
func (c *closedRW) ReleaseMsg(m []byte) {
126+
}

‎p2p/crypto/secio/protocol.go

+42-8
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,8 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"sync"
10+
"time"
911

1012
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
1113
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@@ -27,15 +29,23 @@ var ErrClosed = errors.New("connection closed")
2729
// ErrEcho is returned when we're attempting to handshake with the same keys and nonces.
2830
var ErrEcho = errors.New("same keys and nonces. one side talking to self.")
2931

32+
// HandshakeTimeout governs how long the handshake will be allowed to take place for.
33+
// Making this number large means there could be many bogus connections waiting to
34+
// timeout in flight. Typical handshakes take ~3RTTs, so it should be completed within
35+
// seconds across a typical planet in the solar system.
36+
var HandshakeTimeout = time.Second * 30
37+
3038
// nonceSize is the size of our nonces (in bytes)
3139
const nonceSize = 16
3240

3341
// secureSession encapsulates all the parameters needed for encrypting
3442
// and decrypting traffic from an insecure channel.
3543
type secureSession struct {
36-
secure msgio.ReadWriteCloser
44+
ctx context.Context
45+
cancel context.CancelFunc
3746

38-
insecure io.ReadWriter
47+
secure msgio.ReadWriteCloser
48+
insecure io.ReadWriteCloser
3949
insecureM msgio.ReadWriter
4050

4151
localKey ci.PrivKey
@@ -46,6 +56,10 @@ type secureSession struct {
4656
remote encParams
4757

4858
sharedSecret []byte
59+
60+
handshakeMu sync.Mutex // guards handshakeDone + handshakeErr
61+
handshakeDone bool
62+
handshakeErr error
4963
}
5064

5165
func (s *secureSession) Loggable() map[string]interface{} {
@@ -56,8 +70,9 @@ func (s *secureSession) Loggable() map[string]interface{} {
5670
return m
5771
}
5872

59-
func newSecureSession(local peer.ID, key ci.PrivKey) (*secureSession, error) {
73+
func newSecureSession(ctx context.Context, local peer.ID, key ci.PrivKey, insecure io.ReadWriteCloser) (*secureSession, error) {
6074
s := &secureSession{localPeer: local, localKey: key}
75+
s.ctx, s.cancel = context.WithCancel(ctx)
6176

6277
switch {
6378
case s.localPeer == "":
@@ -66,18 +81,37 @@ func newSecureSession(local peer.ID, key ci.PrivKey) (*secureSession, error) {
6681
return nil, errors.New("no local private key provided")
6782
case !s.localPeer.MatchesPrivateKey(s.localKey):
6883
return nil, fmt.Errorf("peer.ID does not match PrivateKey")
84+
case insecure == nil:
85+
return nil, fmt.Errorf("insecure ReadWriter is nil")
6986
}
7087

88+
s.ctx = ctx
89+
s.insecure = insecure
90+
s.insecureM = msgio.NewReadWriter(insecure)
7191
return s, nil
7292
}
7393

74-
// handsahke performs initial communication over insecure channel to share
94+
func (s *secureSession) Handshake() error {
95+
s.handshakeMu.Lock()
96+
defer s.handshakeMu.Unlock()
97+
98+
if s.handshakeErr != nil {
99+
return s.handshakeErr
100+
}
101+
102+
if !s.handshakeDone {
103+
s.handshakeErr = s.runHandshake()
104+
s.handshakeDone = true
105+
}
106+
return s.handshakeErr
107+
}
108+
109+
// runHandshake performs initial communication over insecure channel to share
75110
// keys, IDs, and initiate communication, assigning all necessary params.
76111
// requires the duplex channel to be a msgio.ReadWriter (for framed messaging)
77-
func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) error {
78-
79-
s.insecure = insecure
80-
s.insecureM = msgio.NewReadWriter(insecure)
112+
func (s *secureSession) runHandshake() error {
113+
ctx, cancel := context.WithTimeout(s.ctx, HandshakeTimeout) // remove
114+
defer cancel()
81115

82116
// =============================================================================
83117
// step 1. Propose -- propose cipher suite + send pubkeys + nonce

‎p2p/net/conn/dial_test.go

+40-3
Original file line number | Diff line number | Diff line change
@@ -75,25 +75,56 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
7575

7676
done := make(chan error)
7777
go func() {
78+
defer close(done)
79+
7880
var err error
7981
c2, err = d2.Dial(ctx, p1.Addr, p1.ID)
8082
if err != nil {
8183
done <- err
84+
return
85+
}
86+
87+
// if secure, need to read + write, as that's what triggers the handshake.
88+
if secure {
89+
if err := sayHello(c2); err != nil {
90+
done <- err
91+
}
8292
}
83-
close(done)
8493
}()
8594

8695
c1, err := l1.Accept()
8796
if err != nil {
8897
t.Fatal("failed to accept", err)
8998
}
99+
100+
// if secure, need to read + write, as that's what triggers the handshake.
101+
if secure {
102+
if err := sayHello(c1); err != nil {
103+
done <- err
104+
}
105+
}
106+
90107
if err := <-done; err != nil {
91108
t.Fatal(err)
92109
}
93110

94111
return c1.(Conn), c2, p1, p2
95112
}
96113

114+
func sayHello(c net.Conn) error {
115+
h := []byte("hello")
116+
if _, err := c.Write(h); err != nil {
117+
return err
118+
}
119+
if _, err := c.Read(h); err != nil {
120+
return err
121+
}
122+
if string(h) != "hello" {
123+
return fmt.Errorf("did not get hello")
124+
}
125+
return nil
126+
}
127+
97128
func testDialer(t *testing.T, secure bool) {
98129
// t.Skip("Skipping in favor of another test")
99130

@@ -203,15 +234,21 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
203234
go func() {
204235
defer func() { done <- struct{}{} }()
205236

206-
_, err := l1.Accept()
237+
c, err := l1.Accept()
207238
if err != nil {
208239
if strings.Contains(err.Error(), "closed") {
209240
gotclosed <- struct{}{}
210241
return
211242
}
212243
errs <- err
213244
}
214-
errs <- fmt.Errorf("got conn")
245+
246+
if _, err := c.Write([]byte("hello")); err != nil {
247+
gotclosed <- struct{}{}
248+
return
249+
}
250+
251+
errs <- fmt.Errorf("wrote to conn")
215252
}()
216253

217254
c, err := d2.Dial(ctx, p1.Addr, p1.ID)

‎p2p/net/conn/secure_conn.go

+14-25
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@ import (
55
"net"
66
"time"
77

8-
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
98
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
109
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1110

@@ -16,15 +15,8 @@ import (
1615

1716
// secureConn wraps another Conn object with an encrypted channel.
1817
type secureConn struct {
19-
20-
// the wrapped conn
21-
insecure Conn
22-
23-
// secure io (wrapping insecure)
24-
secure msgio.ReadWriteCloser
25-
26-
// secure Session
27-
session secio.Session
18+
insecure Conn // the wrapped conn
19+
secure secio.Session // secure Session
2820
}
2921

3022
// newConn constructs a new connection
@@ -37,23 +29,20 @@ func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, err
3729
return nil, errors.New("insecure.LocalPeer() is nil")
3830
}
3931
if sk == nil {
40-
panic("way")
4132
return nil, errors.New("private key is nil")
4233
}
4334

4435
// NewSession performs the secure handshake, which takes multiple RTT
4536
sessgen := secio.SessionGenerator{LocalID: insecure.LocalPeer(), PrivateKey: sk}
46-
session, err := sessgen.NewSession(ctx, insecure)
37+
secure, err := sessgen.NewSession(ctx, insecure)
4738
if err != nil {
4839
return nil, err
4940
}
5041

5142
conn := &secureConn{
5243
insecure: insecure,
53-
session: session,
54-
secure: session.ReadWriter(),
44+
secure: secure,
5545
}
56-
log.Debugf("newSecureConn: %v to %v handshake success!", conn.LocalPeer(), conn.RemotePeer())
5746
return conn, nil
5847
}
5948

@@ -102,49 +91,49 @@ func (c *secureConn) RemoteMultiaddr() ma.Multiaddr {
10291

10392
// LocalPeer is the Peer on this side
10493
func (c *secureConn) LocalPeer() peer.ID {
105-
return c.session.LocalPeer()
94+
return c.secure.LocalPeer()
10695
}
10796

10897
// RemotePeer is the Peer on the remote side
10998
func (c *secureConn) RemotePeer() peer.ID {
110-
return c.session.RemotePeer()
99+
return c.secure.RemotePeer()
111100
}
112101

113102
// LocalPrivateKey is the public key of the peer on this side
114103
func (c *secureConn) LocalPrivateKey() ic.PrivKey {
115-
return c.session.LocalPrivateKey()
104+
return c.secure.LocalPrivateKey()
116105
}
117106

118107
// RemotePubKey is the public key of the peer on the remote side
119108
func (c *secureConn) RemotePublicKey() ic.PubKey {
120-
return c.session.RemotePublicKey()
109+
return c.secure.RemotePublicKey()
121110
}
122111

123112
// Read reads data, net.Conn style
124113
func (c *secureConn) Read(buf []byte) (int, error) {
125-
return c.secure.Read(buf)
114+
return c.secure.ReadWriter().Read(buf)
126115
}
127116

128117
// Write writes data, net.Conn style
129118
func (c *secureConn) Write(buf []byte) (int, error) {
130-
return c.secure.Write(buf)
119+
return c.secure.ReadWriter().Write(buf)
131120
}
132121

133122
func (c *secureConn) NextMsgLen() (int, error) {
134-
return c.secure.NextMsgLen()
123+
return c.secure.ReadWriter().NextMsgLen()
135124
}
136125

137126
// ReadMsg reads data, net.Conn style
138127
func (c *secureConn) ReadMsg() ([]byte, error) {
139-
return c.secure.ReadMsg()
128+
return c.secure.ReadWriter().ReadMsg()
140129
}
141130

142131
// WriteMsg writes data, net.Conn style
143132
func (c *secureConn) WriteMsg(buf []byte) error {
144-
return c.secure.WriteMsg(buf)
133+
return c.secure.ReadWriter().WriteMsg(buf)
145134
}
146135

147136
// ReleaseMsg releases a buffer
148137
func (c *secureConn) ReleaseMsg(m []byte) {
149-
c.secure.ReleaseMsg(m)
138+
c.secure.ReadWriter().ReleaseMsg(m)
150139
}

‎p2p/net/conn/secure_conn_test.go

+9
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,15 @@ func upgradeToSecureConn(t *testing.T, ctx context.Context, sk ic.PrivKey, c Con
2323
if err != nil {
2424
return nil, err
2525
}
26+
27+
// need to read + write, as that's what triggers the handshake.
28+
h := []byte("hello")
29+
if _, err := s.Write(h); err != nil {
30+
return nil, err
31+
}
32+
if _, err := s.Read(h); err != nil {
33+
return nil, err
34+
}
2635
return s, nil
2736
}
2837

0 commit comments

Comments
 (0)
Please sign in to comment.