Skip to content

Commit 22fdd35

Browse files
committed
A patch to improve replicated table update/delete handling.
This commit performs updates/deletes to replicated tables based on either primary key or unique index under the following conditions 1. The replicated table has either a primary key or a unique index defined. 2. The query is not changing the primary key itself. Otherwise ctid is used, like we were using previously. The patch was submitted by Abbas Butt.
1 parent 1cff86a commit 22fdd35

File tree

15 files changed

+627
-42
lines changed

15 files changed

+627
-42
lines changed

src/backend/executor/nodeModifyTable.c

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,12 @@ ExecDelete(ItemPointer tupleid,
363363
TupleTableSlot *planSlot,
364364
EPQState *epqstate,
365365
EState *estate,
366+
#ifndef PGXC
366367
bool canSetTag)
368+
#else
369+
bool canSetTag,
370+
TupleTableSlot *pslot)
371+
#endif
367372
{
368373
ResultRelInfo *resultRelInfo;
369374
Relation resultRelationDesc;
@@ -455,7 +460,18 @@ ldelete:;
455460
#ifdef PGXC
456461
if (IS_PGXC_COORDINATOR && resultRemoteRel)
457462
{
458-
slot = ExecProcNodeDMLInXC(estate, planSlot, NULL);
463+
/*
464+
* Pass in the source plan slot, we may attributes other than ctid
465+
* in case delete is based on primay key or other unique column.
466+
* planSlot can't be passed in place of pslot here because of the
467+
* following test case failure
468+
* create table t2(a int, b int) distribute by replication;
469+
* insert into t2 values(1,2), (3,4), (5,6);
470+
* set enable_fast_query_shipping=false;
471+
* delete from t2 where a = 1;
472+
* ERROR: operator does not exist: tid = integer
473+
*/
474+
slot = ExecProcNodeDMLInXC(estate, planSlot, pslot);
459475
}
460476
else
461477
{
@@ -1007,10 +1023,11 @@ ExecModifyTable(ModifyTableState *node)
10071023
ResultRelInfo *saved_resultRelInfo;
10081024
ResultRelInfo *resultRelInfo;
10091025
PlanState *subplanstate;
1010-
#ifdef PGXC
1026+
#ifdef PGXC
10111027
PlanState *remoterelstate;
10121028
PlanState *saved_resultRemoteRel;
1013-
#endif
1029+
RemoteQuery *step = NULL;
1030+
#endif
10141031
JunkFilter *junkfilter;
10151032
TupleTableSlot *slot;
10161033
TupleTableSlot *planSlot;
@@ -1054,6 +1071,8 @@ ExecModifyTable(ModifyTableState *node)
10541071
#ifdef PGXC
10551072
/* Initialize remote plan state */
10561073
remoterelstate = node->mt_remoterels[node->mt_whichplan];
1074+
if (!IS_PGXC_DATANODE && remoterelstate != NULL)
1075+
step = (RemoteQuery *)((RemoteQueryState *)remoterelstate)->ss.ps.plan;
10571076
#endif
10581077
junkfilter = resultRelInfo->ri_junkFilter;
10591078

@@ -1177,8 +1196,14 @@ ExecModifyTable(ModifyTableState *node)
11771196
/*
11781197
* apply the junkfilter if needed.
11791198
*/
1199+
#ifndef PGXC
11801200
if (operation != CMD_DELETE)
1201+
#else
1202+
if (operation != CMD_DELETE ||
1203+
(!IS_PGXC_DATANODE && step != NULL &&
1204+
!step->rq_use_pk_for_rep_change))
11811205
slot = ExecFilterJunk(junkfilter, slot);
1206+
#endif
11821207
}
11831208
#ifdef PGXC
11841209
estate->es_result_remoterel = remoterelstate;
@@ -1194,7 +1219,11 @@ ExecModifyTable(ModifyTableState *node)
11941219
break;
11951220
case CMD_DELETE:
11961221
slot = ExecDelete(tupleid, oldtuple, planSlot,
1222+
#ifndef PGXC
11971223
&node->mt_epqstate, estate, node->canSetTag);
1224+
#else
1225+
&node->mt_epqstate, estate, node->canSetTag, slot);
1226+
#endif
11981227
break;
11991228
default:
12001229
elog(ERROR, "unknown operation");

src/backend/nodes/copyfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,7 @@ _copyRemoteQuery(const RemoteQuery *from)
10481048
COPY_SCALAR_FIELD(has_row_marks);
10491049
COPY_SCALAR_FIELD(rq_save_command_id);
10501050
COPY_SCALAR_FIELD(rq_params_internal);
1051+
COPY_SCALAR_FIELD(rq_save_command_id);
10511052

10521053
return newnode;
10531054
}

src/backend/nodes/outfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ _outRemoteQuery(StringInfo str, const RemoteQuery *node)
492492
WRITE_NODE_FIELD(query_var_tlist);
493493
WRITE_BOOL_FIELD(rq_save_command_id);
494494
WRITE_BOOL_FIELD(rq_params_internal);
495+
WRITE_BOOL_FIELD(rq_save_command_id);
495496
}
496497

497498
static void

src/backend/optimizer/plan/pgxcplan.c

Lines changed: 96 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "pgxc/pgxcnode.h"
4949
#include "pgxc/execRemote.h"
5050
#include "rewrite/rewriteManip.h"
51+
#include "rewrite/rewriteHandler.h"
5152
#include "utils/builtins.h"
5253
#include "utils/rel.h"
5354
#include "utils/lsyscache.h"
@@ -1073,6 +1074,9 @@ pgxc_build_dml_statement(PlannerInfo *root, CmdType cmdtype,
10731074
int col_att = 0;
10741075
int ctid_param_num;
10751076
ListCell *lc;
1077+
bool can_use_pk_for_rep_change = false;
1078+
int16 *indexed_col_numbers = NULL;
1079+
int index_col_count = 0;
10761080

10771081
/* Make sure we are dealing with DMLs */
10781082
if (cmdtype != CMD_UPDATE &&
@@ -1134,6 +1138,28 @@ pgxc_build_dml_statement(PlannerInfo *root, CmdType cmdtype,
11341138

11351139
query_to_deparse->jointree = makeNode(FromExpr);
11361140

1141+
can_use_pk_for_rep_change = (cmdtype == CMD_UPDATE || cmdtype == CMD_DELETE) &&
1142+
IsRelationReplicated(GetRelationLocInfo(res_rel->relid));
1143+
1144+
if (can_use_pk_for_rep_change)
1145+
{
1146+
index_col_count = pgxc_find_unique_index(res_rel->relid,
1147+
&indexed_col_numbers);
1148+
if (index_col_count <= 0)
1149+
can_use_pk_for_rep_change = false;
1150+
1151+
if (can_use_pk_for_rep_change)
1152+
{
1153+
if (is_pk_being_changed(root->parse, indexed_col_numbers,
1154+
index_col_count))
1155+
{
1156+
can_use_pk_for_rep_change = false;
1157+
}
1158+
}
1159+
}
1160+
1161+
rqplan->rq_use_pk_for_rep_change = can_use_pk_for_rep_change;
1162+
11371163
/*
11381164
* Prepare a param list for INSERT queries
11391165
* While doing so note the position of ctid, xc_node_id in source data
@@ -1146,7 +1172,8 @@ pgxc_build_dml_statement(PlannerInfo *root, CmdType cmdtype,
11461172
col_att++;
11471173

11481174
/* The position of ctid/xc_node_id is not required for INSERT */
1149-
if (tle->resjunk && (cmdtype == CMD_UPDATE || cmdtype == CMD_DELETE))
1175+
if (!can_use_pk_for_rep_change && tle->resjunk &&
1176+
(cmdtype == CMD_UPDATE || cmdtype == CMD_DELETE))
11501177
{
11511178
Var *v = (Var *)tle->expr;
11521179

@@ -1232,42 +1259,89 @@ pgxc_build_dml_statement(PlannerInfo *root, CmdType cmdtype,
12321259
* the n table attributes, followed by ctid and optionally node_id. So
12331260
* we know that the ctid has to be n + 1.
12341261
*/
1235-
ctid_param_num = natts + 1;
1262+
if (!can_use_pk_for_rep_change)
1263+
{
1264+
ctid_param_num = natts + 1;
1265+
}
12361266
}
1237-
else if (cmdtype == CMD_DELETE)
1267+
if (cmdtype == CMD_DELETE)
12381268
{
1239-
/*
1240-
* Since there is no data to update, the first param is going to be
1241-
* ctid.
1242-
*/
1243-
ctid_param_num = 1;
1269+
if (!can_use_pk_for_rep_change)
1270+
{
1271+
/*
1272+
* Since there is no data to update, the first param is going to be
1273+
* ctid.
1274+
*/
1275+
ctid_param_num = 1;
1276+
}
12441277
}
12451278

12461279
/* Add quals like ctid = $4 AND xc_node_id = $6 to the UPDATE/DELETE query */
12471280
if (cmdtype == CMD_UPDATE || cmdtype == CMD_DELETE)
12481281
{
1249-
if (!ctid_found)
1250-
elog(ERROR, "Source data plan's target list does not contain ctid colum");
1251-
12521282
/*
1253-
* Beware, the ordering of ctid and node_id is important ! ctid should
1254-
* be followed by node_id, not vice-versa, so as to be consistent with
1255-
* the data row to be generated while binding the parameters for the
1256-
* update statement.
1283+
* If it is not replicated, we can use CTID, otherwise we need
1284+
* to use a defined primary key
12571285
*/
1258-
pgxc_dml_add_qual_to_query(query_to_deparse, ctid_param_num,
1259-
SelfItemPointerAttributeNumber, resultRelationIndex);
1286+
if (!can_use_pk_for_rep_change)
1287+
{
1288+
if (!ctid_found)
1289+
elog(ERROR, "Source data plan's target list does not contain ctid colum");
12601290

1261-
if (node_id_found)
1291+
/*
1292+
* Beware, the ordering of ctid and node_id is important ! ctid should
1293+
* be followed by node_id, not vice-versa, so as to be consistent with
1294+
* the data row to be generated while binding the parameters for the
1295+
* update statement.
1296+
*/
1297+
pgxc_dml_add_qual_to_query(query_to_deparse, ctid_param_num,
1298+
SelfItemPointerAttributeNumber, resultRelationIndex);
1299+
1300+
if (node_id_found)
1301+
{
1302+
pgxc_dml_add_qual_to_query(query_to_deparse, ctid_param_num + 1,
1303+
XC_NodeIdAttributeNumber, resultRelationIndex);
1304+
}
1305+
}
1306+
else
12621307
{
1263-
pgxc_dml_add_qual_to_query(query_to_deparse, ctid_param_num + 1,
1264-
XC_NodeIdAttributeNumber, resultRelationIndex);
1308+
/*
1309+
* Add all the columns of the primary key or unique index
1310+
* in the where clause of update / delete on the replicated table
1311+
*/
1312+
int i;
1313+
for (i = 0; i < index_col_count; i++)
1314+
{
1315+
int pkattno = indexed_col_numbers[i];
12651316

1266-
query_to_deparse->jointree->quals = (Node *)make_andclause(
1267-
(List *)query_to_deparse->jointree->quals);
1317+
col_att = 0;
1318+
foreach(elt, sourceTargetList)
1319+
{
1320+
TargetEntry *tle = lfirst(elt);
1321+
Var *v;
1322+
1323+
col_att++;
1324+
1325+
v = (Var *)tle->expr;
1326+
1327+
if (v->varno == resultRelationIndex &&
1328+
v->varattno == pkattno)
1329+
{
1330+
break;
1331+
}
1332+
}
1333+
1334+
pgxc_dml_add_qual_to_query(query_to_deparse, col_att,
1335+
pkattno, resultRelationIndex);
1336+
}
12681337
}
1338+
query_to_deparse->jointree->quals = (Node *)make_andclause(
1339+
(List *)query_to_deparse->jointree->quals);
12691340
}
12701341

1342+
if (indexed_col_numbers != NULL)
1343+
pfree(indexed_col_numbers);
1344+
12711345
/* pgxc_add_returning_list copied returning list in base_tlist */
12721346
if (rqplan->base_tlist)
12731347
query_to_deparse->returningList = list_copy(rqplan->base_tlist);

src/backend/pgxc/pool/execRemote.c

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4956,7 +4956,6 @@ void AtEOXact_DBCleanup(bool isCommit)
49564956
* and the ctid/nodeid values to be supplied for the WHERE clause of the
49574957
* query. The data values are present in dataSlot whereas the ctid/nodeid
49584958
* are available in sourceSlot as junk attributes.
4959-
* For DELETEs, the dataSlot is NULL.
49604959
* sourceSlot is used only to retrieve ctid/nodeid, so it does not get
49614960
* used for INSERTs, although it will never be NULL.
49624961
* The slots themselves are undisturbed.
@@ -5054,10 +5053,6 @@ SetDataRowForIntParams(JunkFilter *junkfilter,
50545053
appendBinaryStringInfo(&buf, (char *) &params_nbo, sizeof(params_nbo));
50555054
}
50565055

5057-
/*
5058-
* The data attributes would not be present for DELETE. In such case,
5059-
* dataSlot will be NULL.
5060-
*/
50615056
if (dataSlot)
50625057
{
50635058
TupleDesc tdesc = dataSlot->tts_tupleDescriptor;

0 commit comments

Comments
 (0)