@@ -378,12 +378,14 @@ class PFS_buffer_scalable_container
   PFS_buffer_scalable_container(allocator_type *allocator)
   {
     m_allocator= allocator;
+    m_initialized= false;
   }

   int init(long max_size)
   {
     int i;

+    m_initialized= true;
     m_full= true;
     m_max= PFS_PAGE_COUNT * PFS_PAGE_SIZE;
     m_max_page_count= PFS_PAGE_COUNT;
@@ -421,6 +423,8 @@ class PFS_buffer_scalable_container
       /* max_size = -1 means unbounded allocation */
       m_full= false;
     }
+
+    native_mutex_init(&m_critical_section, NULL);
     return 0;
   }

@@ -429,6 +433,11 @@ class PFS_buffer_scalable_container
     int i;
     array_type *page;

+    if (! m_initialized)
+      return;
+
+    native_mutex_lock(&m_critical_section);
+
     for (i=0; i < PFS_PAGE_COUNT; i++)
     {
       page= m_pages[i];
@@ -439,6 +448,11 @@ class PFS_buffer_scalable_container
         m_pages[i]= NULL;
       }
     }
+    native_mutex_unlock(&m_critical_section);
+
+    native_mutex_destroy(&m_critical_section);
+
+    m_initialized= false;
   }

   ulong get_row_count()
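
These first four hunks give the container an explicit lifecycle: the constructor marks the object uninitialized, init() sets the flag and creates the mutex, and cleanup() bails out early if init() never ran, then tears the mutex down only after the pages are freed. Below is a minimal sketch of that pattern, with std::mutex standing in for native_mutex_t; the class and member names are illustrative, not the MySQL code itself.

```cpp
// Sketch of the init()/cleanup() lifecycle introduced above,
// using std::mutex as a stand-in for native_mutex_t.
#include <mutex>

class container_sketch
{
public:
  container_sketch()
  { m_initialized= false; }        /* constructed, but not yet usable */

  int init()
  {
    m_initialized= true;           /* set first, as in the patch */
    /* ... size and page bookkeeping elided ... */
    /* native_mutex_init() maps to std::mutex construction here */
    return 0;
  }

  void cleanup()
  {
    if (! m_initialized)
      return;                      /* cleanup() without init(): no-op */

    m_critical_section.lock();
    /* ... free all pages under the lock, as in the patch ... */
    m_critical_section.unlock();

    /* native_mutex_destroy() maps to std::mutex destruction here */
    m_initialized= false;
  }

private:
  bool m_initialized;
  std::mutex m_critical_section;
};
```

The flag matters because cleanup() may legitimately run on a container whose init() was never reached, and destroying a never-initialized mutex would be undefined behavior with the native primitives.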
@@ -473,12 +487,10 @@ class PFS_buffer_scalable_container
     uint page_logical_size;
     value_type *pfs;
     array_type *array;
-    array_type *old_array;

     void *addr;
     void * volatile * typed_addr;
     void *ptr;
-    void *old_ptr;

     /*
       1: Try to find an available record within the existing pages
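
The two locals removed here, old_array and old_ptr, existed only to service the compare-and-swap expand path that the next hunk deletes. For contrast, here is a sketch of that retired scheme, with hypothetical names and std::atomic standing in for my_atomic_casptr(): every colliding thread builds a complete page up front, and all but the winner throw theirs away.

```cpp
// Sketch of the retired CAS-based expand (hypothetical names).
#include <atomic>

struct page_t { /* page payload elided */ };

static std::atomic<page_t *> g_slot{nullptr};

page_t *cas_expand()
{
  page_t *fresh= new page_t();     /* every contender pays for a page */

  page_t *expected= nullptr;
  if (g_slot.compare_exchange_strong(expected, fresh))
    return fresh;                  /* won the race: our page is published */

  /* Lost the race: free the duplicate and use the winner's page.
     With thousands of simultaneous contenders, these short-lived
     duplicates are the memory spike described in the patch comment. */
  delete fresh;
  return expected;
}
```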
@@ -549,44 +561,72 @@ class PFS_buffer_scalable_container

     if (array == NULL)
     {
-      /* (2-b) Found no page, allocate a new one */
-      array= new array_type();
-      builtin_memory_scalable_buffer.count_alloc(sizeof (array_type));
+      // ==================================================================
+      // BEGIN CRITICAL SECTION -- buffer expand
+      // ==================================================================

-      int rc= m_allocator->alloc_array(array, PFS_PAGE_SIZE);
-      if (rc != 0)
-      {
-        m_allocator->free_array(array, PFS_PAGE_SIZE);
-        delete array;
-        builtin_memory_scalable_buffer.count_free(sizeof (array_type));
-        m_lost++;
-        return NULL;
-      }
+      /*
+        On a freshly started server, buffers are typically empty.
+        When a sudden load spike hits the server,
+        multiple threads may want to expand the buffer at the same time.
+
+        Using a compare and swap to allow multiple pages to be added,
+        possibly freeing duplicate pages on collisions,
+        does not work well because the amount of code involved
+        when creating a new page can be significant (PFS_thread),
+        causing MANY collisions between (2-b) and (2-d).
+
+        A huge number of collisions (which can happen when thousands
+        of new connections hit the server after a restart)
+        leads to huge memory consumption, and to OOM.
+
+        To mitigate this, we use a mutex here,
+        to enforce that only ONE page is added at a time,
+        so that scaling the buffer happens in a predictable
+        and controlled manner.
+      */
+      native_mutex_lock(&m_critical_section);
+
+      /*
+        Peek again for pages added by collaborating threads,
+        this time as the only thread allowed to expand the buffer.
+      */

-      /* (2-c) Atomic CAS, array <==> (m_pages[current_page_count] if NULL) */
-      old_ptr= NULL;
-      ptr= array;
-      if (my_atomic_casptr(typed_addr, &old_ptr, ptr))
+      /* (2-b) Atomic Load, array= m_pages[current_page_count] */
+
+      ptr= my_atomic_loadptr(typed_addr);
+      array= static_cast<array_type *>(ptr);
+
+      if (array == NULL)
       {
-        /* CAS: Ok */
+        /* (2-c) Found no page, allocate a new one */
+        array= new array_type();
+        builtin_memory_scalable_buffer.count_alloc(sizeof (array_type));
+
+        int rc= m_allocator->alloc_array(array, PFS_PAGE_SIZE);
+        if (rc != 0)
+        {
+          m_allocator->free_array(array, PFS_PAGE_SIZE);
+          delete array;
+          builtin_memory_scalable_buffer.count_free(sizeof (array_type));
+          m_lost++;
+          native_mutex_unlock(&m_critical_section);
+          return NULL;
+        }
+
+        /* (2-d) Atomic STORE, m_pages[current_page_count] = array */
+        ptr= array;
+        my_atomic_storeptr(typed_addr, ptr);

         /* Advertise the new page */
         PFS_atomic::add_u32(&m_max_page_index.m_u32, 1);
       }
-      else
-      {
-        /* CAS: Race condition with another thread */

-        old_array= static_cast<array_type *>(old_ptr);
+      native_mutex_unlock(&m_critical_section);

-        /* Delete the page */
-        m_allocator->free_array(array, PFS_PAGE_SIZE);
-        delete array;
-        builtin_memory_scalable_buffer.count_free(sizeof (array_type));
-
-        /* Use the new page added concurrently instead */
-        array= old_array;
-      }
+      // ==================================================================
+      // END CRITICAL SECTION -- buffer expand
+      // ==================================================================
     }

     DBUG_ASSERT(array != NULL);
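
The replacement is a classic lock-serialized, double-checked publish: take the mutex, reload the slot, and allocate only if it is still empty, so at most one page is ever built per slot regardless of contention. A minimal sketch of the pattern under the same assumptions as above (std::atomic and std::mutex as stand-ins for my_atomic_loadptr()/my_atomic_storeptr() and native_mutex_t; the names are illustrative):

```cpp
// Sketch of the mutex-serialized, double-checked expand that
// replaces the CAS scheme (hypothetical names).
#include <atomic>
#include <mutex>

struct page_t { /* page payload elided */ };

static std::atomic<page_t *> g_slot{nullptr};
static std::mutex g_expand_lock;

page_t *get_or_expand()
{
  /* Fast path: the page may already be published. */
  page_t *page= g_slot.load(std::memory_order_acquire);
  if (page != nullptr)
    return page;

  /* Slow path: only ONE thread at a time may expand. */
  std::lock_guard<std::mutex> guard(g_expand_lock);

  /* Peek again: a collaborating thread may have added the page
     while we were waiting on the lock. */
  page= g_slot.load(std::memory_order_acquire);
  if (page == nullptr)
  {
    page= new page_t();            /* at most one allocation per slot */
    g_slot.store(page, std::memory_order_release);
  }
  return page;
}
```

Note that the error path in the patch unlocks the mutex before returning NULL, which the lock_guard above gets for free; forgetting that unlock would deadlock every later expand attempt.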
@@ -885,6 +925,7 @@ class PFS_buffer_scalable_container
     return NULL;
   }

+  bool m_initialized;
  bool m_full;
   size_t m_max;
   PFS_cacheline_uint32 m_monotonic;
@@ -893,6 +934,7 @@ class PFS_buffer_scalable_container
   ulong m_last_page_size;
   array_type * m_pages[PFS_PAGE_COUNT];
   allocator_type *m_allocator;
+  native_mutex_t m_critical_section;
 };

 template <class T, class U, class V>