Skip to content

Commit ce6d135

Browse files
committed
Add the .clean() operation
1 parent b1897b5 commit ce6d135

File tree

5 files changed

+207
-8
lines changed

5 files changed

+207
-8
lines changed

README.md

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,19 @@ queue.ping(msg.ack, function(err, id) {
5252
Ack a message (and remove it from the queue):
5353

5454
```js
55-
queue.ack(msg.ack, function(err) {
56-
// This msg removed from queue for this message id.
57-
// 'id' is returned, useful for logging.
55+
queue.ack(msg.ack, function(err, id) {
56+
// This msg removed from queue for this ack.
57+
// The 'id' of the message is returned, useful for logging.
58+
})
59+
```
60+
61+
By default, all old messages - even processed ones - are left in MongoDB. This is so that
62+
you can go and analyse them if you want. However, you can call the following function
63+
to remove processed messages:
64+
65+
```js
66+
queue.clean(function(err) {
67+
// All processed (ie. acked) messages have been deleted
5868
})
5969
```
6070

@@ -333,10 +343,17 @@ or deleted. We always use MongoDB's excellent `collection.findAndModify()` so th
333343
each message is updated atomically inside MongoDB and we never have to fetch something,
334344
change it and store it back.
335345

346+
## Note on MongoDB Version ##
347+
348+
When using MongoDB v2.6 and the v1.3.23 version of the mongodb driver from npm, I was getting
349+
a weird error similar to "key $exists must not start with '$'". Yes, very strange. Anyway, the fix
350+
is to install a later version of the driver. I have tried this with v1.4.9 and it seems ok.
351+
336352
## Releases ##
337353

338-
### 0.8.0 (not yet released) ###
354+
### 0.8.0 (2014-08-28) ###
339355

356+
* [NEW] Added .clean() method to remove old (processed) messages
340357
* [NEW] Add 'delay' option to queue.add() so individual messages can be delayed separately
341358
* [TEST] Test individual 'delay' option for each message
342359

mongodb-queue.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,16 @@ Queue.prototype.ack = function(ack, callback) {
177177
})
178178
}
179179

180+
Queue.prototype.clean = function(callback) {
181+
var self = this
182+
183+
var query = {
184+
deleted : { $exists : true },
185+
}
186+
187+
self.col.remove(query, callback)
188+
}
189+
180190
Queue.prototype.total = function(callback) {
181191
var self = this
182192

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
},
1616
"dependencies": {},
1717
"devDependencies": {
18-
"tape": "^2.10.2",
19-
"mongodb": "^1.3.23",
20-
"async": "^0.2.10"
18+
"tape": "^2.14.0",
19+
"mongodb": "^1.4.9",
20+
"async": "^0.9.0"
2121
},
2222
"repository": {
2323
"type": "git",

test/clean.js

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
var async = require('async')
2+
var test = require('tape')
3+
4+
var setup = require('./setup.js')
5+
var mongoDbQueue = require('../')
6+
7+
setup(function(db) {
8+
9+
test('clean: check deleted messages are deleted', function(t) {
10+
var queue = mongoDbQueue(db, 'clean', { visibility : 3 })
11+
var msg
12+
13+
async.series(
14+
[
15+
function(next) {
16+
queue.size(function(err, size) {
17+
t.ok(!err, 'There is no error.')
18+
t.equal(size, 0, 'There is currently nothing on the queue')
19+
next()
20+
})
21+
},
22+
function(next) {
23+
queue.total(function(err, size) {
24+
t.ok(!err, 'There is no error.')
25+
t.equal(size, 0, 'There is currently nothing in the queue at all')
26+
next()
27+
})
28+
},
29+
function(next) {
30+
queue.clean(function(err) {
31+
t.ok(!err, 'There is no error.')
32+
next()
33+
})
34+
},
35+
function(next) {
36+
queue.size(function(err, size) {
37+
t.ok(!err, 'There is no error.')
38+
t.equal(size, 0, 'There is currently nothing on the queue')
39+
next()
40+
})
41+
},
42+
function(next) {
43+
queue.total(function(err, size) {
44+
t.ok(!err, 'There is no error.')
45+
t.equal(size, 0, 'There is currently nothing in the queue at all')
46+
next()
47+
})
48+
},
49+
function(next) {
50+
queue.add('Hello, World!', function(err) {
51+
t.ok(!err, 'There is no error when adding a message.')
52+
next()
53+
})
54+
},
55+
function(next) {
56+
queue.clean(function(err) {
57+
t.ok(!err, 'There is no error.')
58+
next()
59+
})
60+
},
61+
function(next) {
62+
queue.size(function(err, size) {
63+
t.ok(!err, 'There is no error.')
64+
t.equal(size, 1, 'Queue size is correct')
65+
next()
66+
})
67+
},
68+
function(next) {
69+
queue.total(function(err, size) {
70+
t.ok(!err, 'There is no error.')
71+
t.equal(size, 1, 'Queue total is correct')
72+
next()
73+
})
74+
},
75+
function(next) {
76+
queue.get(function(err, newMsg) {
77+
msg = newMsg
78+
t.ok(msg.id, 'Got a msg.id (sanity check)')
79+
next()
80+
})
81+
},
82+
function(next) {
83+
queue.size(function(err, size) {
84+
t.ok(!err, 'There is no error.')
85+
t.equal(size, 0, 'Queue size is correct')
86+
next()
87+
})
88+
},
89+
function(next) {
90+
queue.total(function(err, size) {
91+
t.ok(!err, 'There is no error.')
92+
t.equal(size, 1, 'Queue total is correct')
93+
next()
94+
})
95+
},
96+
function(next) {
97+
queue.clean(function(err) {
98+
t.ok(!err, 'There is no error.')
99+
next()
100+
})
101+
},
102+
function(next) {
103+
queue.size(function(err, size) {
104+
t.ok(!err, 'There is no error.')
105+
t.equal(size, 0, 'Queue size is correct')
106+
next()
107+
})
108+
},
109+
function(next) {
110+
queue.total(function(err, size) {
111+
t.ok(!err, 'There is no error.')
112+
t.equal(size, 1, 'Queue total is correct')
113+
next()
114+
})
115+
},
116+
function(next) {
117+
queue.ack(msg.ack, function(err, id) {
118+
t.ok(!err, 'No error when acking the message')
119+
t.ok(id, 'Received an id when acking this message')
120+
next()
121+
})
122+
},
123+
function(next) {
124+
queue.size(function(err, size) {
125+
t.ok(!err, 'There is no error.')
126+
t.equal(size, 0, 'Queue size is correct')
127+
next()
128+
})
129+
},
130+
function(next) {
131+
queue.total(function(err, size) {
132+
t.ok(!err, 'There is no error.')
133+
t.equal(size, 1, 'Queue total is correct')
134+
next()
135+
})
136+
},
137+
function(next) {
138+
queue.clean(function(err) {
139+
t.ok(!err, 'There is no error.')
140+
next()
141+
})
142+
},
143+
function(next) {
144+
queue.size(function(err, size) {
145+
t.ok(!err, 'There is no error.')
146+
t.equal(size, 0, 'Queue size is correct')
147+
next()
148+
})
149+
},
150+
function(next) {
151+
queue.total(function(err, size) {
152+
t.ok(!err, 'There is no error.')
153+
t.equal(size, 0, 'Queue total is correct')
154+
next()
155+
})
156+
},
157+
],
158+
function(err) {
159+
if (err) t.fail(err)
160+
t.pass('Finished test ok')
161+
t.end()
162+
}
163+
)
164+
})
165+
166+
test('db.close()', function(t) {
167+
t.pass('db.close()')
168+
db.close()
169+
t.end()
170+
})
171+
172+
})

test/setup.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ module.exports = function(callback) {
88
var done = 0
99
// let's empty out some collections to make sure there are no messages
1010
var collections = [
11-
'default', 'delay', 'multi', 'visibility', 'ping',
11+
'default', 'delay', 'multi', 'visibility', 'clean', 'ping',
1212
'stats1', 'stats2',
1313
'queue', 'dead-queue', 'queue-2', 'dead-queue-2'
1414
]

0 commit comments

Comments
 (0)