Skip to content

Commit 3eadf6c

Browse files
author
Amir Khassaia
committed
refactor: post review discussion, move http proxy handling to a sample app to keep the library simple
refactor: revert SNI handling and add a pre connect hook instead to keep the library clean and to allow the clients to customize the final tls config that will be in use for connecting Signed-off-by: amir-khassaia <amir.khassaia@gmail.com>
1 parent c39ca86 commit 3eadf6c

File tree

5 files changed

+134
-24
lines changed

5 files changed

+134
-24
lines changed

client.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,13 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
379379
cm := newConnectMsgFromOptions(&c.options, broker)
380380
DEBUG.Println(CLI, "about to write new connect msg")
381381
CONN:
382+
tlsCfg := c.options.TLSConfig
383+
if c.options.OnConnectAttempt != nil {
384+
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
385+
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
386+
}
382387
// Start by opening the network connection (tcp, tls, ws) etc
383-
conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
388+
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
384389
if err != nil {
385390
ERROR.Println(CLI, err.Error())
386391
WARN.Println(CLI, "failed to connect to broker, trying next")
@@ -397,7 +402,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
397402

398403
// We may be have to attempt the connection with MQTT 3.1
399404
if conn != nil {
400-
conn.Close()
405+
_ = conn.Close()
401406
}
402407
if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
403408
DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
@@ -504,8 +509,8 @@ func (c *client) internalConnLost(err error) {
504509
}
505510
}
506511

507-
// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and
508-
// outgoing messages.
512+
// startCommsWorkers is called when the connection is up.
513+
// It starts off all of the routines needed to process incoming and outgoing messages.
509514
// Returns true if the comms workers were started (i.e. they were not already running)
510515
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
511516
DEBUG.Println(CLI, "startCommsWorkers called")

http_proxy.go cmd/httpproxy/httpproxy.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package mqtt
1+
package main
22

33
import (
44
"bufio"
@@ -77,8 +77,3 @@ func (s *httpProxy) Dial(_, addr string) (net.Conn, error) {
7777

7878
return c, nil
7979
}
80-
81-
func init() {
82-
proxy.RegisterDialerType("http", newHTTPProxy)
83-
proxy.RegisterDialerType("https", newHTTPProxy)
84-
}

cmd/httpproxy/main.go

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright (c) 2013 IBM Corp.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/legal/epl-v10.html
8+
*
9+
* Contributors:
10+
* Seth Hoenig
11+
* Allan Stockdill-Mander
12+
* Mike Robertson
13+
*/
14+
15+
package main
16+
17+
import (
18+
"crypto/tls"
19+
"flag"
20+
"fmt"
21+
"golang.org/x/net/proxy"
22+
"log"
23+
"net/url"
24+
25+
// "log"
26+
"os"
27+
"os/signal"
28+
"strconv"
29+
"syscall"
30+
"time"
31+
32+
MQTT "github.com/eclipse/paho.mqtt.golang"
33+
)
34+
35+
func onMessageReceived(_ MQTT.Client, message MQTT.Message) {
36+
fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())
37+
}
38+
39+
func init() {
40+
// Pre-register custom HTTP proxy dialers for use with proxy.FromEnvironment
41+
proxy.RegisterDialerType("http", newHTTPProxy)
42+
proxy.RegisterDialerType("https", newHTTPProxy)
43+
}
44+
45+
/**
46+
* Illustrates how to make an MQTT connection with HTTP proxy CONNECT support.
47+
* Specify proxy via environment variable: eg: ALL_PROXY=https://proxy_host:port
48+
*/
49+
func main() {
50+
MQTT.DEBUG = log.New(os.Stdout, "", 0)
51+
MQTT.ERROR = log.New(os.Stderr, "", 0)
52+
53+
c := make(chan os.Signal, 1)
54+
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
55+
56+
hostname, _ := os.Hostname()
57+
58+
server := flag.String("server", "tcp://127.0.0.1:1883", "The full URL of the MQTT server to "+
59+
"connect to ex: tcp://127.0.0.1:1883")
60+
topic := flag.String("topic", "#", "Topic to subscribe to")
61+
qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
62+
clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection")
63+
username := flag.String("username", "", "A username to authenticate to the MQTT server")
64+
password := flag.String("password", "", "Password to match username")
65+
token := flag.String("token", "", "An optional token credential to authenticate with")
66+
skipVerify := flag.Bool("skipVerify", false, "Controls whether TLS certificate is verified")
67+
flag.Parse()
68+
69+
connOpts := MQTT.NewClientOptions().AddBroker(*server).
70+
SetClientID(*clientid).
71+
SetCleanSession(true).
72+
SetProtocolVersion(4)
73+
74+
if *username != "" {
75+
connOpts.SetUsername(*username)
76+
if *password != "" {
77+
connOpts.SetPassword(*password)
78+
}
79+
} else if *token != "" {
80+
connOpts.SetCredentialsProvider(func() (string, string) {
81+
return "unused", *token
82+
})
83+
}
84+
85+
connOpts.SetTLSConfig(&tls.Config{InsecureSkipVerify: *skipVerify, ClientAuth: tls.NoClientCert})
86+
87+
connOpts.OnConnect = func(c MQTT.Client) {
88+
if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
89+
panic(token.Error())
90+
}
91+
}
92+
93+
// Illustrates customized TLS configuration prior to connection attempt
94+
connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
95+
cfg := tlsCfg.Clone()
96+
cfg.ServerName = broker.Hostname()
97+
return cfg
98+
}
99+
100+
client := MQTT.NewClient(connOpts)
101+
if token := client.Connect(); token.Wait() && token.Error() != nil {
102+
panic(token.Error())
103+
} else {
104+
fmt.Printf("Connected to %s\n", *server)
105+
}
106+
107+
<-c
108+
}

netconn.go

+4-14
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ import (
3030
// This just establishes the network connection; once established the type of connection should be irrelevant
3131
//
3232

33-
// openConnection opens a network connection using the protocol indicated in the URL. Does not carry out any MQTT specific handshakes
33+
// openConnection opens a network connection using the protocol indicated in the URL.
34+
// Does not carry out any MQTT specific handshakes.
3435
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) {
3536
switch uri.Scheme {
3637
case "ws":
@@ -77,26 +78,15 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade
7778
return nil, err
7879
}
7980

80-
tlsConn := tls.Client(conn, tlsConfigWithSni(uri, tlsc))
81+
tlsConn := tls.Client(conn, tlsc)
8182

8283
err = tlsConn.Handshake()
8384
if err != nil {
84-
conn.Close()
85+
_ = conn.Close()
8586
return nil, err
8687
}
8788

8889
return tlsConn, nil
8990
}
9091
return nil, errors.New("unknown protocol")
9192
}
92-
93-
func tlsConfigWithSni(uri *url.URL, conf *tls.Config) *tls.Config {
94-
tlsConfig := conf
95-
if tlsConfig.ServerName == "" {
96-
// Ensure SNI is set appropriately - make a copy to avoid polluting argument or default.
97-
c := tlsConfig.Clone()
98-
c.ServerName = uri.Hostname()
99-
tlsConfig = c
100-
}
101-
return tlsConfig
102-
}

options.go

+12
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ type OnConnectHandler func(Client)
4949
// the initial connection is lost
5050
type ReconnectHandler func(Client, *ClientOptions)
5151

52+
// ConnectionAttemptHandler is invoked prior to making the initial connection.
53+
type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config
54+
5255
// ClientOptions contains configurable options for an Client. Note that these should be set using the
5356
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
5457
type ClientOptions struct {
@@ -79,6 +82,7 @@ type ClientOptions struct {
7982
OnConnect OnConnectHandler
8083
OnConnectionLost ConnectionLostHandler
8184
OnReconnecting ReconnectHandler
85+
OnConnectAttempt ConnectionAttemptHandler
8286
WriteTimeout time.Duration
8387
MessageChannelDepth uint
8488
ResumeSubs bool
@@ -120,6 +124,7 @@ func NewClientOptions() *ClientOptions {
120124
Store: nil,
121125
OnConnect: nil,
122126
OnConnectionLost: DefaultConnectionLostHandler,
127+
OnConnectAttempt: nil,
123128
WriteTimeout: 0, // 0 represents timeout disabled
124129
ResumeSubs: false,
125130
HTTPHeaders: make(map[string][]string),
@@ -321,6 +326,13 @@ func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptio
321326
return o
322327
}
323328

329+
// SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior
330+
// to the client attempting initial connection to the MQTT broker.
331+
func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions {
332+
o.OnConnectAttempt = onConnectAttempt
333+
return o
334+
}
335+
324336
// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
325337
// timeout error. A duration of 0 never times out. Default never times out
326338
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {

0 commit comments

Comments
 (0)