27
27
-export ([init /0 ]).
28
28
-export ([info /1 , info /2 ]).
29
29
30
- -record (bucket , {
31
- % % a {resource, bucket} pair
32
- % % where bucket is a non-negative integer
33
- id ,
34
- % % a resource
35
- queue
36
- }).
37
-
38
- -record (bucket_count , {
39
- exchange ,
40
- count
41
- }).
42
-
43
- -record (binding_buckets , {
44
- % % an {exchange, queue} pair because we
45
- % % assume that there's only one binding between
46
- % % a consistent hash exchange and a queue
47
- id ,
48
- bucket_numbers = []
30
%% State of the hash ring of a single consistent hash exchange.
%% One row of this record is stored per exchange.
-record(chx_hash_ring, {
    %% the exchange this ring belongs to (a resource); used as the row key
    exchange,
    %% a map of bucket number => destination (queue | exchange)
    bucket_map,
    %% the number that the next bucket added to the ring will receive
    next_bucket_number
}).
50
37
51
38
-rabbit_boot_step (
66
53
{enables , external_infrastructure }]}).
67
54
68
55
% % This data model allows for efficient routing and exchange deletion
69
- % % but not efficient binding management. This is a future area of improvement.
70
- % % A couple of alternatives were considered, e.g. storing the entire ring state
71
- % % in a single map. Without an additional structure such as a balanced tree
72
- % % ring updates would be even less efficient (but easier to follow).
73
-
74
- % % maps buckets to queues
75
- -define (BUCKET_TABLE , rabbit_exchange_type_consistent_hash_bucket_queue ).
76
- % % maps exchange to total the number of buckets
77
- -define (BUCKET_COUNT_TABLE , rabbit_exchange_type_consistent_hash_bucket_count ).
78
- % % maps {exchange, queue} pairs to a list of buckets
79
- -define (BINDING_BUCKET_TABLE , rabbit_exchange_type_consistent_hash_binding_bucket ).
56
+ % % but less efficient (linear) binding management.
57
+
58
+ -define (HASH_RING_STATE_TABLE , rabbit_exchange_type_consistent_hash_ring_state ).
80
59
81
60
-define (PROPERTIES , [<<" correlation_id" >>, <<" message_id" >>, <<" timestamp" >>]).
82
61
@@ -91,21 +70,26 @@ description() ->
91
70
92
71
%% Exchange type callback: this exchange type always answers `false`.
%% NOTE(review): presumably this tells the broker that events for this
%% exchange type do not need to be serialised -- confirm against the
%% rabbit_exchange_type behaviour.
serialise_events() ->
    false.
93
72
94
- route (# exchange { name = Name ,
95
- arguments = Args },
96
- # delivery { message = Msg }) ->
97
- case ets :lookup (? BUCKET_COUNT_TABLE , Name ) of
73
%% Picks the destination for a message published to a consistent hash
%% exchange.
%%
%% The ring state of the exchange is looked up by exchange name. The value
%% selected by hash_on/1 is hashed and mapped onto one of the N buckets of
%% the ring with jump_consistent_hash/2, where N is the number of entries
%% in the bucket map.
%%
%% Returns a list with the single selected destination, or [] when there is
%% no ring state, the ring has no buckets, or the selected bucket is missing
%% from the map (the latter is logged as a warning).
route(#exchange{name = Name, arguments = Args},
      #delivery{message = Msg}) ->
    case ets:lookup(?HASH_RING_STATE_TABLE, Name) of
        [] ->
            [];
        [#chx_hash_ring{bucket_map = Buckets}] when map_size(Buckets) =:= 0 ->
            %% nothing is bound to this exchange
            [];
        [#chx_hash_ring{bucket_map = Buckets}] ->
            Hash = value_to_hash(hash_on(Args), Msg),
            Selected = jump_consistent_hash(Hash, maps:size(Buckets)),
            case maps:get(Selected, Buckets, undefined) of
                undefined ->
                    rabbit_log:warning(" Bucket ~p not found", [Selected]),
                    [];
                Destination ->
                    [Destination]
            end
    end.
110
94
111
95
validate (# exchange { arguments = Args }) ->
@@ -139,142 +123,117 @@ validate_binding(_X, #binding { key = K }) ->
139
123
{error , {binding_invalid , " The binding key must be an integer: ~p " , [K ]}}
140
124
end .
141
125
142
- create (_Tx , _X ) -> ok .
126
%% Makes sure a hash ring state row exists for an exchange.
%%
%% The second argument is either an #exchange{} record or an exchange name
%% (resource). Rows are always keyed by the exchange *name*, because that is
%% the key route/2, add_binding/3 and delete/3 use. Keying a row by the whole
%% #exchange{} record would create a row that is never found by routing and
%% never removed on exchange deletion.
%%
%% With `transaction` as the first argument the function must be called from
%% within a Mnesia transaction; with any other value it runs its own
%% transaction. An existing row is left untouched. Returns `ok` in the
%% transactional case, and the result of
%% rabbit_misc:execute_mnesia_transaction/1 otherwise.
maybe_initialise_hash_ring_state(Tx, #exchange{name = Name}) ->
    maybe_initialise_hash_ring_state(Tx, Name);
maybe_initialise_hash_ring_state(transaction, Name) ->
    case mnesia:read(?HASH_RING_STATE_TABLE, Name) of
        [_] ->
            ok;
        [] ->
            mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
            %% an empty ring: no buckets yet, numbering starts at 0
            EmptyRing = #chx_hash_ring{exchange = Name,
                                       next_bucket_number = 0,
                                       bucket_map = #{}},
            ok = mnesia:write(?HASH_RING_STATE_TABLE, EmptyRing, write)
    end;
maybe_initialise_hash_ring_state(_, Name) ->
    rabbit_misc:execute_mnesia_transaction(
      fun() -> maybe_initialise_hash_ring_state(transaction, Name) end).
140
+
141
%% Exchange type callback invoked when an exchange is declared.
%% Whatever the transaction phase is, it is handed straight to
%% maybe_initialise_hash_ring_state/2 together with the exchange.
create(Tx, X) ->
    maybe_initialise_hash_ring_state(Tx, X).
143
145
144
146
%% Exchange type callback invoked when an exchange is deleted.
%% In the transactional phase the hash ring state row of the exchange
%% (keyed by exchange name) is removed; any other phase is a no-op.
delete(transaction, #exchange{name = Name}, _Bs) ->
    mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
    RingKey = {?HASH_RING_STATE_TABLE, Name},
    ok = mnesia:delete(RingKey);
delete(_Tx, _X, _Bs) ->
    ok.
169
152
170
153
%% Exchange type callback: both arguments are ignored, nothing is done and
%% `ok` is returned.
policy_changed(_X1, _X2) ->
    ok.
171
154
172
- add_binding (transaction , _X ,
173
- # binding {source = S , destination = D , key = K }) ->
155
%% Exchange type callback invoked when a binding is added.
%%
%% The binding key is the weight of the binding: that many consecutive
%% buckets, starting at the ring's next_bucket_number, are pointed at the
%% binding destination, and next_bucket_number is advanced by the weight.
%% If the exchange has no ring state yet, it is initialised first and the
%% binding is then added. The non-transactional phase is a no-op.
add_binding(transaction, X,
            B = #binding{source = S, destination = D, key = K}) ->
    Weight = rabbit_data_coercion:to_integer(K),

    mnesia:write_lock_table(?HASH_RING_STATE_TABLE),

    case mnesia:read(?HASH_RING_STATE_TABLE, S) of
        [] ->
            maybe_initialise_hash_ring_state(transaction, S),
            add_binding(transaction, X, B);
        [Ring0 = #chx_hash_ring{bucket_map = Buckets0,
                                next_bucket_number = First}] ->
            Next = First + Weight,
            %% bucket numbers are 0-based while the weight is a count,
            %% hence the range First..Next-1
            Added = maps:from_list([{N, D} || N <- lists:seq(First, Next - 1)]),
            Ring = Ring0#chx_hash_ring{bucket_map = maps:merge(Buckets0, Added),
                                       next_bucket_number = Next},
            ok = mnesia:write(?HASH_RING_STATE_TABLE, Ring, write),
            ok
    end;
add_binding(none, _X, _B) ->
    ok.
195
181
196
182
%% Exchange type callback invoked when bindings are removed.
%% In the transactional phase every binding is removed from the hash ring,
%% one at a time and in list order; the non-transactional phase is a no-op.
remove_bindings(transaction, _X, Bindings) ->
    mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
    lists:foreach(fun remove_binding/1, Bindings);
remove_bindings(none, _X, _Bs) ->
    ok.
205
190
206
- remove_binding (# binding {source = S , destination = D , key = K }) ->
207
- Weight = rabbit_data_coercion :to_integer (K ),
191
%% Removes all buckets that point at the destination of the given binding
%% from the hash ring of the binding's source exchange.
%%
%% route/2 selects a bucket in the range 0..N-1, where N is the size of the
%% bucket map, so the ring must stay dense. The ring is therefore rebuilt
%% from the surviving buckets, taken in bucket number order and renumbered
%% from 0: buckets below the removed ones keep their numbers, buckets above
%% them move down. next_bucket_number becomes the number of surviving
%% buckets.
%%
%% The ring is rebuilt in one pass from a sorted list rather than updated in
%% place, because map iteration order is not guaranteed and an in-place
%% shift could overwrite or delete a bucket that has just been moved.
%% The binding key (weight) is not needed: the buckets to drop are identified
%% by their destination. A destination that owns no buckets leaves the ring
%% unchanged.
%%
%% Must be called from within a Mnesia transaction. Returns `ok`; a missing
%% ring state is logged as a warning and otherwise ignored.
remove_binding(#binding{source = S, destination = D}) ->
    mnesia:write_lock_table(?HASH_RING_STATE_TABLE),

    case mnesia:read(?HASH_RING_STATE_TABLE, S) of
        [State0 = #chx_hash_ring{bucket_map = BM0}] ->
            %% destinations of the surviving buckets, in ring order
            Survivors = [V || {_N, V} <- lists:keysort(1, maps:to_list(BM0)),
                              V =/= D],
            Count = length(Survivors),
            %% lists:seq(0, -1) is [], so an emptied ring yields an empty map
            BM = maps:from_list(lists:zip(lists:seq(0, Count - 1), Survivors)),
            State = State0#chx_hash_ring{bucket_map = BM,
                                         next_bucket_number = Count},

            ok = mnesia:write(?HASH_RING_STATE_TABLE, State, write),
            ok;
        [] ->
            rabbit_log:warning(" Can't remove binding: hash ring state for exchange ~s wasn't found",
                               [rabbit_misc:rs(S)]),
            ok
    end.
261
226
227
+
228
%% Exchange type callback: the check is delegated unchanged to
%% rabbit_exchange:assert_args_equivalence/2.
assert_args_equivalence(Exchange, Args) ->
    rabbit_exchange:assert_args_equivalence(Exchange, Args).
230
+
262
231
%% Sets up the Mnesia table that holds the hash ring state: creates the
%% table, adds a RAM copy on the local node and waits up to 30 seconds for
%% the table to become available.
%% The results of the individual Mnesia calls are not checked; `ok` is
%% always returned. NOTE(review): presumably this is so that a table which
%% already exists does not fail the boot step -- confirm.
init() ->
    Table = ?HASH_RING_STATE_TABLE,
    TableDef = [{record_name, chx_hash_ring},
                {attributes, record_info(fields, chx_hash_ring)},
                {type, ordered_set}],
    mnesia:create_table(Table, TableDef),
    mnesia:add_table_copy(Table, node(), ram_copies),
    mnesia:wait_for_tables([Table], 30000),
    ok.
279
238
280
239
% %
0 commit comments