Skip to content

Commit 1733085

Browse files
committed
stable queue implementation on redis
0 parents  commit 1733085

File tree

4 files changed

+356
-0
lines changed

4 files changed

+356
-0
lines changed

README.markdown

Whitespace-only changes.

queue/queue.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package queue
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"time"
7+
8+
"github.com/garyburd/redigo/redis"
9+
)
10+
11+
type Queue struct {
12+
c redis.Conn
13+
Name string
14+
}
15+
16+
func New(queueName string, c redis.Conn) *Queue {
17+
return &Queue{
18+
c: c,
19+
Name: queueName,
20+
}
21+
}
22+
23+
func (q *Queue) Push(job string) (bool, error) {
24+
return q.Schedule(job, time.Now())
25+
}
26+
27+
func (q *Queue) Schedule(job string, when time.Time) (bool, error) {
28+
score := when.UnixNano()
29+
added, err := redis.Bool(q.c.Do("ZADD", q.Name, score, job))
30+
// _, err := addTaskScript.Do(q.c, job)
31+
return added, err
32+
33+
}
34+
35+
func (q *Queue) Pending() (int64, error) {
36+
return redis.Int64(q.c.Do("ZCARD", q.Name))
37+
}
38+
39+
func (q *Queue) FlushQueue() error {
40+
_, err := q.c.Do("DEL", q.Name)
41+
return err
42+
}
43+
44+
func (q *Queue) Pop() (string, error) {
45+
jobs, err := q.PopJobs(1)
46+
if err != nil {
47+
return "", err
48+
}
49+
if len(jobs) == 0 {
50+
return "", nil
51+
}
52+
return jobs[0], nil
53+
}
54+
55+
func (q *Queue) PopJobs(limit int) ([]string, error) {
56+
return redis.Strings(popJobsScript.Do(q.c, q.Name, fmt.Sprintf("%d", time.Now().UnixNano()), strconv.Itoa(limit)))
57+
}
58+
59+
// func (q *Queue) PopJobs(limit int) ([]string, error) {
60+
// jobs, err := redis.Strings(q.c.Do("ZRANGEBYSCORE", q.Name, "-inf", time.Now().UnixNano(), "LIMIT", 0, limit))
61+
// if err != nil {
62+
// return jobs, err
63+
// }
64+
//
65+
// if len(jobs) > 0 {
66+
// _, err = q.c.Do("ZREM", q.Name, quoteArgs(jobs))
67+
// if err != nil {
68+
// return []string{}, err
69+
// }
70+
// }
71+
// return jobs, nil
72+
// }
73+
//
74+
func quoteArgs(args []string) string {
75+
result := ""
76+
for i := range args {
77+
if len(result) > 0 {
78+
result += " "
79+
}
80+
result += strconv.QuoteToASCII(args[i])
81+
}
82+
return result
83+
}

queue/queue_test.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package queue_test
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/AgileBits/go-redis-queue/queue"
8+
"github.com/garyburd/redigo/redis"
9+
)
10+
11+
func TestQueueTasks(t *testing.T) {
12+
c, err := redis.Dial("tcp", "127.0.0.1:6379")
13+
if err != nil {
14+
t.Error(err)
15+
t.FailNow()
16+
}
17+
defer c.Close()
18+
19+
q := queue.New("basic_queue", c)
20+
21+
err = q.FlushQueue()
22+
if err != nil {
23+
t.Error(err)
24+
t.FailNow()
25+
}
26+
27+
b, err := q.Push("basic item 1")
28+
if err != nil {
29+
t.Error(err)
30+
t.FailNow()
31+
}
32+
33+
if b != true {
34+
t.Error("expected item to be added to queue but was not")
35+
}
36+
37+
b, err = q.Push("basic item 1")
38+
if err != nil {
39+
t.Error(err)
40+
t.FailNow()
41+
}
42+
43+
if b != false {
44+
t.Error("expected item not to be added to queue but it was")
45+
}
46+
47+
pending, err := q.Pending()
48+
if err != nil {
49+
t.Error(err)
50+
t.FailNow()
51+
}
52+
if pending != 1 {
53+
t.Error("Expected 1 job pending in queue, was", pending)
54+
}
55+
}
56+
57+
func TestQueueTaskScheduling(t *testing.T) {
58+
c, err := redis.Dial("tcp", "127.0.0.1:6379")
59+
if err != nil {
60+
t.Error(err)
61+
t.FailNow()
62+
}
63+
defer c.Close()
64+
65+
q := queue.New("scheduled_queue", c)
66+
67+
err = q.FlushQueue()
68+
if err != nil {
69+
t.Error(err)
70+
t.FailNow()
71+
}
72+
73+
b, err := q.Schedule("scheduled item 1", time.Now().Add(90*time.Millisecond))
74+
if err != nil {
75+
t.Error(err)
76+
t.FailNow()
77+
}
78+
79+
if b != true {
80+
t.Error("expected item to be added to queue but was not")
81+
}
82+
83+
pending, err := q.Pending()
84+
if err != nil {
85+
t.Error(err)
86+
t.FailNow()
87+
}
88+
if pending != 1 {
89+
t.Error("Expected 1 job pending in queue, was", pending)
90+
}
91+
92+
job, err := q.Pop()
93+
if err != nil {
94+
t.Error(err)
95+
t.FailNow()
96+
}
97+
98+
if job != "" {
99+
t.Error("Didn't expect to get a job off the queue but I got one.")
100+
}
101+
102+
// Wait for the job to become ready.
103+
time.Sleep(100 * time.Millisecond)
104+
105+
job, err = q.Pop()
106+
if err != nil {
107+
t.Error(err)
108+
t.FailNow()
109+
}
110+
111+
if job != "scheduled item 1" {
112+
t.Error("Expected to get a job off the queue, but I got this:", job)
113+
}
114+
}
115+
116+
func TestPopOrder(t *testing.T) {
117+
c, err := redis.Dial("tcp", "127.0.0.1:6379")
118+
if err != nil {
119+
t.Error(err)
120+
t.FailNow()
121+
}
122+
defer c.Close()
123+
124+
q := queue.New("scheduled_queue", c)
125+
126+
err = q.FlushQueue()
127+
if err != nil {
128+
t.Error(err)
129+
t.FailNow()
130+
}
131+
132+
_, err = q.Schedule("oldest", time.Now().Add(-300*time.Millisecond))
133+
if err != nil {
134+
t.Error(err)
135+
t.FailNow()
136+
}
137+
138+
_, err = q.Schedule("newer", time.Now().Add(-100*time.Millisecond))
139+
if err != nil {
140+
t.Error(err)
141+
t.FailNow()
142+
}
143+
144+
_, err = q.Schedule("older", time.Now().Add(-200*time.Millisecond))
145+
if err != nil {
146+
t.Error(err)
147+
t.FailNow()
148+
}
149+
150+
job, err := q.Pop()
151+
if err != nil {
152+
t.Error(err)
153+
t.FailNow()
154+
}
155+
156+
if job != "oldest" {
157+
t.Error("Expected to the oldest job off the queue, but I got this:", job)
158+
}
159+
160+
job, err = q.Pop()
161+
if err != nil {
162+
t.Error(err)
163+
t.FailNow()
164+
}
165+
166+
if job != "older" {
167+
t.Error("Expected to the older job off the queue, but I got this:", job)
168+
}
169+
170+
job, err = q.Pop()
171+
if err != nil {
172+
t.Error(err)
173+
t.FailNow()
174+
}
175+
176+
if job != "newer" {
177+
t.Error("Expected to the newer job off the queue, but I got this:", job)
178+
}
179+
180+
job, err = q.Pop()
181+
if err != nil {
182+
t.Error(err)
183+
t.FailNow()
184+
}
185+
if job != "" {
186+
t.Error("Expected no jobs")
187+
}
188+
}
189+
190+
func TestPopMultiOrder(t *testing.T) {
191+
c, err := redis.Dial("tcp", "127.0.0.1:6379")
192+
if err != nil {
193+
t.Error(err)
194+
t.FailNow()
195+
}
196+
defer c.Close()
197+
198+
q := queue.New("scheduled_queue", c)
199+
200+
err = q.FlushQueue()
201+
if err != nil {
202+
t.Error(err)
203+
t.FailNow()
204+
}
205+
206+
_, err = q.Schedule("oldest", time.Now().Add(-300*time.Millisecond))
207+
if err != nil {
208+
t.Error(err)
209+
t.FailNow()
210+
}
211+
212+
_, err = q.Schedule("newer", time.Now().Add(-100*time.Millisecond))
213+
if err != nil {
214+
t.Error(err)
215+
t.FailNow()
216+
}
217+
218+
_, err = q.Schedule("older", time.Now().Add(-200*time.Millisecond))
219+
if err != nil {
220+
t.Error(err)
221+
t.FailNow()
222+
}
223+
224+
jobs, err := q.PopJobs(3)
225+
if err != nil {
226+
t.Error(err)
227+
t.FailNow()
228+
}
229+
230+
if len(jobs) != 3 {
231+
t.Error("Expected 3 jobs. got: ", jobs)
232+
t.FailNow()
233+
}
234+
235+
if jobs[0] != "oldest" {
236+
t.Error("Expected to the oldest job off the queue, but I got this:", jobs)
237+
}
238+
239+
if jobs[1] != "older" {
240+
t.Error("Expected to the older job off the queue, but I got this:", jobs)
241+
}
242+
243+
if jobs[2] != "newer" {
244+
t.Error("Expected to the newer job off the queue, but I got this:", jobs)
245+
}
246+
247+
job, err := q.Pop()
248+
if err != nil {
249+
t.Error(err)
250+
t.FailNow()
251+
}
252+
if job != "" {
253+
t.Error("Expected no jobs")
254+
}
255+
}

queue/scripts.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package queue
2+
3+
import "github.com/garyburd/redigo/redis"
4+
5+
var popJobsScript *redis.Script
6+
7+
func init() {
8+
popJobsScript = redis.NewScript(3, `
9+
local name = KEYS[1]
10+
local timestamp = KEYS[2]
11+
local limit = KEYS[3]
12+
local results = redis.call('zrangebyscore', name, '-inf', timestamp, 'LIMIT', 0, limit)
13+
if table.getn(results) > 0 then
14+
redis.call('zrem', name, unpack(results))
15+
end
16+
return results
17+
`)
18+
}

0 commit comments

Comments
 (0)