31
31
import org .junit .Rule ;
32
32
import org .junit .Test ;
33
33
import org .junit .rules .TestRule ;
34
-
35
34
import org .springframework .core .task .SimpleAsyncTaskExecutor ;
36
35
import org .springframework .dao .DataAccessException ;
37
36
import org .springframework .data .annotation .Id ;
38
37
import org .springframework .data .mongodb .MongoDbFactory ;
39
38
import org .springframework .data .mongodb .core .MongoTemplate ;
40
39
import org .springframework .data .mongodb .core .SimpleMongoClientDbFactory ;
41
- import org .springframework .data .mongodb .core .SimpleMongoDbFactory ;
42
40
import org .springframework .data .mongodb .core .messaging .SubscriptionRequest .RequestOptions ;
43
41
import org .springframework .data .mongodb .test .util .MongoTestUtils ;
44
42
import org .springframework .data .mongodb .test .util .ReplicaSet ;
45
43
import org .springframework .test .annotation .IfProfileValue ;
46
44
import org .springframework .util .ErrorHandler ;
47
45
48
- import com .mongodb .MongoClient ;
49
46
import com .mongodb .client .MongoCollection ;
50
47
import com .mongodb .client .model .CreateCollectionOptions ;
51
48
import com .mongodb .client .model .changestream .ChangeStreamDocument ;
@@ -61,6 +58,8 @@ public class DefaultMessageListenerContainerTests {
61
58
public static final String COLLECTION_NAME = "collection-1" ;
62
59
public static final String COLLECTION_2_NAME = "collection-2" ;
63
60
61
+ public static final Duration TIMEOUT = Duration .ofSeconds (2 );
62
+
64
63
public @ Rule TestRule replSet = ReplicaSet .none ();
65
64
66
65
MongoDbFactory dbFactory ;
@@ -94,12 +93,12 @@ public void shouldCollectMappedChangeStreamMessagesCorrectly() throws Interrupte
94
93
Person .class );
95
94
container .start ();
96
95
97
- awaitSubscription (subscription , Duration . ofMillis ( 500 ) );
96
+ awaitSubscription (subscription , TIMEOUT );
98
97
99
98
collection .insertOne (new Document ("_id" , "id-1" ).append ("firstname" , "foo" ));
100
99
collection .insertOne (new Document ("_id" , "id-2" ).append ("firstname" , "bar" ));
101
100
102
- awaitMessages (messageListener , 2 , Duration . ofMillis ( 500 ) );
101
+ awaitMessages (messageListener , 2 , TIMEOUT );
103
102
104
103
assertThat (messageListener .getMessages ().stream ().map (Message ::getBody ).collect (Collectors .toList ()))
105
104
.containsExactly (new Person ("id-1" , "foo" ), new Person ("id-2" , "bar" ));
@@ -125,12 +124,12 @@ public void shouldNotifyErrorHandlerOnErrorInListener() throws InterruptedExcept
125
124
}, () -> COLLECTION_NAME ), Person .class , errorHandler );
126
125
container .start ();
127
126
128
- awaitSubscription (subscription , Duration . ofMillis ( 500 ) );
127
+ awaitSubscription (subscription , TIMEOUT );
129
128
130
129
collection .insertOne (new Document ("_id" , "id-1" ).append ("firstname" , "foo" ));
131
130
collection .insertOne (new Document ("_id" , "id-2" ).append ("firstname" , "bar" ));
132
131
133
- awaitMessages (messageListener , 2 , Duration . ofMillis ( 500 ) );
132
+ awaitMessages (messageListener , 2 , TIMEOUT );
134
133
135
134
verify (errorHandler , atLeast (1 )).handleError (any (IllegalStateException .class ));
136
135
assertThat (messageListener .getTotalNumberMessagesReceived ()).isEqualTo (2 );
@@ -145,12 +144,12 @@ public void shouldNoLongerReceiveMessagesWhenContainerStopped() throws Interrupt
145
144
Document .class );
146
145
container .start ();
147
146
148
- awaitSubscription (subscription , Duration . ofMillis ( 500 ) );
147
+ awaitSubscription (subscription , TIMEOUT );
149
148
150
149
collection .insertOne (new Document ("_id" , "id-1" ).append ("value" , "foo" ));
151
150
collection .insertOne (new Document ("_id" , "id-2" ).append ("value" , "bar" ));
152
151
153
- awaitMessages (messageListener , 2 , Duration . ofMillis ( 500 ) );
152
+ awaitMessages (messageListener , 2 , TIMEOUT );
154
153
155
154
container .stop ();
156
155
@@ -174,12 +173,12 @@ public void shouldReceiveMessagesWhenAddingRequestToAlreadyStartedContainer() th
174
173
Subscription subscription = container .register (new ChangeStreamRequest (messageListener , () -> COLLECTION_NAME ),
175
174
Document .class );
176
175
177
- awaitSubscription (subscription , Duration . ofMillis ( 500 ) );
176
+ awaitSubscription (subscription , TIMEOUT );
178
177
179
178
Document expected = new Document ("_id" , "id-2" ).append ("value" , "bar" );
180
179
collection .insertOne (expected );
181
180
182
- awaitMessages (messageListener , 1 , Duration . ofMillis ( 500 ) );
181
+ awaitMessages (messageListener , 1 , TIMEOUT );
183
182
container .stop ();
184
183
185
184
assertThat (messageListener .getMessages ().stream ().map (Message ::getBody ).collect (Collectors .toList ()))
@@ -226,11 +225,11 @@ public void tailableCursor() throws InterruptedException {
226
225
227
226
awaitSubscription (
228
227
container .register (new TailableCursorRequest (messageListener , () -> COLLECTION_NAME ), Document .class ),
229
- Duration . ofMillis ( 500 ) );
228
+ TIMEOUT );
230
229
231
230
collection .insertOne (new Document ("_id" , "id-2" ).append ("value" , "bar" ));
232
231
233
- awaitMessages (messageListener , 2 , Duration . ofSeconds ( 2 ) );
232
+ awaitMessages (messageListener , 2 , TIMEOUT );
234
233
container .stop ();
235
234
236
235
assertThat (messageListener .getTotalNumberMessagesReceived ()).isEqualTo (2 );
@@ -247,12 +246,12 @@ public void tailableCursorOnEmptyCollection() throws InterruptedException {
247
246
248
247
awaitSubscription (
249
248
container .register (new TailableCursorRequest (messageListener , () -> COLLECTION_NAME ), Document .class ),
250
- Duration . ofMillis ( 500 ) );
249
+ TIMEOUT );
251
250
252
251
collection .insertOne (new Document ("_id" , "id-1" ).append ("value" , "foo" ));
253
252
collection .insertOne (new Document ("_id" , "id-2" ).append ("value" , "bar" ));
254
253
255
- awaitMessages (messageListener , 2 , Duration . ofSeconds ( 2 ) );
254
+ awaitMessages (messageListener , 2 , TIMEOUT );
256
255
container .stop ();
257
256
258
257
assertThat (messageListener .getTotalNumberMessagesReceived ()).isEqualTo (2 );
@@ -359,15 +358,15 @@ public void databaseLevelWatch() throws InterruptedException {
359
358
360
359
container .start ();
361
360
362
- awaitSubscription (subscription , Duration . ofMillis ( 500 ) );
361
+ awaitSubscription (subscription , TIMEOUT );
363
362
364
363
collection .insertOne (new Document ("_id" , "col-1-id-1" ).append ("firstname" , "foo" ));
365
364
collection .insertOne (new Document ("_id" , "col-1-id-2" ).append ("firstname" , "bar" ));
366
365
367
366
collection2 .insertOne (new Document ("_id" , "col-2-id-1" ).append ("firstname" , "bar" ));
368
367
collection2 .insertOne (new Document ("_id" , "col-2-id-2" ).append ("firstname" , "foo" ));
369
368
370
- awaitMessages (messageListener , 4 , Duration . ofMillis ( 500 ) );
369
+ awaitMessages (messageListener , 4 , TIMEOUT );
371
370
372
371
assertThat (messageListener .getMessages ().stream ().map (Message ::getBody ).collect (Collectors .toList ()))
373
372
.containsExactly (new Person ("col-1-id-1" , "foo" ), new Person ("col-1-id-2" , "bar" ),
0 commit comments