diff --git a/handlers.go b/handlers.go index cdf2ca8a..6a0ef7d7 100644 --- a/handlers.go +++ b/handlers.go @@ -12,11 +12,13 @@ import ( "os/exec" "path/filepath" "strconv" + "strings" "syscall" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/kardianos/osext" "github.com/kr/pty" + nats "github.com/nats-io/go-nats" "github.com/pkg/errors" ) @@ -230,6 +232,43 @@ func SketchCB(status *Status) mqtt.MessageHandler { } } +func NatsCloudCB(s *Status) nats.MsgHandler { + return func(m *nats.Msg) { + pidStr := strings.TrimPrefix(m.Subject, "$arduino.cloud.") + pid, err := strconv.Atoi(pidStr) + if err != nil { + return + } + + sketchName, err := sketchNameForPid(pid, s) + if err != nil { + return + } + + updateMessage := fmt.Sprintf("{\"state\": {\"reported\": { \"%s\": %s}}}", sketchName, string(m.Data)) + + s.mqttClient.Publish("$aws/things/"+s.id+"/shadow/update", 1, false, updateMessage) + } +} + +// maps a PID to a sketch name +func sketchNameForPid(pid int, status *Status) (string, error) { + sketchName := "" + + for _, sketch := range status.Sketches { + if sketch.PID == pid { + sketchName = sketch.Name + break + } + } + + if sketchName == "" { + return "", errors.New("Unknown PID") + } else { + return sketchName, nil + } +} + // downloadfile substitute a file with something that downloads from an url func downloadFile(filepath, url, token string) error { // Create the file - remove the existing one if it exists diff --git a/main.go b/main.go index 99d10301..69d2e3a1 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/namsral/flag" logger "github.com/nats-io/gnatsd/logger" server "github.com/nats-io/gnatsd/server" + nats "github.com/nats-io/go-nats" "github.com/pkg/errors" ) @@ -87,11 +88,15 @@ func (p program) run() { newroutes, err := server.RemoveSelfReference(opts.Cluster.Port, opts.Routes) opts.Routes = newroutes s := server.New(&opts) - configureLogger(s, &opts) + configureNatsdLogger(s, &opts) go s.Start() + if !s.ReadyForConnections(1 * time.Second) { + log.Fatal("NATS server not redy for connections!") + } + // Setup MQTT connection - client, err := setupMQTTConnection("certificate.pem", "certificate.key", p.Config.ID, p.Config.URL) + mqttClient, err := setupMQTTConnection("certificate.pem", "certificate.key", p.Config.ID, p.Config.URL) if err != nil { // if installing in a chroot the paths may be wrong and the installer may fail. // Don't report it as an error @@ -100,16 +105,24 @@ func (p program) run() { log.Println("Connected to MQTT") // Create global status - status := NewStatus(p.Config.ID, client) + status := NewStatus(p.Config.ID, mqttClient) if p.listenFile != "" { go tailAndReport(p.listenFile, status) } + // Start nats-client for local server + nc, err := nats.Connect(nats.DefaultURL) + check(err, "ConnectNATS") + nc.Subscribe("$arduino.cloud.*", NatsCloudCB(status)) + + // wipe the thing shadows + mqttClient.Publish("$aws/things/"+p.Config.ID+"/shadow/delete", 1, false, "") + // Subscribe to topics endpoint - client.Subscribe("$aws/things/"+p.Config.ID+"/status/post", 1, StatusCB(status)) - client.Subscribe("$aws/things/"+p.Config.ID+"/upload/post", 1, UploadCB(status)) - client.Subscribe("$aws/things/"+p.Config.ID+"/sketch/post", 1, SketchCB(status)) + mqttClient.Subscribe("$aws/things/"+p.Config.ID+"/status/post", 1, StatusCB(status)) + mqttClient.Subscribe("$aws/things/"+p.Config.ID+"/upload/post", 1, UploadCB(status)) + mqttClient.Subscribe("$aws/things/"+p.Config.ID+"/sketch/post", 1, SketchCB(status)) sketchFolder, err := GetSketchFolder() // Export LD_LIBRARY_PATH to local lib subfolder @@ -211,7 +224,7 @@ func setupMQTTConnection(cert, key, id, url string) (mqtt.Client, error) { return mqttClient, nil } -func configureLogger(s *server.Server, opts *server.Options) { +func configureNatsdLogger(s *server.Server, opts *server.Options) { var log server.Logger colors := true // Check to see if stderr is being redirected and if so turn off color diff --git a/status.go b/status.go index 01596bfe..8da1331c 100644 --- a/status.go +++ b/status.go @@ -10,22 +10,22 @@ import ( // Status contains info about the sketches running on the device type Status struct { - id string - client mqtt.Client - Sketches map[string]*SketchStatus `json:"sketches"` + id string + mqttClient mqtt.Client + Sketches map[string]*SketchStatus `json:"sketches"` } // Status contains info about the sketches running on the device type StatusTemp struct { - id string - client mqtt.Client - Sketches map[string]SketchStatus `json:"sketches"` + id string + mqttClient mqtt.Client + Sketches map[string]SketchStatus `json:"sketches"` } func ExpandStatus(s *Status) *StatusTemp { var temp StatusTemp temp.id = s.id - temp.client = s.client + temp.mqttClient = s.mqttClient temp.Sketches = make(map[string]SketchStatus) for _, element := range s.Sketches { temp.Sketches[element.Name] = *element @@ -55,11 +55,11 @@ type Endpoint struct { } // NewStatus creates a new status that publishes on a topic -func NewStatus(id string, client mqtt.Client) *Status { +func NewStatus(id string, mqttClient mqtt.Client) *Status { return &Status{ - id: id, - client: client, - Sketches: map[string]*SketchStatus{}, + id: id, + mqttClient: mqttClient, + Sketches: map[string]*SketchStatus{}, } } @@ -72,20 +72,20 @@ func (s *Status) Set(name string, sketch *SketchStatus) { panic(err) // Means that something went really wrong } - if token := s.client.Publish("/status", 1, false, msg); token.Wait() && token.Error() != nil { + if token := s.mqttClient.Publish("/status", 1, false, msg); token.Wait() && token.Error() != nil { panic(err) // Means that something went really wrong } } // Error logs an error on the specified topic func (s *Status) Error(topic string, err error) { - token := s.client.Publish("$aws/things/"+s.id+topic, 1, false, "ERROR: "+err.Error()) + token := s.mqttClient.Publish("$aws/things/"+s.id+topic, 1, false, "ERROR: "+err.Error()) token.Wait() } // Info logs a message on the specified topic func (s *Status) Info(topic, msg string) { - token := s.client.Publish("$aws/things/"+s.id+topic, 1, false, "INFO: "+msg) + token := s.mqttClient.Publish("$aws/things/"+s.id+topic, 1, false, "INFO: "+msg) token.Wait() }