Skip to content

Commit 5feb1fa

Browse files
author
SAITO Masataka
committed
Fix the issue about renumbering a connection id in GTM proxy.
When a backend disconnect from a GTM proxy, the proxy renumbers a connection id of the connection located in last slot of a connection array. GTM proxy doesn't notify that action to a GTM. Then a GTM could release unrelated transaction ids.
1 parent 5d77fc8 commit 5feb1fa

File tree

3 files changed

+81
-15
lines changed

3 files changed

+81
-15
lines changed

src/gtm/proxy/proxy_main.c

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ static int ServerLoop(void);
136136
static int initMasks(fd_set *rmask);
137137
void *GTMProxy_ThreadMain(void *argp);
138138
static int GTMProxyAddConnection(Port *port);
139+
static GTMProxy_ConnectionInfo *GTMProxy_GetConnInfo(GTMProxy_ThreadInfo *thrinfo,
140+
GTMProxy_ConnID con_id);
141+
static int GTMProxy_GetConnInfoIndex(GTMProxy_ThreadInfo *thrinfo, GTMProxy_ConnID con_id);
139142
static int ReadCommand(GTMProxy_ConnectionInfo *conninfo, StringInfo inBuf);
140143
static void GTMProxy_HandshakeConnection(GTMProxy_ConnectionInfo *conninfo);
141144
static void GTMProxy_HandleDisconnect(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn);
@@ -1554,6 +1557,27 @@ GTMProxyAddConnection(Port *port)
15541557
return STATUS_OK;
15551558
}
15561559

1560+
/* Convert a connection id to a index in GTMProxy_ThreadInfo::thr_all_conns */
1561+
static int
1562+
GTMProxy_GetConnInfoIndex(GTMProxy_ThreadInfo *thrinfo, GTMProxy_ConnID con_id)
1563+
{
1564+
if (con_id == InvalidGTMProxyConnID)
1565+
return -1;
1566+
return thrinfo->thr_conid2idx[con_id];
1567+
}
1568+
1569+
static GTMProxy_ConnectionInfo *
1570+
GTMProxy_GetConnInfo(GTMProxy_ThreadInfo *thrinfo, GTMProxy_ConnID con_id)
1571+
{
1572+
int con_idx;
1573+
1574+
con_idx = GTMProxy_GetConnInfoIndex(thrinfo, con_id);
1575+
if (con_idx < 0)
1576+
return NULL;
1577+
1578+
return thrinfo->thr_all_conns[con_idx];
1579+
}
1580+
15571581
void
15581582
ProcessCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
15591583
StringInfo input_message)
@@ -1684,7 +1708,9 @@ HandleGTMError(GTM_Conn *gtm_conn)
16841708
static GTM_Conn *
16851709
HandlePostCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn)
16861710
{
1687-
int connIdx = conninfo->con_id;
1711+
int connIdx;
1712+
1713+
connIdx = GTMProxy_GetConnInfoIndex(GetMyThreadInfo, conninfo->con_id);
16881714

16891715
Assert(conninfo && gtm_conn);
16901716
/*
@@ -1896,12 +1922,15 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo,
18961922
case MSG_SEQUENCE_CLOSE:
18971923
case MSG_SEQUENCE_RENAME:
18981924
case MSG_SEQUENCE_ALTER:
1899-
if ((res->gr_proxyhdr.ph_conid == InvalidGTMProxyConnID) ||
1900-
(res->gr_proxyhdr.ph_conid >= GTM_PROXY_MAX_CONNECTIONS) ||
1901-
(thrinfo->thr_all_conns[res->gr_proxyhdr.ph_conid] != cmdinfo->ci_conn))
19021925
{
1903-
ReleaseCmdBackup(cmdinfo);
1904-
elog(PANIC, "Invalid response or synchronization loss");
1926+
GTMProxy_ConnID con_id = res->gr_proxyhdr.ph_conid;
1927+
if ((con_id == InvalidGTMProxyConnID) ||
1928+
(con_id >= GTM_PROXY_MAX_CONNECTIONS) ||
1929+
(GTMProxy_GetConnInfo(thrinfo, con_id) != cmdinfo->ci_conn))
1930+
{
1931+
ReleaseCmdBackup(cmdinfo);
1932+
elog(PANIC, "Invalid response or synchronization loss");
1933+
}
19051934
}
19061935

19071936
/*
@@ -1975,9 +2004,10 @@ static int
19752004
ReadCommand(GTMProxy_ConnectionInfo *conninfo, StringInfo inBuf)
19762005
{
19772006
int qtype;
1978-
int connIdx = conninfo->con_id;
2007+
int connIdx;
19792008
int anyBackup;
19802009

2010+
connIdx = GTMProxy_GetConnInfoIndex(GetMyThreadInfo, conninfo->con_id);
19812011
anyBackup = (GetMyThreadInfo->thr_any_backup[connIdx] ? TRUE : FALSE);
19822012

19832013
/*
@@ -3327,7 +3357,9 @@ ConnectGTM(void)
33273357
*/
33283358
static void ReleaseCmdBackup(GTMProxy_CommandInfo *cmdinfo)
33293359
{
3330-
GTMProxy_ConnID connIdx = cmdinfo->ci_conn->con_id;
3360+
GTMProxy_ConnID connIdx;
3361+
3362+
connIdx = GTMProxy_GetConnInfoIndex(GetMyThreadInfo, cmdinfo->ci_conn->con_id);
33313363

33323364
GetMyThreadInfo->thr_any_backup[connIdx] = FALSE;
33333365
GetMyThreadInfo->thr_qtype[connIdx] = 0;

src/gtm/proxy/proxy_thread.c

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ GTMProxy_ThreadCreate(void *(* startroutine)(void *), int idx)
144144
GTM_MutexLockInit(&thrinfo->thr_lock);
145145
GTM_CVInit(&thrinfo->thr_cv);
146146

147+
/* Initialize mapping to be unassigned. */
148+
memset(thrinfo->thr_conid2idx, 0xff, sizeof(thrinfo->thr_conid2idx));
149+
147150
/*
148151
* Initialize communication area with SIGUSR2 signal handler (reconnect)
149152
*/
@@ -331,6 +334,7 @@ GTMProxy_ThreadMainWrapper(void *argp)
331334
GTMProxy_ThreadInfo *
332335
GTMProxy_ThreadAddConnection(GTMProxy_ConnectionInfo *conninfo)
333336
{
337+
int con_id = -1, con_idx = 0;
334338
GTMProxy_ThreadInfo *thrinfo = NULL;
335339

336340
/*
@@ -368,15 +372,27 @@ GTMProxy_ThreadAddConnection(GTMProxy_ConnectionInfo *conninfo)
368372
elog(ERROR, "Too many connections");
369373
}
370374

375+
/* Find unassigned connection id in this worker thread. */
376+
for (con_id = 0; con_id < GTM_PROXY_MAX_CONNECTIONS; con_id++)
377+
if (thrinfo->thr_conid2idx[con_id] == -1)
378+
break;
379+
380+
if (con_id >= GTM_PROXY_MAX_CONNECTIONS) {
381+
GTM_MutexLockRelease(&thrinfo->thr_lock);
382+
elog(ERROR, "Unassigned connection id not found.");
383+
}
384+
371385
/*
372386
* Save the array slotid in the conninfo structure. We send this to the GTM
373387
* server as an identifier which the GTM server sends us back in the
374388
* response. We use that information to route the response back to the
375389
* approrpiate connection
376390
*/
377-
conninfo->con_id = thrinfo->thr_conn_count;
378-
thrinfo->thr_all_conns[thrinfo->thr_conn_count] = conninfo;
379-
thrinfo->thr_conn_count++;
391+
con_idx = thrinfo->thr_conn_count++;
392+
conninfo->con_id = con_id;
393+
thrinfo->thr_conid2idx[con_id] = con_idx;
394+
thrinfo->thr_all_conns[con_idx] = conninfo;
395+
elog(DEBUG5, "Assigned a connection id to new connection: id = %d, index = %d", con_id, con_idx);
380396

381397
/*
382398
* Now increment the seqno since a new connection is added to the array.
@@ -432,20 +448,35 @@ GTMProxy_ThreadRemoveConnection(GTMProxy_ThreadInfo *thrinfo, GTMProxy_Connectio
432448
thrinfo->thr_qtype[ii] = 0;
433449
resetStringInfo(&(thrinfo->thr_inBufData[ii]));
434450

451+
/* Release connection id */
452+
if (conninfo->con_id != InvalidGTMProxyConnID)
453+
{
454+
thrinfo->thr_conid2idx[conninfo->con_id] = -1;
455+
elog(DEBUG5, "Released connection id %d", conninfo->con_id);
456+
}
457+
435458
/*
436459
* If this is the last entry in the array ? If not, then copy the last
437460
* entry in this slot and mark the last slot an empty
438461
*/
439462
if ((ii + 1) < thrinfo->thr_conn_count)
440463
{
464+
GTMProxy_ConnectionInfo *ci_moved;
465+
int last_idx;
466+
467+
/* Pick up last slot */
468+
last_idx = thrinfo->thr_conn_count - 1;
469+
ci_moved = thrinfo->thr_all_conns[last_idx];
470+
441471
/* Copy the last entry in this slot */
442-
thrinfo->thr_all_conns[ii] = thrinfo->thr_all_conns[thrinfo->thr_conn_count - 1];
472+
thrinfo->thr_all_conns[ii] = ci_moved;
443473

444474
/* Mark the last slot free */
445-
thrinfo->thr_all_conns[thrinfo->thr_conn_count - 1] = NULL;
475+
thrinfo->thr_all_conns[last_idx] = NULL;
446476

447-
/* Adjust the con_id to reflect the current slot in the array */
448-
thrinfo->thr_all_conns[ii]->con_id = ii;
477+
/* Adjust the mapping to reflect the current slot in the array */
478+
if (ci_moved->con_id != InvalidGTMProxyConnID)
479+
thrinfo->thr_conid2idx[ci_moved->con_id] = ii;
449480
}
450481
else
451482
{

src/include/gtm/gtm_proxy.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ typedef struct GTMProxy_ThreadInfo
111111
GTMProxy_ConnectionInfo *thr_all_conns[GTM_PROXY_MAX_CONNECTIONS];
112112
struct pollfd thr_poll_fds[GTM_PROXY_MAX_CONNECTIONS];
113113

114+
/* map info from ConnectionInfo->con_id to array index of thr_all_conns */
115+
int16 thr_conid2idx[GTM_PROXY_MAX_CONNECTIONS];
116+
114117
/* Command backup */
115118
short thr_any_backup[GTM_PROXY_MAX_CONNECTIONS];
116119
int thr_qtype[GTM_PROXY_MAX_CONNECTIONS];

0 commit comments

Comments
 (0)