
Commit 05b037b

fix: avoid deadlock race writing to a disconnected mapper (coder#20303)
Fixes coder/internal#1045

Fixes a race condition in our PG Coordinator when a peer disconnects. We issue database queries to find the peer mappings (node structures for each peer connected via a tunnel) and then send these to the "mapper", which generates diffs and eventually writes the update to the websocket. Before this change we erroneously used the querier's context for this send, which has the same lifetime as the coordinator itself. If the peer has disconnected, the mapper may no longer be reading from its channel, so the send deadlocks a querier worker and prevents any further work on that peer.

Also adds some debug logging that would have been helpful when tracking this down.
1 parent: 3699ff6
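To make the failure mode concrete, here is a minimal, self-contained sketch of the pattern involved. This is not the coder/coder code: sendCtx merely stands in for a context-aware send helper like agpl.SendCtx, and coordCtx, mapperCtx, and the []string payload are illustrative stand-ins. The point is that a send guarded by a coordinator-lifetime context blocks forever once the mapper stops reading, while a send guarded by the mapper's own (per-peer) context fails as soon as the peer goes away.

package main

import (
	"context"
	"fmt"
	"time"
)

// sendCtx blocks until v is sent on ch or ctx is done, mirroring the shape of
// a context-aware channel send helper (an assumption, not the real agpl.SendCtx).
func sendCtx[T any](ctx context.Context, ch chan<- T, v T) error {
	select {
	case ch <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// mapperCtx is tied to one peer connection; it is canceled on disconnect.
	mapperCtx, disconnect := context.WithCancel(context.Background())

	// An unbuffered channel with no reader, standing in for a mapper that has
	// stopped consuming updates after its peer went away.
	mappings := make(chan []string)

	disconnect() // the peer disconnects; nothing will ever read from mappings

	// Guarded by the mapper's context, the send fails immediately.
	err := sendCtx(mapperCtx, mappings, []string{"node update"})
	fmt.Println("send with mapper ctx:", err) // context canceled

	// Guarded by a coordinator-lifetime context (effectively never canceled
	// while the process runs), the same send would block forever and wedge
	// the querier worker. A short timeout is used here only so the example
	// terminates.
	coordCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	err = sendCtx(coordCtx, mappings, []string{"node update"})
	fmt.Println("send with coordinator-lifetime ctx:", err) // blocks until the timeout fires
}

The diff below applies exactly this change: the mapping update is sent with the mapper's context (mpr.ctx) instead of the querier's (q.ctx).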

1 file changed: 8 additions, 3 deletions
enterprise/tailnet/pgcoord.go

@@ -873,9 +873,11 @@ func (q *querier) handleIncoming() {
 			return

 		case c := <-q.newConnections:
+			q.logger.Debug(q.ctx, "new connection received", slog.F("peer_id", c.UniqueID()))
 			q.newConn(c)

 		case c := <-q.closeConnections:
+			q.logger.Debug(q.ctx, "connection close request", slog.F("peer_id", c.UniqueID()))
 			q.cleanupConn(c)
 		}
 	}
@@ -902,7 +904,8 @@ func (q *querier) newConn(c *connIO) {
 	mk := mKey(c.UniqueID())
 	dup, ok := q.mappers[mk]
 	if ok {
-		// duplicate, overwrite and close the old one
+		q.logger.Debug(q.ctx, "duplicate mapper found; closing old connection", slog.F("peer_id", dup.c.UniqueID()))
+		// overwrite and close the old one
 		atomic.StoreInt64(&c.overwrites, dup.c.Overwrites()+1)
 		err := dup.c.CoordinatorClose()
 		if err != nil {
@@ -913,6 +916,7 @@ func (q *querier) newConn(c *connIO) {
 	q.workQ.enqueue(querierWorkKey{
 		mappingQuery: mk,
 	})
+	q.logger.Debug(q.ctx, "added new mapper", slog.F("peer_id", c.UniqueID()))
 }

 func (q *querier) isHealthy() bool {
@@ -940,11 +944,12 @@ func (q *querier) cleanupConn(c *connIO) {
 		logger.Error(q.ctx, "failed to close connIO", slog.Error(err))
 	}
 	delete(q.mappers, mk)
-	q.logger.Debug(q.ctx, "removed mapper")
+	q.logger.Debug(q.ctx, "removed mapper", slog.F("peer_id", c.UniqueID()))
 }

 func (q *querier) worker() {
 	defer q.wg.Done()
+	defer q.logger.Debug(q.ctx, "worker exited")
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
@@ -1019,7 +1024,7 @@ func (q *querier) mappingQuery(peer mKey) error {
 		return nil
 	}
 	logger.Debug(q.ctx, "sending mappings", slog.F("mapping_len", len(mappings)))
-	return agpl.SendCtx(q.ctx, mpr.mappings, mappings)
+	return agpl.SendCtx(mpr.ctx, mpr.mappings, mappings)
 }

 func (q *querier) bindingsToMappings(bindings []database.GetTailnetTunnelPeerBindingsRow) ([]mapping, error) {
