Skip to content

Commit d046e5a

Browse files
committed
chore: delay transaction activation until actual use
Delay the actual transaction activation until the first actual usage of the transaction. That is; the first time that a statement is being sent to Spanner. This allows the application to amend the transaction options after calling BeginTx or executing `BEGIN TRANSACTION`. The transaction options can be amended by executing a statement like `SET TRANSACTION READ ONLY`.
1 parent 27d7edc commit d046e5a

File tree

12 files changed

+363
-139
lines changed

12 files changed

+363
-139
lines changed

aborted_transactions_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ func TestCommitAborted(t *testing.T) {
4141
if err != nil {
4242
t.Fatalf("begin failed: %v", err)
4343
}
44+
if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
45+
t.Fatal(err)
46+
}
4447
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{
4548
Errors: []error{status.Error(codes.Aborted, "Aborted")},
4649
})
@@ -51,7 +54,7 @@ func TestCommitAborted(t *testing.T) {
5154
reqs := server.TestSpanner.DrainRequestsFromServer()
5255
commitReqs := testutil.RequestsOfType(reqs, reflect.TypeOf(&sppb.CommitRequest{}))
5356
if g, w := len(commitReqs), 2; g != w {
54-
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
57+
t.Fatalf("commit request count mismatch\n Got: %v\nWant: %v", g, w)
5558
}
5659

5760
// Verify that the db is still usable.
@@ -117,6 +120,9 @@ func TestCommitAbortedWithInternalRetriesDisabled(t *testing.T) {
117120
if err != nil {
118121
t.Fatalf("begin failed: %v", err)
119122
}
123+
if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
124+
t.Fatal(err)
125+
}
120126
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{
121127
Errors: []error{status.Error(codes.Aborted, "Aborted")},
122128
})

client_side_statement_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestStatementExecutor_StartBatchDdl(t *testing.T) {
6565
}
6666

6767
// Starting a DDL batch while the connection is in a transaction is not allowed.
68-
c.tx = &readWriteTransaction{}
68+
c.tx = &delegatingTransaction{conn: c, ctx: ctx}
6969
if _, err := c.ExecContext(ctx, "start batch ddl", []driver.NamedValue{}); spanner.ErrCode(err) != codes.FailedPrecondition {
7070
t.Fatalf("error mismatch for starting a DDL batch while in a transaction\nGot: %v\nWant: %v", spanner.ErrCode(err), codes.FailedPrecondition)
7171
}
@@ -102,13 +102,13 @@ func TestStatementExecutor_StartBatchDml(t *testing.T) {
102102
}
103103

104104
// Starting a DML batch while the connection is in a read-only transaction is not allowed.
105-
c.tx = &readOnlyTransaction{logger: noopLogger}
105+
c.tx = &delegatingTransaction{conn: c, contextTransaction: &readOnlyTransaction{logger: noopLogger}}
106106
if _, err := c.ExecContext(ctx, "start batch dml", []driver.NamedValue{}); spanner.ErrCode(err) != codes.FailedPrecondition {
107107
t.Fatalf("error mismatch for starting a DML batch while in a read-only transaction\nGot: %v\nWant: %v", spanner.ErrCode(err), codes.FailedPrecondition)
108108
}
109109

110110
// Starting a DML batch while the connection is in a read/write transaction is allowed.
111-
c.tx = &readWriteTransaction{logger: noopLogger}
111+
c.tx = &delegatingTransaction{conn: c, contextTransaction: &readWriteTransaction{logger: noopLogger}}
112112
if _, err := c.ExecContext(ctx, "start batch dml", []driver.NamedValue{}); err != nil {
113113
t.Fatalf("could not start a DML batch while in a read/write transaction: %v", err)
114114
}

conn.go

Lines changed: 67 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,8 @@ type SpannerConn interface {
231231
// returned.
232232
resetTransactionForRetry(ctx context.Context, errDuringCommit bool) error
233233

234+
withTransactionCloseFunc(close func())
235+
234236
// withTempTransactionOptions sets the TransactionOptions that should be used
235237
// for the next read/write transaction. This method should only be called
236238
// directly before starting a new read/write transaction.
@@ -257,8 +259,8 @@ type conn struct {
257259
adminClient *adminapi.DatabaseAdminClient
258260
connId string
259261
logger *slog.Logger
260-
tx contextTransaction
261-
prevTx contextTransaction
262+
tx *delegatingTransaction
263+
prevTx *delegatingTransaction
262264
resetForRetry bool
263265
database string
264266

@@ -531,7 +533,7 @@ func (c *conn) InDDLBatch() bool {
531533
}
532534

533535
func (c *conn) InDMLBatch() bool {
534-
return (c.batch != nil && c.batch.tp == parser.BatchTypeDml) || (c.inReadWriteTransaction() && c.tx.(*readWriteTransaction).batch != nil)
536+
return (c.batch != nil && c.batch.tp == parser.BatchTypeDml) || (c.inTransaction() && c.tx.IsInBatch())
535537
}
536538

537539
func (c *conn) GetBatchedStatements() []spanner.Statement {
@@ -567,9 +569,6 @@ func (c *conn) startBatchDML(automatic bool) (driver.Result, error) {
567569
if c.batch != nil {
568570
return nil, spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "This connection already has an active batch."))
569571
}
570-
if c.inReadOnlyTransaction() {
571-
return nil, spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "This connection has an active read-only transaction. Read-only transactions cannot execute DML batches."))
572-
}
573572
c.logger.Debug("starting dml batch outside transaction")
574573
c.batch = &batch{tp: parser.BatchTypeDml, options: execOptions}
575574
return driver.ResultNoRows, nil
@@ -655,8 +654,8 @@ func (c *conn) execBatchDML(ctx context.Context, statements []spanner.Statement,
655654

656655
var affected []int64
657656
var err error
658-
if c.inTransaction() {
659-
tx, ok := c.tx.(*readWriteTransaction)
657+
if c.inTransaction() && c.tx.contextTransaction != nil {
658+
tx, ok := c.tx.contextTransaction.(*readWriteTransaction)
660659
if !ok {
661660
return nil, status.Errorf(codes.FailedPrecondition, "connection is in a transaction that is not a read/write transaction")
662661
}
@@ -944,7 +943,7 @@ func (c *conn) execContext(ctx context.Context, query string, execOptions *ExecO
944943
}
945944

946945
// Start an automatic DML batch.
947-
if c.AutoBatchDml() && !c.inBatch() && c.inReadWriteTransaction() {
946+
if c.AutoBatchDml() && !c.inBatch() && c.inTransaction() && statementInfo.StatementType == parser.StatementTypeDml {
948947
if _, err := c.startBatchDML( /* automatic = */ true); err != nil {
949948
return nil, err
950949
}
@@ -1041,14 +1040,14 @@ func (c *conn) resetTransactionForRetry(ctx context.Context, errDuringCommit boo
10411040
return c.tx.resetForRetry(ctx)
10421041
}
10431042

1043+
func (c *conn) withTransactionCloseFunc(close func()) {
1044+
c.tempTransactionCloseFunc = close
1045+
}
1046+
10441047
func (c *conn) withTempTransactionOptions(options *ReadWriteTransactionOptions) {
10451048
if options == nil {
10461049
return
10471050
}
1048-
c.tempTransactionCloseFunc = options.close
1049-
// Start a transaction for the connection state, so we can set the transaction options
1050-
// as local options in the current transaction.
1051-
_ = c.state.Begin()
10521051
if options.DisableInternalRetries {
10531052
_ = propertyRetryAbortsInternally.SetLocalValue(c.state, !options.DisableInternalRetries)
10541053
}
@@ -1102,10 +1101,6 @@ func (c *conn) withTempReadOnlyTransactionOptions(options *ReadOnlyTransactionOp
11021101
if options == nil {
11031102
return
11041103
}
1105-
c.tempTransactionCloseFunc = options.close
1106-
// Start a transaction for the connection state, so we can set the transaction options
1107-
// as local options in the current transaction.
1108-
_ = c.state.Begin()
11091104
if options.BeginTransactionOption != spanner.DefaultBeginTransaction {
11101105
_ = propertyBeginTransactionOption.SetLocalValue(c.state, options.BeginTransactionOption)
11111106
}
@@ -1122,10 +1117,6 @@ func (c *conn) withTempBatchReadOnlyTransactionOptions(options *BatchReadOnlyTra
11221117
if options == nil {
11231118
return
11241119
}
1125-
c.tempTransactionCloseFunc = options.close
1126-
// Start a transaction for the connection state, so we can set the transaction options
1127-
// as local options in the current transaction.
1128-
_ = c.state.Begin()
11291120
if options.TimestampBound.String() != "(strong)" {
11301121
_ = propertyReadOnlyStaleness.SetLocalValue(c.state, options.TimestampBound)
11311122
}
@@ -1139,9 +1130,9 @@ func (c *conn) getBatchReadOnlyTransactionOptions() BatchReadOnlyTransactionOpti
11391130
// It is exported for internal reasons, and may receive breaking changes without prior notice.
11401131
//
11411132
// BeginReadOnlyTransaction starts a new read-only transaction on this connection.
1142-
func (c *conn) BeginReadOnlyTransaction(ctx context.Context, options *ReadOnlyTransactionOptions) (driver.Tx, error) {
1133+
func (c *conn) BeginReadOnlyTransaction(ctx context.Context, options *ReadOnlyTransactionOptions, close func()) (driver.Tx, error) {
1134+
tx, err := c.beginTx(ctx, driver.TxOptions{ReadOnly: true}, close)
11431135
c.withTempReadOnlyTransactionOptions(options)
1144-
tx, err := c.BeginTx(ctx, driver.TxOptions{ReadOnly: true})
11451136
if err != nil {
11461137
return nil, err
11471138
}
@@ -1152,9 +1143,9 @@ func (c *conn) BeginReadOnlyTransaction(ctx context.Context, options *ReadOnlyTr
11521143
// It is exported for internal reasons, and may receive breaking changes without prior notice.
11531144
//
11541145
// BeginReadWriteTransaction starts a new read/write transaction on this connection.
1155-
func (c *conn) BeginReadWriteTransaction(ctx context.Context, options *ReadWriteTransactionOptions) (driver.Tx, error) {
1146+
func (c *conn) BeginReadWriteTransaction(ctx context.Context, options *ReadWriteTransactionOptions, close func()) (driver.Tx, error) {
1147+
tx, err := c.beginTx(ctx, driver.TxOptions{}, close)
11561148
c.withTempTransactionOptions(options)
1157-
tx, err := c.BeginTx(ctx, driver.TxOptions{})
11581149
if err != nil {
11591150
return nil, err
11601151
}
@@ -1177,19 +1168,6 @@ func (c *conn) beginTx(ctx context.Context, driverOpts driver.TxOptions, closeFu
11771168
c.resetForRetry = false
11781169
return c.tx, nil
11791170
}
1180-
// Also start a transaction on the ConnectionState if the BeginTx call was successful.
1181-
defer func() {
1182-
if c.tx != nil {
1183-
_ = c.state.Begin()
1184-
} else {
1185-
// Rollback in case the connection state transaction was started before this function
1186-
// was called, for example if the caller set temporary transaction options.
1187-
_ = c.state.Rollback()
1188-
}
1189-
}()
1190-
1191-
readOnlyTxOpts := c.getReadOnlyTransactionOptions()
1192-
batchReadOnlyTxOpts := c.getBatchReadOnlyTransactionOptions()
11931171
if c.inTransaction() {
11941172
return nil, spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "already in a transaction"))
11951173
}
@@ -1227,94 +1205,105 @@ func (c *conn) beginTx(ctx context.Context, driverOpts driver.TxOptions, closeFu
12271205
if closeFunc == nil {
12281206
closeFunc = func() {}
12291207
}
1208+
if err := c.state.Begin(); err != nil {
1209+
return nil, err
1210+
}
1211+
c.clearCommitResponse()
12301212

1213+
if isolationLevelFromTxOpts != spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED {
1214+
_ = propertyIsolationLevel.SetLocalValue(c.state, sql.IsolationLevel(driverOpts.Isolation))
1215+
}
1216+
// TODO: Figure out how to distinguish between 'use the default' and 'use read/write'.
12311217
if driverOpts.ReadOnly {
1218+
_ = propertyTransactionReadOnly.SetLocalValue(c.state, true)
1219+
}
1220+
if batchReadOnly {
1221+
_ = propertyTransactionBatchReadOnly.SetLocalValue(c.state, true)
1222+
}
1223+
if disableRetryAborts {
1224+
_ = propertyRetryAbortsInternally.SetLocalValue(c.state, false)
1225+
}
1226+
1227+
c.tx = &delegatingTransaction{
1228+
conn: c,
1229+
ctx: ctx,
1230+
close: func(result txResult) {
1231+
closeFunc()
1232+
if result == txResultCommit {
1233+
_ = c.state.Commit()
1234+
} else {
1235+
_ = c.state.Rollback()
1236+
}
1237+
c.tx = nil
1238+
},
1239+
}
1240+
return c.tx, nil
1241+
}
1242+
1243+
func (c *conn) activateTransaction() (contextTransaction, error) {
1244+
closeFunc := c.tx.close
1245+
if propertyTransactionReadOnly.GetValueOrDefault(c.state) {
12321246
var logger *slog.Logger
12331247
var ro *spanner.ReadOnlyTransaction
12341248
var bo *spanner.BatchReadOnlyTransaction
1235-
if batchReadOnly {
1249+
if propertyTransactionBatchReadOnly.GetValueOrDefault(c.state) {
12361250
logger = c.logger.With("tx", "batchro")
12371251
var err error
12381252
// BatchReadOnly transactions (currently) do not support inline-begin.
12391253
// This means that the transaction options must be supplied here, and not through a callback.
1240-
bo, err = c.client.BatchReadOnlyTransaction(ctx, batchReadOnlyTxOpts.TimestampBound)
1254+
bo, err = c.client.BatchReadOnlyTransaction(c.tx.ctx, propertyReadOnlyStaleness.GetValueOrDefault(c.state))
12411255
if err != nil {
12421256
return nil, err
12431257
}
12441258
ro = &bo.ReadOnlyTransaction
12451259
} else {
12461260
logger = c.logger.With("tx", "ro")
1247-
ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(readOnlyTxOpts.BeginTransactionOption)
1261+
beginTxOpt := c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state))
1262+
ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(beginTxOpt)
12481263
}
1249-
c.tx = &readOnlyTransaction{
1264+
return &readOnlyTransaction{
12501265
roTx: ro,
12511266
boTx: bo,
12521267
logger: logger,
1253-
close: func(result txResult) {
1254-
closeFunc()
1255-
if result == txResultCommit {
1256-
_ = c.state.Commit()
1257-
} else {
1258-
_ = c.state.Rollback()
1259-
}
1260-
c.tx = nil
1261-
},
1268+
close: closeFunc,
12621269
timestampBoundCallback: func() spanner.TimestampBound {
12631270
return propertyReadOnlyStaleness.GetValueOrDefault(c.state)
12641271
},
1265-
}
1266-
return c.tx, nil
1272+
}, nil
12671273
}
12681274

1269-
// These options are only used to determine how to start the transaction.
1270-
// All other options are fetched in a callback that is called when the transaction is actually started.
1271-
// That callback reads all transaction options from the connection state at that moment. This allows
1272-
// applications to execute a series of statement like this:
1273-
// BEGIN TRANSACTION;
1274-
// SET LOCAL transaction_tag='my_tag';
1275-
// SET LOCAL commit_priority=LOW;
1276-
// INSERT INTO my_table ... -- This starts the transaction with the options above included.
12771275
opts := spanner.TransactionOptions{}
12781276
opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state))
12791277

1280-
tx, err := spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, c.client, opts, func() spanner.TransactionOptions {
1278+
tx, err := spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(c.tx.ctx, c.client, opts, func() spanner.TransactionOptions {
12811279
defer func() {
12821280
// Reset the transaction_tag after starting the transaction.
12831281
_ = propertyTransactionTag.ResetValue(c.state, connectionstate.ContextUser)
12841282
}()
1285-
return c.effectiveTransactionOptions(isolationLevelFromTxOpts, c.options( /*reset=*/ true))
1283+
return c.effectiveTransactionOptions(spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, c.options( /*reset=*/ true))
12861284
})
12871285
if err != nil {
12881286
return nil, err
12891287
}
12901288
logger := c.logger.With("tx", "rw")
1291-
c.tx = &readWriteTransaction{
1292-
ctx: ctx,
1289+
return &readWriteTransaction{
1290+
ctx: c.tx.ctx,
12931291
conn: c,
12941292
logger: logger,
12951293
rwTx: tx,
12961294
close: func(result txResult, commitResponse *spanner.CommitResponse, commitErr error) {
1297-
closeFunc()
12981295
c.prevTx = c.tx
1299-
c.tx = nil
13001296
if commitErr == nil {
13011297
c.setCommitResponse(commitResponse)
1302-
if result == txResultCommit {
1303-
_ = c.state.Commit()
1304-
} else {
1305-
_ = c.state.Rollback()
1306-
}
1298+
closeFunc(result)
13071299
} else {
1308-
_ = c.state.Rollback()
1300+
closeFunc(txResultRollback)
13091301
}
13101302
},
1311-
// Disable internal retries if any of these options have been set.
13121303
retryAborts: sync.OnceValue(func() bool {
1313-
return c.RetryAbortsInternally() && !disableRetryAborts
1304+
return c.RetryAbortsInternally()
13141305
}),
1315-
}
1316-
c.clearCommitResponse()
1317-
return c.tx, nil
1306+
}, nil
13181307
}
13191308

13201309
func (c *conn) effectiveTransactionOptions(isolationLevelFromTxOpts spannerpb.TransactionOptions_IsolationLevel, execOptions *ExecOptions) spanner.TransactionOptions {
@@ -1340,22 +1329,6 @@ func (c *conn) inTransaction() bool {
13401329
return c.tx != nil
13411330
}
13421331

1343-
func (c *conn) inReadOnlyTransaction() bool {
1344-
if c.tx != nil {
1345-
_, ok := c.tx.(*readOnlyTransaction)
1346-
return ok
1347-
}
1348-
return false
1349-
}
1350-
1351-
func (c *conn) inReadWriteTransaction() bool {
1352-
if c.tx != nil {
1353-
_, ok := c.tx.(*readWriteTransaction)
1354-
return ok
1355-
}
1356-
return false
1357-
}
1358-
13591332
// Commit is not part of the public API of the database/sql driver.
13601333
// It is exported for internal reasons, and may receive breaking changes without prior notice.
13611334
//

0 commit comments

Comments
 (0)