@@ -646,7 +646,7 @@ void Dbacc::releaseFragResources(Signal* signal, Uint32 fragIndex)
646
646
FragmentrecPtr regFragPtr;
647
647
regFragPtr.i = fragIndex;
648
648
ptrCheckGuard (regFragPtr, cfragmentsize, fragmentrec);
649
- verifyFragCorrect (regFragPtr);
649
+ ndbrequire (regFragPtr. p -> lockCount == 0 );
650
650
651
651
if (regFragPtr.p ->expandOrShrinkQueued )
652
652
{
@@ -705,11 +705,6 @@ void Dbacc::releaseFragResources(Signal* signal, Uint32 fragIndex)
705
705
ndbassert (validatePageCount ());
706
706
}// Dbacc::releaseFragResources()
707
707
708
- void Dbacc::verifyFragCorrect (FragmentrecPtr regFragPtr)const
709
- {
710
- ndbrequire (regFragPtr.p ->lockOwnersList == RNIL);
711
- }// Dbacc::verifyFragCorrect()
712
-
713
708
void Dbacc::releaseDirResources (Signal* signal)
714
709
{
715
710
jam ();
@@ -721,7 +716,7 @@ void Dbacc::releaseDirResources(Signal* signal)
721
716
FragmentrecPtr regFragPtr;
722
717
regFragPtr.i = fragIndex;
723
718
ptrCheckGuard (regFragPtr, cfragmentsize, fragmentrec);
724
- verifyFragCorrect (regFragPtr);
719
+ ndbrequire (regFragPtr. p -> lockCount == 0 );
725
720
726
721
DynArr256::Head* directory;
727
722
ndbrequire (signal ->theData [0 ] == ZREL_DIR);
@@ -1079,8 +1074,8 @@ void Dbacc::execACCKEYREQ(Signal* signal)
1079
1074
dbgWord32 (elemPageptr, elemptr, eh);
1080
1075
elemPageptr.p ->word32 [elemptr] = eh;
1081
1076
1082
- opbits |= Operationrec::OP_LOCK_OWNER;
1083
- insertLockOwnersList (operationRecPtr) ;
1077
+ opbits |= Operationrec::OP_LOCK_OWNER;
1078
+ fragrecptr. p -> lockCount ++ ;
1084
1079
1085
1080
fragrecptr.p ->
1086
1081
m_lockStats.req_start_imm_ok ((opbits &
@@ -1598,7 +1593,8 @@ void Dbacc::insertelementLab(Signal* signal,
1598
1593
ndbrequire (operationRecPtr.p ->tupkeylen <= fragrecptr.p ->keyLength );
1599
1594
ndbassert (!(operationRecPtr.p ->m_op_bits & Operationrec::OP_LOCK_REQ));
1600
1595
1601
- insertLockOwnersList (operationRecPtr);
1596
+ fragrecptr.p ->lockCount ++;
1597
+ operationRecPtr.p ->m_op_bits |= Operationrec::OP_LOCK_OWNER;
1602
1598
1603
1599
operationRecPtr.p ->reducedHashValue = fragrecptr.p ->level .reduce (operationRecPtr.p ->hashValue );
1604
1600
const Uint32 tidrElemhead = ElementHeader::setLocked (operationRecPtr.i );
@@ -4834,7 +4830,7 @@ void Dbacc::abortOperation(Signal* signal)
4834
4830
4835
4831
if (opbits & Operationrec::OP_LOCK_OWNER)
4836
4832
{
4837
- takeOutLockOwnersList (operationRecPtr) ;
4833
+ fragrecptr. p -> lockCount -- ;
4838
4834
opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
4839
4835
if (opbits & Operationrec::OP_INSERT_IS_DONE)
4840
4836
{
@@ -5013,7 +5009,7 @@ void Dbacc::commitOperation(Signal* signal)
5013
5009
5014
5010
if (opbits & Operationrec::OP_LOCK_OWNER)
5015
5011
{
5016
- takeOutLockOwnersList (operationRecPtr) ;
5012
+ fragrecptr. p -> lockCount -- ;
5017
5013
opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
5018
5014
operationRecPtr.p ->m_op_bits = opbits;
5019
5015
@@ -5340,7 +5336,8 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
5340
5336
action = START_NEW;
5341
5337
}
5342
5338
5343
- insertLockOwnersList (newOwner);
5339
+ fragrecptr.p ->lockCount ++;
5340
+ newOwner.p ->m_op_bits |= Operationrec::OP_LOCK_OWNER;
5344
5341
5345
5342
/* *
5346
5343
* Copy op info, and store op in element
@@ -5497,99 +5494,6 @@ Dbacc::startNew(Signal* signal, OperationrecPtr newOwner)
5497
5494
return ;
5498
5495
}
5499
5496
5500
- /* *
5501
- * takeOutLockOwnersList
5502
- *
5503
- * Description: Take out an operation from the doubly linked
5504
- * lock owners list on the fragment.
5505
- *
5506
- */
5507
- void Dbacc::takeOutLockOwnersList (const OperationrecPtr& outOperPtr) const
5508
- {
5509
- const Uint32 Tprev = outOperPtr.p ->prevLockOwnerOp ;
5510
- const Uint32 Tnext = outOperPtr.p ->nextLockOwnerOp ;
5511
- #ifdef VM_TRACE
5512
- // Check that operation is already in the list
5513
- OperationrecPtr tmpOperPtr;
5514
- bool inList = false ;
5515
- tmpOperPtr.i = fragrecptr.p ->lockOwnersList ;
5516
- while (tmpOperPtr.i != RNIL){
5517
- ptrCheckGuard (tmpOperPtr, coprecsize, operationrec);
5518
- if (tmpOperPtr.i == outOperPtr.i )
5519
- inList = true ;
5520
- tmpOperPtr.i = tmpOperPtr.p ->nextLockOwnerOp ;
5521
- }
5522
- ndbrequire (inList == true );
5523
- #endif
5524
-
5525
- ndbassert (outOperPtr.p ->m_op_bits & Operationrec::OP_LOCK_OWNER);
5526
-
5527
- // Fast path through the code for the common case.
5528
- if ((Tprev == RNIL) && (Tnext == RNIL)) {
5529
- ndbrequire (fragrecptr.p ->lockOwnersList == outOperPtr.i );
5530
- fragrecptr.p ->lockOwnersList = RNIL;
5531
- return ;
5532
- }
5533
-
5534
- // Check previous operation
5535
- if (Tprev != RNIL) {
5536
- jam ();
5537
- arrGuard (Tprev, coprecsize);
5538
- operationrec[Tprev].nextLockOwnerOp = Tnext;
5539
- } else {
5540
- fragrecptr.p ->lockOwnersList = Tnext;
5541
- }// if
5542
-
5543
- // Check next operation
5544
- if (Tnext == RNIL) {
5545
- return ;
5546
- } else {
5547
- jam ();
5548
- arrGuard (Tnext, coprecsize);
5549
- operationrec[Tnext].prevLockOwnerOp = Tprev;
5550
- }// if
5551
-
5552
- return ;
5553
- }// Dbacc::takeOutLockOwnersList()
5554
-
5555
- /* *
5556
- * insertLockOwnersList
5557
- *
5558
- * Description: Insert an operation first in the dubly linked lock owners
5559
- * list on the fragment.
5560
- *
5561
- */
5562
- void Dbacc::insertLockOwnersList (const OperationrecPtr& insOperPtr) const
5563
- {
5564
- OperationrecPtr tmpOperPtr;
5565
- #ifdef VM_TRACE
5566
- // Check that operation is not already in list
5567
- tmpOperPtr.i = fragrecptr.p ->lockOwnersList ;
5568
- while (tmpOperPtr.i != RNIL){
5569
- ptrCheckGuard (tmpOperPtr, coprecsize, operationrec);
5570
- ndbrequire (tmpOperPtr.i != insOperPtr.i );
5571
- tmpOperPtr.i = tmpOperPtr.p ->nextLockOwnerOp ;
5572
- }
5573
- #endif
5574
- tmpOperPtr.i = fragrecptr.p ->lockOwnersList ;
5575
-
5576
- ndbrequire (! (insOperPtr.p ->m_op_bits & Operationrec::OP_LOCK_OWNER));
5577
-
5578
- insOperPtr.p ->m_op_bits |= Operationrec::OP_LOCK_OWNER;
5579
- insOperPtr.p ->prevLockOwnerOp = RNIL;
5580
- insOperPtr.p ->nextLockOwnerOp = tmpOperPtr.i ;
5581
-
5582
- fragrecptr.p ->lockOwnersList = insOperPtr.i ;
5583
- if (tmpOperPtr.i == RNIL) {
5584
- return ;
5585
- } else {
5586
- jam ();
5587
- ptrCheckGuard (tmpOperPtr, coprecsize, operationrec);
5588
- tmpOperPtr.p ->prevLockOwnerOp = insOperPtr.i ;
5589
- }// if
5590
- }// Dbacc::insertLockOwnersList()
5591
-
5592
-
5593
5497
/* --------------------------------------------------------------------------------- */
5594
5498
/* --------------------------------------------------------------------------------- */
5595
5499
/* --------------------------------------------------------------------------------- */
@@ -7043,8 +6947,7 @@ void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)const
7043
6947
{
7044
6948
new (®FragPtr.p ->directory ) DynArr256::Head ();
7045
6949
7046
- regFragPtr.p ->lockOwnersList = RNIL;
7047
-
6950
+ regFragPtr.p ->lockCount = 0 ;
7048
6951
regFragPtr.p ->hasCharAttr = ZFALSE;
7049
6952
regFragPtr.p ->dirRangeFull = ZFALSE;
7050
6953
regFragPtr.p ->fragState = FREEFRAG;
@@ -7326,9 +7229,10 @@ void Dbacc::checkNextBucketLab(Signal* signal)
7326
7229
getHighResTimer ());
7327
7230
7328
7231
setlock (nsPageptr, tnsElementptr);
7329
- insertLockOwnersList (operationRecPtr);
7330
- operationRecPtr.p ->m_op_bits |=
7331
- Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE;
7232
+ fragrecptr.p ->lockCount ++;
7233
+ operationRecPtr.p ->m_op_bits |=
7234
+ Operationrec::OP_LOCK_OWNER |
7235
+ Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE;
7332
7236
}// if
7333
7237
} else {
7334
7238
arrGuard (tnsElementptr, 2048 );
@@ -8027,26 +7931,6 @@ void Dbacc::putActiveScanOp() const
8027
7931
*/
8028
7932
void Dbacc::putOpScanLockQue () const
8029
7933
{
8030
-
8031
- #ifdef VM_TRACE
8032
- // DEBUG CODE
8033
- // Check that there are as many operations in the lockqueue as
8034
- // scanLockHeld indicates
8035
- OperationrecPtr tmpOp;
8036
- int numLockedOpsBefore = 0 ;
8037
- tmpOp.i = scanPtr.p ->scanFirstLockedOp ;
8038
- while (tmpOp.i != RNIL){
8039
- numLockedOpsBefore++;
8040
- ptrCheckGuard (tmpOp, coprecsize, operationrec);
8041
- if (tmpOp.p ->nextOp == RNIL)
8042
- {
8043
- ndbrequire (tmpOp.i == scanPtr.p ->scanLastLockedOp );
8044
- }
8045
- tmpOp.i = tmpOp.p ->nextOp ;
8046
- }
8047
- ndbrequire (numLockedOpsBefore==scanPtr.p ->scanLockHeld );
8048
- #endif
8049
-
8050
7934
OperationrecPtr pslOperationRecPtr;
8051
7935
ScanRec theScanRec;
8052
7936
theScanRec = *scanPtr.p ;
@@ -8414,25 +8298,6 @@ void Dbacc::takeOutScanLockQueue(Uint32 scanRecIndex) const
8414
8298
TscanPtr.p ->scanLastLockedOp = operationRecPtr.p ->prevOp ;
8415
8299
}// if
8416
8300
TscanPtr.p ->scanLockHeld --;
8417
-
8418
- #ifdef VM_TRACE
8419
- // DEBUG CODE
8420
- // Check that there are as many operations in the lockqueue as
8421
- // scanLockHeld indicates
8422
- OperationrecPtr tmpOp;
8423
- int numLockedOps = 0 ;
8424
- tmpOp.i = TscanPtr.p ->scanFirstLockedOp ;
8425
- while (tmpOp.i != RNIL){
8426
- numLockedOps++;
8427
- ptrCheckGuard (tmpOp, coprecsize, operationrec);
8428
- if (tmpOp.p ->nextOp == RNIL)
8429
- {
8430
- ndbrequire (tmpOp.i == TscanPtr.p ->scanLastLockedOp );
8431
- }
8432
- tmpOp.i = tmpOp.p ->nextOp ;
8433
- }
8434
- ndbrequire (numLockedOps==TscanPtr.p ->scanLockHeld );
8435
- #endif
8436
8301
}// Dbacc::takeOutScanLockQueue()
8437
8302
8438
8303
/* --------------------------------------------------------------------------------- */
@@ -9215,15 +9080,13 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal)
9215
9080
infoEvent (" fid=%d, fragptr=%d " ,
9216
9081
tmpOpPtr.p ->fid , tmpOpPtr.p ->fragptr );
9217
9082
infoEvent (" hashValue=%d" , tmpOpPtr.p ->hashValue .pack ());
9218
- infoEvent (" nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d " ,
9219
- tmpOpPtr.p ->nextLockOwnerOp , tmpOpPtr.p ->nextOp ,
9220
- tmpOpPtr.p ->nextParallelQue );
9083
+ infoEvent (" nextOp=%d, nextParallelQue=%d " ,
9084
+ tmpOpPtr.p ->nextOp , tmpOpPtr.p ->nextParallelQue );
9221
9085
infoEvent (" nextSerialQue=%d, prevOp=%d " ,
9222
9086
tmpOpPtr.p ->nextSerialQue ,
9223
9087
tmpOpPtr.p ->prevOp );
9224
- infoEvent (" prevLockOwnerOp=%d, prevParallelQue=%d" ,
9225
- tmpOpPtr.p ->prevLockOwnerOp , tmpOpPtr.p ->nextParallelQue );
9226
- infoEvent (" prevSerialQue=%d, scanRecPtr=%d" ,
9088
+ infoEvent (" prevParallelQue=%d, prevSerialQue=%d, scanRecPtr=%d" ,
9089
+ tmpOpPtr.p ->prevParallelQue ,
9227
9090
tmpOpPtr.p ->prevSerialQue , tmpOpPtr.p ->scanRecPtr );
9228
9091
infoEvent (" m_op_bits=0x%x, reducedHashValue=%x " ,
9229
9092
tmpOpPtr.p ->m_op_bits , tmpOpPtr.p ->reducedHashValue .pack ());
0 commit comments