Skip to content

Commit 5317d30

Browse files
authored
feat(coderd): add experimental tasks send endpoint (coder#19941)
Fixes coder/internal#902
1 parent 615585d commit 5317d30

File tree

5 files changed

+533
-2
lines changed

5 files changed

+533
-2
lines changed

coderd/aitasks.go

Lines changed: 292 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
11
package coderd
22

33
import (
4+
"bytes"
45
"context"
56
"database/sql"
7+
"encoding/json"
68
"errors"
79
"fmt"
10+
"io"
11+
"net"
812
"net/http"
13+
"net/url"
14+
"path"
915
"slices"
1016
"strings"
17+
"time"
1118

1219
"github.com/go-chi/chi/v5"
1320
"github.com/google/uuid"
1421

1522
"cdr.dev/slog"
16-
1723
"github.com/coder/coder/v2/coderd/audit"
1824
"github.com/coder/coder/v2/coderd/database"
1925
"github.com/coder/coder/v2/coderd/httpapi"
@@ -590,3 +596,288 @@ func (api *API) taskDelete(rw http.ResponseWriter, r *http.Request) {
590596
// Delete build created successfully.
591597
rw.WriteHeader(http.StatusAccepted)
592598
}
599+
600+
// taskSend submits task input to the tasks sidebar app by dialing the agent
601+
// directly over the tailnet. We enforce ApplicationConnect RBAC on the
602+
// workspace and validate the sidebar app health.
603+
func (api *API) taskSend(rw http.ResponseWriter, r *http.Request) {
604+
ctx := r.Context()
605+
606+
idStr := chi.URLParam(r, "id")
607+
taskID, err := uuid.Parse(idStr)
608+
if err != nil {
609+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
610+
Message: fmt.Sprintf("Invalid UUID %q for task ID.", idStr),
611+
})
612+
return
613+
}
614+
615+
var req codersdk.TaskSendRequest
616+
if !httpapi.Read(ctx, rw, r, &req) {
617+
return
618+
}
619+
if req.Input == "" {
620+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
621+
Message: "Task input is required.",
622+
})
623+
return
624+
}
625+
626+
if err = api.authAndDoWithTaskSidebarAppClient(r, taskID, func(ctx context.Context, client *http.Client, appURL *url.URL) error {
627+
status, err := agentapiDoStatusRequest(ctx, client, appURL)
628+
if err != nil {
629+
return err
630+
}
631+
632+
if status != "stable" {
633+
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
634+
Message: "Task app is not ready to accept input.",
635+
Detail: fmt.Sprintf("Status: %s", status),
636+
})
637+
}
638+
639+
var reqBody struct {
640+
Content string `json:"content"`
641+
Type string `json:"type"`
642+
}
643+
reqBody.Content = req.Input
644+
reqBody.Type = "user"
645+
646+
req, err := agentapiNewRequest(ctx, http.MethodPost, appURL, "message", reqBody)
647+
if err != nil {
648+
return err
649+
}
650+
651+
resp, err := client.Do(req)
652+
if err != nil {
653+
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
654+
Message: "Failed to reach task app endpoint.",
655+
Detail: err.Error(),
656+
})
657+
}
658+
defer resp.Body.Close()
659+
660+
if resp.StatusCode != http.StatusOK {
661+
body, _ := io.ReadAll(io.LimitReader(resp.Body, 128))
662+
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
663+
Message: "Task app rejected the message.",
664+
Detail: fmt.Sprintf("Upstream status: %d; Body: %s", resp.StatusCode, body),
665+
})
666+
}
667+
668+
// {"$schema":"http://localhost:3284/schemas/MessageResponseBody.json","ok":true}
669+
// {"$schema":"http://localhost:3284/schemas/ErrorModel.json","title":"Unprocessable Entity","status":422,"detail":"validation failed","errors":[{"location":"body.type","value":"oof"}]}
670+
var respBody map[string]any
671+
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
672+
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
673+
Message: "Failed to decode task app response body.",
674+
Detail: err.Error(),
675+
})
676+
}
677+
678+
if v, ok := respBody["status"].(string); !ok || v != "ok" {
679+
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
680+
Message: "Task app rejected the message.",
681+
Detail: fmt.Sprintf("Upstream response: %v", respBody),
682+
})
683+
}
684+
685+
return nil
686+
}); err != nil {
687+
httperror.WriteResponseError(ctx, rw, err)
688+
return
689+
}
690+
691+
rw.WriteHeader(http.StatusNoContent)
692+
}
693+
694+
// authAndDoWithTaskSidebarAppClient centralizes the shared logic to:
695+
//
696+
// - Fetch the task workspace
697+
// - Authorize ApplicationConnect on the workspace
698+
// - Validate the AI task and sidebar app health
699+
// - Dial the agent and construct an HTTP client to the apps loopback URL
700+
//
701+
// The provided callback receives the context, an HTTP client that dials via the
702+
// agent, and the base app URL (as a value URL) to perform any request.
703+
func (api *API) authAndDoWithTaskSidebarAppClient(
704+
r *http.Request,
705+
taskID uuid.UUID,
706+
do func(ctx context.Context, client *http.Client, appURL *url.URL) error,
707+
) error {
708+
ctx := r.Context()
709+
710+
workspaceID := taskID
711+
workspace, err := api.Database.GetWorkspaceByID(ctx, workspaceID)
712+
if err != nil {
713+
if httpapi.Is404Error(err) {
714+
return httperror.ErrResourceNotFound
715+
}
716+
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{
717+
Message: "Internal error fetching workspace.",
718+
Detail: err.Error(),
719+
})
720+
}
721+
722+
// Connecting to applications requires ApplicationConnect on the workspace.
723+
if !api.Authorize(r, policy.ActionApplicationConnect, workspace) {
724+
return httperror.ErrResourceNotFound
725+
}
726+
727+
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
728+
if err != nil {
729+
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{
730+
Message: "Internal error fetching workspace resources.",
731+
Detail: err.Error(),
732+
})
733+
}
734+
if len(data.builds) == 0 || len(data.templates) == 0 {
735+
return httperror.ErrResourceNotFound
736+
}
737+
build := data.builds[0]
738+
if build.HasAITask == nil || !*build.HasAITask || build.AITaskSidebarAppID == nil || *build.AITaskSidebarAppID == uuid.Nil {
739+
return httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{
740+
Message: "Task is not configured with a sidebar app.",
741+
})
742+
}
743+
744+
// Find the sidebar app details to get the URL and validate app health.
745+
sidebarAppID := *build.AITaskSidebarAppID
746+
agentID, sidebarApp, ok := func() (uuid.UUID, codersdk.WorkspaceApp, bool) {
747+
for _, res := range build.Resources {
748+
for _, agent := range res.Agents {
749+
for _, app := range agent.Apps {
750+
if app.ID == sidebarAppID {
751+
return agent.ID, app, true
752+
}
753+
}
754+
}
755+
}
756+
return uuid.Nil, codersdk.WorkspaceApp{}, false
757+
}()
758+
if !ok {
759+
return httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{
760+
Message: "Task sidebar app not found in latest build.",
761+
})
762+
}
763+
764+
// Return an informative error if the app isn't healthy rather than trying
765+
// and failing.
766+
switch sidebarApp.Health {
767+
case codersdk.WorkspaceAppHealthDisabled:
768+
// No health check, pass through.
769+
case codersdk.WorkspaceAppHealthInitializing:
770+
return httperror.NewResponseError(http.StatusServiceUnavailable, codersdk.Response{
771+
Message: "Task sidebar app is initializing. Try again shortly.",
772+
})
773+
case codersdk.WorkspaceAppHealthUnhealthy:
774+
return httperror.NewResponseError(http.StatusServiceUnavailable, codersdk.Response{
775+
Message: "Task sidebar app is unhealthy.",
776+
})
777+
}
778+
779+
// Build the direct app URL and dial the agent.
780+
if sidebarApp.URL == "" {
781+
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{
782+
Message: "Task sidebar app URL is not configured.",
783+
})
784+
}
785+
parsedURL, err := url.Parse(sidebarApp.URL)
786+
if err != nil {
787+
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{
788+
Message: "Internal error parsing task app URL.",
789+
Detail: err.Error(),
790+
})
791+
}
792+
if parsedURL.Scheme != "http" {
793+
return httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{
794+
Message: "Only http scheme is supported for direct agent-dial.",
795+
})
796+
}
797+
798+
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*30)
799+
defer dialCancel()
800+
agentConn, release, err := api.agentProvider.AgentConn(dialCtx, agentID)
801+
if err != nil {
802+
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
803+
Message: "Failed to reach task app endpoint.",
804+
Detail: err.Error(),
805+
})
806+
}
807+
defer release()
808+
809+
client := &http.Client{
810+
Transport: &http.Transport{
811+
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
812+
return agentConn.DialContext(ctx, network, addr)
813+
},
814+
},
815+
}
816+
return do(ctx, client, parsedURL)
817+
}
818+
819+
func agentapiNewRequest(ctx context.Context, method string, appURL *url.URL, appURLPath string, body any) (*http.Request, error) {
820+
u := *appURL
821+
u.Path = path.Join(appURL.Path, appURLPath)
822+
823+
var bodyReader io.Reader
824+
if body != nil {
825+
b, err := json.Marshal(body)
826+
if err != nil {
827+
return nil, httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{
828+
Message: "Failed to marshal task app request body.",
829+
Detail: err.Error(),
830+
})
831+
}
832+
bodyReader = bytes.NewReader(b)
833+
}
834+
835+
req, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader)
836+
if err != nil {
837+
return nil, httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{
838+
Message: "Failed to create task app request.",
839+
Detail: err.Error(),
840+
})
841+
}
842+
req.Header.Set("Content-Type", "application/json")
843+
req.Header.Set("Accept", "application/json")
844+
845+
return req, nil
846+
}
847+
848+
func agentapiDoStatusRequest(ctx context.Context, client *http.Client, appURL *url.URL) (string, error) {
849+
req, err := agentapiNewRequest(ctx, http.MethodGet, appURL, "status", nil)
850+
if err != nil {
851+
return "", err
852+
}
853+
854+
resp, err := client.Do(req)
855+
if err != nil {
856+
return "", httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
857+
Message: "Failed to reach task app endpoint.",
858+
Detail: err.Error(),
859+
})
860+
}
861+
defer resp.Body.Close()
862+
863+
if resp.StatusCode != http.StatusOK {
864+
return "", httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
865+
Message: "Task app status returned an error.",
866+
Detail: fmt.Sprintf("Status code: %d", resp.StatusCode),
867+
})
868+
}
869+
870+
// {"$schema":"http://localhost:3284/schemas/StatusResponseBody.json","status":"stable"}
871+
var respBody struct {
872+
Status string `json:"status"`
873+
}
874+
875+
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
876+
return "", httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{
877+
Message: "Failed to decode task app status response body.",
878+
Detail: err.Error(),
879+
})
880+
}
881+
882+
return respBody.Status, nil
883+
}

0 commit comments

Comments
 (0)