Skip to content

Commit 9f8e95c

Browse files
authored
Merge pull request #157 from postgres/master
Sync Fork from Upstream Repo
2 parents 79040af + f5a5773 commit 9f8e95c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+2361
-342
lines changed

contrib/test_decoding/expected/twophase.out

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
3333

3434
COMMIT PREPARED 'test_prepared#1';
3535
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
36-
data
37-
----------------------------------------------------
38-
BEGIN
39-
table public.test_prepared1: INSERT: id[integer]:1
40-
table public.test_prepared1: INSERT: id[integer]:2
41-
PREPARE TRANSACTION 'test_prepared#1'
36+
data
37+
-----------------------------------
4238
COMMIT PREPARED 'test_prepared#1'
43-
(5 rows)
39+
(1 row)
4440

4541
-- Test that rollback of a prepared xact is decoded.
4642
BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
10399

104100
COMMIT PREPARED 'test_prepared#3';
105101
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
106-
data
107-
-------------------------------------------------------------------------
108-
BEGIN
109-
table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
110-
PREPARE TRANSACTION 'test_prepared#3'
102+
data
103+
-----------------------------------
111104
COMMIT PREPARED 'test_prepared#3'
112-
(4 rows)
105+
(1 row)
113106

114107
-- make sure stuff still works
115108
INSERT INTO test_prepared1 VALUES (6);
@@ -143,9 +136,8 @@ WHERE locktype = 'relation'
143136
test_prepared1 | relation | AccessExclusiveLock
144137
(3 rows)
145138

146-
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
147-
-- call should return within a second.
148-
SET statement_timeout = '1s';
139+
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
140+
SET statement_timeout = '180s';
149141
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
150142
data
151143
---------------------------------------------------------------------------
@@ -159,14 +151,10 @@ RESET statement_timeout;
159151
COMMIT PREPARED 'test_prepared_lock';
160152
-- consume the commit
161153
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
162-
data
163-
---------------------------------------------------------------------------
164-
BEGIN
165-
table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
166-
table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
167-
PREPARE TRANSACTION 'test_prepared_lock'
154+
data
155+
--------------------------------------
168156
COMMIT PREPARED 'test_prepared_lock'
169-
(5 rows)
157+
(1 row)
170158

171159
-- Test savepoints and sub-xacts. Creating savepoints will create
172160
-- sub-xacts implicitly.
@@ -189,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
189177
COMMIT PREPARED 'test_prepared_savepoint';
190178
-- consume the commit
191179
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
192-
data
193-
------------------------------------------------------------
194-
BEGIN
195-
table public.test_prepared_savepoint: INSERT: a[integer]:1
196-
PREPARE TRANSACTION 'test_prepared_savepoint'
180+
data
181+
-------------------------------------------
197182
COMMIT PREPARED 'test_prepared_savepoint'
198-
(4 rows)
183+
(1 row)
199184

200185
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
201186
BEGIN;

contrib/test_decoding/expected/twophase_stream.out

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
6060
COMMIT PREPARED 'test1';
6161
--should show the COMMIT PREPARED and the other changes in the transaction
6262
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
63-
data
64-
-------------------------------------------------------------
65-
BEGIN
66-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
67-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
68-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
69-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
70-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
71-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
72-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
73-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
74-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
75-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
76-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
77-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
78-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
79-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
80-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
81-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
82-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
83-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
84-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
85-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
86-
PREPARE TRANSACTION 'test1'
63+
data
64+
-------------------------
8765
COMMIT PREPARED 'test1'
88-
(23 rows)
66+
(1 row)
8967

9068
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
9169
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.

contrib/test_decoding/sql/twophase.sql

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ SELECT 'test_prepared1' AS relation, locktype, mode
6868
FROM pg_locks
6969
WHERE locktype = 'relation'
7070
AND relation = 'test_prepared1'::regclass;
71-
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
72-
-- call should return within a second.
73-
SET statement_timeout = '1s';
71+
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
72+
SET statement_timeout = '180s';
7473
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
7574
RESET statement_timeout;
7675
COMMIT PREPARED 'test_prepared_lock';

doc/src/sgml/logical-replication.sgml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -604,10 +604,10 @@
604604
<para>
605605
The subscriber also requires the <varname>max_replication_slots</varname>
606606
to be set. In this case it should be set to at least the number of
607-
subscriptions that will be added to the subscriber.
608-
<varname>max_logical_replication_workers</varname> must be set to at
609-
least the number of subscriptions, again plus some reserve for the table
610-
synchronization. Additionally the <varname>max_worker_processes</varname>
607+
subscriptions that will be added to the subscriber, plus some reserve for
608+
table synchronization. <varname>max_logical_replication_workers</varname>
609+
must be set to at least the number of subscriptions, again plus some reserve
610+
for the table synchronization. Additionally the <varname>max_worker_processes</varname>
611611
may need to be adjusted to accommodate replication workers, at least
612612
(<varname>max_logical_replication_workers</varname>
613613
+ <literal>1</literal>). Note that some extensions and parallel queries

doc/src/sgml/logicaldecoding.sgml

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
191191
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
192192
lsn | xid | data
193193
-----------+-----+--------------------------------------------
194-
0/1689DC0 | 529 | BEGIN 529
195-
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
196-
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
197194
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
198195
(1 row)
199196

@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
822819
<parameter>gid</parameter> field, which is part of the
823820
<parameter>txn</parameter> parameter, can be used in this callback to
824821
check if the plugin has already received this <command>PREPARE</command>
825-
in which case it can skip the remaining changes of the transaction.
826-
This can only happen if the user restarts the decoding after receiving
827-
the <command>PREPARE</command> for a transaction but before receiving
828-
the <command>COMMIT PREPARED</command>, say because of some error.
822+
in which case it can either error out or skip the remaining changes of
823+
the transaction.
829824
<programlisting>
830825
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
831826
ReorderBufferTXN *txn);
@@ -1228,5 +1223,29 @@ stream_commit_cb(...); &lt;-- commit of the streamed transaction
12281223
that name pattern will not be decoded as a two-phase commit transaction.
12291224
</para>
12301225

1226+
<para>
1227+
Users who want to decode prepared transactions need to be careful about
1228+
the points mentioned below:
1229+
1230+
<itemizedlist>
1231+
<listitem>
1232+
<para>
1233+
If the prepared transaction has locked [user] catalog tables exclusively
1234+
then decoding prepare can block till the main transaction is committed.
1235+
</para>
1236+
</listitem>
1237+
1238+
<listitem>
1239+
<para>
1240+
Logical replication solutions that build distributed two-phase commit
1241+
using this feature can deadlock if the prepared transaction has locked
1242+
[user] catalog tables exclusively. They need to inform users to not have
1243+
locks on catalog tables (via explicit <command>LOCK</command> command) in
1244+
such transactions.
1245+
</para>
1246+
</listitem>
1247+
</itemizedlist>
1248+
</para>
1249+
12311250
</sect1>
12321251
</chapter>

doc/src/sgml/queries.sgml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,14 +2349,13 @@ WITH RECURSIVE search_graph(id, link, data, depth) AS (
23492349
SELECT g.id, g.link, g.data, sg.depth + 1
23502350
FROM graph g, search_graph sg
23512351
WHERE g.id = sg.link
2352-
) <emphasis>CYCLE id SET is_cycle TO true DEFAULT false USING path</emphasis>
2352+
) <emphasis>CYCLE id SET is_cycle USING path</emphasis>
23532353
SELECT * FROM search_graph;
23542354
</programlisting>
23552355
and it will be internally rewritten to the above form. The
23562356
<literal>CYCLE</literal> clause specifies first the list of columns to
23572357
track for cycle detection, then a column name that will show whether a
2358-
cycle has been detected, then two values to use in that column for the yes
2359-
and no cases, and finally the name of another column that will track the
2358+
cycle has been detected, and finally the name of another column that will track the
23602359
path. The cycle and path columns will implicitly be added to the output
23612360
rows of the CTE.
23622361
</para>

doc/src/sgml/ref/select.sgml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ SELECT [ ALL | DISTINCT [ ON ( <replaceable class="parameter">expression</replac
7474

7575
<replaceable class="parameter">with_query_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( <replaceable class="parameter">select</replaceable> | <replaceable class="parameter">values</replaceable> | <replaceable class="parameter">insert</replaceable> | <replaceable class="parameter">update</replaceable> | <replaceable class="parameter">delete</replaceable> )
7676
[ SEARCH { BREADTH | DEPTH } FIRST BY <replaceable>column_name</replaceable> [, ...] SET <replaceable>search_seq_col_name</replaceable> ]
77-
[ CYCLE <replaceable>column_name</replaceable> [, ...] SET <replaceable>cycle_mark_col_name</replaceable> TO <replaceable>cycle_mark_value</replaceable> DEFAULT <replaceable>cycle_mark_default</replaceable> USING <replaceable>cycle_path_col_name</replaceable> ]
77+
[ CYCLE <replaceable>column_name</replaceable> [, ...] SET <replaceable>cycle_mark_col_name</replaceable> [ TO <replaceable>cycle_mark_value</replaceable> DEFAULT <replaceable>cycle_mark_default</replaceable> ] USING <replaceable>cycle_path_col_name</replaceable> ]
7878

7979
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ]
8080
</synopsis>
@@ -302,8 +302,10 @@ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ]
302302
been detected. <replaceable>cycle_mark_value</replaceable> and
303303
<replaceable>cycle_mark_default</replaceable> must be constants and they
304304
must be coercible to a common data type, and the data type must have an
305-
inequality operator. (The SQL standard requires that they be character
306-
strings, but PostgreSQL does not require that.) Furthermore, a column
305+
inequality operator. (The SQL standard requires that they be Boolean
306+
constants or character strings, but PostgreSQL does not require that.) By
307+
default, <literal>TRUE</literal> and <literal>FALSE</literal> (of type
308+
<type>boolean</type>) are used. Furthermore, a column
307309
named <replaceable>cycle_path_col_name</replaceable> will be added to the
308310
result column list of the <literal>WITH</literal> query. This column is
309311
used internally for tracking visited rows. See <xref

src/backend/access/heap/heapam.c

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,153 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s
13911391
return true;
13921392
}
13931393

1394+
/*
* heap_set_tidrange
*		Restrict the heap scan 'sscan' to the TID range mintid..maxtid.
*
* The requested bounds are clamped to the lowest and highest TIDs the
* relation can currently hold, (0, FirstOffsetNumber) and
* (rs_nblocks - 1, MaxOffsetNumber).  The block range to visit is then
* installed with heap_setscanlimits() and the clamped bounds are saved in
* sscan->rs_mintid / sscan->rs_maxtid.
*
* Two cases return early without touching rs_mintid / rs_maxtid: a
* relation with no blocks, and a range that is empty after clamping (in
* which case the scan limits are set to zero blocks).
*/
void
1395+
heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
1396+
ItemPointer maxtid)
1397+
{
1398+
HeapScanDesc scan = (HeapScanDesc) sscan;
1399+
BlockNumber startBlk;
1400+
BlockNumber numBlks;
1401+
ItemPointerData highestItem;
1402+
ItemPointerData lowestItem;
1403+
1404+
/*
1405+
* For relations without any pages, we can simply leave the TID range
1406+
* unset. There will be no tuples to scan, therefore no tuples outside
1407+
* the given TID range.
1408+
*/
1409+
if (scan->rs_nblocks == 0)
1410+
return;
1411+
1412+
/*
1413+
* Set up some ItemPointers which point to the first and last possible
1414+
* tuples in the heap.
1415+
*/
1416+
ItemPointerSet(&highestItem, scan->rs_nblocks - 1, MaxOffsetNumber);
1417+
ItemPointerSet(&lowestItem, 0, FirstOffsetNumber);
1418+
1419+
/*
1420+
* If the given maximum TID is below the highest possible TID in the
1421+
* relation, then restrict the range to that, otherwise we scan to the end
1422+
* of the relation.
1423+
*/
1424+
if (ItemPointerCompare(maxtid, &highestItem) < 0)
1425+
ItemPointerCopy(maxtid, &highestItem);
1426+
1427+
/*
1428+
* If the given minimum TID is above the lowest possible TID in the
1429+
* relation, then restrict the range to only scan for TIDs at or above that.
1430+
*/
1431+
if (ItemPointerCompare(mintid, &lowestItem) > 0)
1432+
ItemPointerCopy(mintid, &lowestItem);
1433+
1434+
/*
1435+
* Check for an empty range and protect from would-be negative results
1436+
* from the numBlks calculation below.
1437+
*/
1438+
if (ItemPointerCompare(&highestItem, &lowestItem) < 0)
1439+
{
1440+
/* Set an empty range of blocks to scan */
1441+
heap_setscanlimits(sscan, 0, 0);
1442+
return;
1443+
}
1444+
1445+
/*
1446+
* Calculate the first block and the number of blocks we must scan. We
1447+
* could be more aggressive here and perform some more validation to try
1448+
* and further narrow the scope of blocks to scan by checking if the
1449+
* lowestItem has an offset above MaxOffsetNumber. In this case, we could
1450+
* advance startBlk by one. Likewise, if highestItem has an offset of 0
1451+
* we could scan one fewer block. However, such an optimization does not
1452+
* seem worth troubling over, currently.
1453+
*/
1454+
startBlk = ItemPointerGetBlockNumberNoCheck(&lowestItem);
1455+
1456+
/* Both the first and the last block are scanned, hence the "+ 1". */
numBlks = ItemPointerGetBlockNumberNoCheck(&highestItem) -
1457+
ItemPointerGetBlockNumberNoCheck(&lowestItem) + 1;
1458+
1459+
/* Set the start block and number of blocks to scan */
1460+
heap_setscanlimits(sscan, startBlk, numBlks);
1461+
1462+
/* Finally, set the TID range in sscan */
1463+
ItemPointerCopy(&lowestItem, &sscan->rs_mintid);
1464+
ItemPointerCopy(&highestItem, &sscan->rs_maxtid);
1465+
}
1466+
1467+
/*
* heap_getnextslot_tidrange
*		Fetch the next tuple, in the given scan direction, whose TID lies
*		between sscan->rs_mintid and sscan->rs_maxtid (both inclusive).
*
* On success the tuple is stored in 'slot' and true is returned.  When the
* scan is exhausted, or when the scan has moved past the end of the range
* in the current direction, 'slot' is cleared and false is returned.
*/
bool
1468+
heap_getnextslot_tidrange(TableScanDesc sscan, ScanDirection direction,
1469+
TupleTableSlot *slot)
1470+
{
1471+
HeapScanDesc scan = (HeapScanDesc) sscan;
1472+
ItemPointer mintid = &sscan->rs_mintid;
1473+
ItemPointer maxtid = &sscan->rs_maxtid;
1474+
1475+
/* Note: no locking manipulations needed */
1476+
for (;;)
1477+
{
1478+
/* Pick the fetch routine according to the scan's page-mode flag. */
if (sscan->rs_flags & SO_ALLOW_PAGEMODE)
1479+
heapgettup_pagemode(scan, direction, sscan->rs_nkeys, sscan->rs_key);
1480+
else
1481+
heapgettup(scan, direction, sscan->rs_nkeys, sscan->rs_key);
1482+
1483+
/* A NULL t_data means the underlying scan produced no further tuple. */
if (scan->rs_ctup.t_data == NULL)
1484+
{
1485+
ExecClearTuple(slot);
1486+
return false;
1487+
}
1488+
1489+
/*
1490+
* heap_set_tidrange will have used heap_setscanlimits to limit the
1491+
* range of pages we scan to only ones that can contain the TID range
1492+
* we're scanning for. Here we must filter out any tuples from these
1493+
* pages that are outside that range.
1494+
*/
1495+
if (ItemPointerCompare(&scan->rs_ctup.t_self, mintid) < 0)
1496+
{
1497+
ExecClearTuple(slot);
1498+
1499+
/*
1500+
* When scanning backwards, the TIDs will be in descending order.
1501+
* Future tuples in this direction will be lower still, so we can
1502+
* just return false to indicate there will be no more tuples.
1503+
*/
1504+
if (ScanDirectionIsBackward(direction))
1505+
return false;
1506+
1507+
continue;
1508+
}
1509+
1510+
/*
1511+
* Likewise for the final page, we must filter out TIDs greater than
1512+
* maxtid.
1513+
*/
1514+
if (ItemPointerCompare(&scan->rs_ctup.t_self, maxtid) > 0)
1515+
{
1516+
ExecClearTuple(slot);
1517+
1518+
/*
1519+
* When scanning forward, the TIDs will be in ascending order.
1520+
* Future tuples in this direction will be higher still, so we can
1521+
* just return false to indicate there will be no more tuples.
1522+
*/
1523+
if (ScanDirectionIsForward(direction))
1524+
return false;
1525+
continue;
1526+
}
1527+
1528+
/* The tuple's TID is within [mintid, maxtid]; stop looking. */
break;
1529+
}
1530+
1531+
/*
1532+
* If we get here it means we have a new current scan tuple, so point to
1533+
* the proper return buffer and return the tuple.
1534+
*/
1535+
pgstat_count_heap_getnext(scan->rs_base.rs_rd);
1536+
1537+
ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, scan->rs_cbuf);
1538+
return true;
1539+
}
1540+
13941541
/*
13951542
* heap_fetch - retrieve tuple with given tid
13961543
*

0 commit comments

Comments
 (0)