Skip to content
This repository was archived by the owner on Mar 27, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"os/exec"
"path/filepath"
"strconv"
"strings"
"syscall"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/kardianos/osext"
"github.com/kr/pty"
nats "github.com/nats-io/go-nats"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -230,6 +232,43 @@ func SketchCB(status *Status) mqtt.MessageHandler {
}
}

func NatsCloudCB(s *Status) nats.MsgHandler {
return func(m *nats.Msg) {
pidStr := strings.TrimPrefix(m.Subject, "$arduino.cloud.")
pid, err := strconv.Atoi(pidStr)
if err != nil {
return
}

sketchName, err := sketchNameForPid(pid, s)
if err != nil {
return
}

updateMessage := fmt.Sprintf("{\"state\": {\"reported\": { \"%s\": %s}}}", sketchName, string(m.Data))

s.mqttClient.Publish("$aws/things/"+s.id+"/shadow/update", 1, false, updateMessage)
}
}

// maps a PID to a sketch name
func sketchNameForPid(pid int, status *Status) (string, error) {
sketchName := ""

for _, sketch := range status.Sketches {
if sketch.PID == pid {
sketchName = sketch.Name
break
}
}

if sketchName == "" {
return "", errors.New("Unknown PID")
} else {
return sketchName, nil
}
}

// downloadfile substitute a file with something that downloads from an url
func downloadFile(filepath, url, token string) error {
// Create the file - remove the existing one if it exists
Expand Down
27 changes: 20 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/namsral/flag"
logger "github.com/nats-io/gnatsd/logger"
server "github.com/nats-io/gnatsd/server"
nats "github.com/nats-io/go-nats"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -87,11 +88,15 @@ func (p program) run() {
newroutes, err := server.RemoveSelfReference(opts.Cluster.Port, opts.Routes)
opts.Routes = newroutes
s := server.New(&opts)
configureLogger(s, &opts)
configureNatsdLogger(s, &opts)
go s.Start()

if !s.ReadyForConnections(1 * time.Second) {
log.Fatal("NATS server not redy for connections!")
}

// Setup MQTT connection
client, err := setupMQTTConnection("certificate.pem", "certificate.key", p.Config.ID, p.Config.URL)
mqttClient, err := setupMQTTConnection("certificate.pem", "certificate.key", p.Config.ID, p.Config.URL)
if err != nil {
// if installing in a chroot the paths may be wrong and the installer may fail.
// Don't report it as an error
Expand All @@ -100,16 +105,24 @@ func (p program) run() {
log.Println("Connected to MQTT")

// Create global status
status := NewStatus(p.Config.ID, client)
status := NewStatus(p.Config.ID, mqttClient)

if p.listenFile != "" {
go tailAndReport(p.listenFile, status)
}

// Start nats-client for local server
nc, err := nats.Connect(nats.DefaultURL)
check(err, "ConnectNATS")
nc.Subscribe("$arduino.cloud.*", NatsCloudCB(status))

// wipe the thing shadows
mqttClient.Publish("$aws/things/"+p.Config.ID+"/shadow/delete", 1, false, "")

// Subscribe to topics endpoint
client.Subscribe("$aws/things/"+p.Config.ID+"/status/post", 1, StatusCB(status))
client.Subscribe("$aws/things/"+p.Config.ID+"/upload/post", 1, UploadCB(status))
client.Subscribe("$aws/things/"+p.Config.ID+"/sketch/post", 1, SketchCB(status))
mqttClient.Subscribe("$aws/things/"+p.Config.ID+"/status/post", 1, StatusCB(status))
mqttClient.Subscribe("$aws/things/"+p.Config.ID+"/upload/post", 1, UploadCB(status))
mqttClient.Subscribe("$aws/things/"+p.Config.ID+"/sketch/post", 1, SketchCB(status))

sketchFolder, err := GetSketchFolder()
// Export LD_LIBRARY_PATH to local lib subfolder
Expand Down Expand Up @@ -211,7 +224,7 @@ func setupMQTTConnection(cert, key, id, url string) (mqtt.Client, error) {
return mqttClient, nil
}

func configureLogger(s *server.Server, opts *server.Options) {
func configureNatsdLogger(s *server.Server, opts *server.Options) {
var log server.Logger
colors := true
// Check to see if stderr is being redirected and if so turn off color
Expand Down
28 changes: 14 additions & 14 deletions status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ import (

// Status contains info about the sketches running on the device
type Status struct {
id string
client mqtt.Client
Sketches map[string]*SketchStatus `json:"sketches"`
id string
mqttClient mqtt.Client
Sketches map[string]*SketchStatus `json:"sketches"`
}

// Status contains info about the sketches running on the device
type StatusTemp struct {
id string
client mqtt.Client
Sketches map[string]SketchStatus `json:"sketches"`
id string
mqttClient mqtt.Client
Sketches map[string]SketchStatus `json:"sketches"`
}

func ExpandStatus(s *Status) *StatusTemp {
var temp StatusTemp
temp.id = s.id
temp.client = s.client
temp.mqttClient = s.mqttClient
temp.Sketches = make(map[string]SketchStatus)
for _, element := range s.Sketches {
temp.Sketches[element.Name] = *element
Expand Down Expand Up @@ -55,11 +55,11 @@ type Endpoint struct {
}

// NewStatus creates a new status that publishes on a topic
func NewStatus(id string, client mqtt.Client) *Status {
func NewStatus(id string, mqttClient mqtt.Client) *Status {
return &Status{
id: id,
client: client,
Sketches: map[string]*SketchStatus{},
id: id,
mqttClient: mqttClient,
Sketches: map[string]*SketchStatus{},
}
}

Expand All @@ -72,20 +72,20 @@ func (s *Status) Set(name string, sketch *SketchStatus) {
panic(err) // Means that something went really wrong
}

if token := s.client.Publish("/status", 1, false, msg); token.Wait() && token.Error() != nil {
if token := s.mqttClient.Publish("/status", 1, false, msg); token.Wait() && token.Error() != nil {
panic(err) // Means that something went really wrong
}
}

// Error logs an error on the specified topic
func (s *Status) Error(topic string, err error) {
token := s.client.Publish("$aws/things/"+s.id+topic, 1, false, "ERROR: "+err.Error())
token := s.mqttClient.Publish("$aws/things/"+s.id+topic, 1, false, "ERROR: "+err.Error())
token.Wait()
}

// Info logs a message on the specified topic
func (s *Status) Info(topic, msg string) {
token := s.client.Publish("$aws/things/"+s.id+topic, 1, false, "INFO: "+msg)
token := s.mqttClient.Publish("$aws/things/"+s.id+topic, 1, false, "INFO: "+msg)
token.Wait()
}

Expand Down