From 14c9044fbb977838ee263612f01994f41097d3cf Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 15:42:42 -0400 Subject: [PATCH 01/17] add clj-kondo imports; ignore new calva repl file Signed-off-by: Sean Corfield --- .clj-kondo/imports/babashka/fs/config.edn | 1 + .../imports/http-kit/http-kit/config.edn | 3 + .../http-kit/httpkit/with_channel.clj | 16 ++++ .../clj_commons/byte_streams.clj_kondo | 11 +++ .../org.clj-commons/byte-streams/config.edn | 9 ++ .../imports/potemkin/potemkin/config.edn | 62 +++++++++++++ .../potemkin/potemkin/potemkin/namespaces.clj | 56 ++++++++++++ .../rewrite-clj/rewrite-clj/config.edn | 5 ++ .clj-kondo/imports/taoensso/encore/config.edn | 7 ++ .../taoensso/encore/taoensso/encore_hooks.clj | 88 +++++++++++++++++++ .gitignore | 1 + 11 files changed, 259 insertions(+) create mode 100644 .clj-kondo/imports/babashka/fs/config.edn create mode 100644 .clj-kondo/imports/http-kit/http-kit/config.edn create mode 100644 .clj-kondo/imports/http-kit/http-kit/httpkit/with_channel.clj create mode 100644 .clj-kondo/imports/org.clj-commons/byte-streams/clj_commons/byte_streams.clj_kondo create mode 100644 .clj-kondo/imports/org.clj-commons/byte-streams/config.edn create mode 100644 .clj-kondo/imports/potemkin/potemkin/config.edn create mode 100644 .clj-kondo/imports/potemkin/potemkin/potemkin/namespaces.clj create mode 100644 .clj-kondo/imports/rewrite-clj/rewrite-clj/config.edn create mode 100644 .clj-kondo/imports/taoensso/encore/config.edn create mode 100644 .clj-kondo/imports/taoensso/encore/taoensso/encore_hooks.clj diff --git a/.clj-kondo/imports/babashka/fs/config.edn b/.clj-kondo/imports/babashka/fs/config.edn new file mode 100644 index 0000000..23f3609 --- /dev/null +++ b/.clj-kondo/imports/babashka/fs/config.edn @@ -0,0 +1 @@ +{:lint-as {babashka.fs/with-temp-dir clojure.core/let}} diff --git a/.clj-kondo/imports/http-kit/http-kit/config.edn b/.clj-kondo/imports/http-kit/http-kit/config.edn new file mode 100644 index 0000000..e9dbcd8 --- /dev/null +++ b/.clj-kondo/imports/http-kit/http-kit/config.edn @@ -0,0 +1,3 @@ + +{:hooks + {:analyze-call {org.httpkit.server/with-channel httpkit.with-channel/with-channel}}} diff --git a/.clj-kondo/imports/http-kit/http-kit/httpkit/with_channel.clj b/.clj-kondo/imports/http-kit/http-kit/httpkit/with_channel.clj new file mode 100644 index 0000000..b429de8 --- /dev/null +++ b/.clj-kondo/imports/http-kit/http-kit/httpkit/with_channel.clj @@ -0,0 +1,16 @@ +(ns httpkit.with-channel + (:require [clj-kondo.hooks-api :as api])) + +(defn with-channel [{node :node}] + (let [[request channel & body] (rest (:children node))] + (when-not (and request channel) (throw (ex-info "No request or channel provided" {}))) + (when-not (api/token-node? channel) (throw (ex-info "Missing channel argument" {}))) + (let [new-node + (api/list-node + (list* + (api/token-node 'let) + (api/vector-node [channel (api/vector-node [])]) + request + body))] + + {:node new-node}))) diff --git a/.clj-kondo/imports/org.clj-commons/byte-streams/clj_commons/byte_streams.clj_kondo b/.clj-kondo/imports/org.clj-commons/byte-streams/clj_commons/byte_streams.clj_kondo new file mode 100644 index 0000000..588e47e --- /dev/null +++ b/.clj-kondo/imports/org.clj-commons/byte-streams/clj_commons/byte_streams.clj_kondo @@ -0,0 +1,11 @@ +(ns clj-commons.byte-streams) + +;; TODO: propagate type info from src/dst +(defmacro def-conversion + "Kondo hook" + [[src dst :as conversion] params & body] + `(fn [~(first params) + ~(if-let [options (second params)] + options + `_#)] + ~@body)) diff --git a/.clj-kondo/imports/org.clj-commons/byte-streams/config.edn b/.clj-kondo/imports/org.clj-commons/byte-streams/config.edn new file mode 100644 index 0000000..ae60fe4 --- /dev/null +++ b/.clj-kondo/imports/org.clj-commons/byte-streams/config.edn @@ -0,0 +1,9 @@ +{:lint-as {byte-streams.utils/defprotocol+ clojure.core/defprotocol + byte-streams.utils/deftype+ clojure.core/deftype + byte-streams.utils/defrecord+ clojure.core/defrecord + byte-streams.utils/definterface+ clojure.core/definterface + clj-commons.byte-streams.utils/defprotocol+ clojure.core/defprotocol + clj-commons.byte-streams.utils/deftype+ clojure.core/deftype + clj-commons.byte-streams.utils/defrecord+ clojure.core/defrecord + clj-commons.byte-streams.utils/definterface+ clojure.core/definterface} + :hooks {:macroexpand {clj-commons.byte-streams/def-conversion clj-commons.byte-streams/def-conversion}}} diff --git a/.clj-kondo/imports/potemkin/potemkin/config.edn b/.clj-kondo/imports/potemkin/potemkin/config.edn new file mode 100644 index 0000000..3f59f3e --- /dev/null +++ b/.clj-kondo/imports/potemkin/potemkin/config.edn @@ -0,0 +1,62 @@ +{:lint-as {potemkin.collections/compile-if clojure.core/if + potemkin.collections/reify-map-type clojure.core/reify + potemkin.collections/def-map-type clj-kondo.lint-as/def-catch-all + potemkin.collections/def-derived-map clj-kondo.lint-as/def-catch-all + + potemkin.types/reify+ clojure.core/reify + potemkin.types/defprotocol+ clojure.core/defprotocol + potemkin.types/deftype+ clojure.core/deftype + potemkin.types/defrecord+ clojure.core/defrecord + potemkin.types/definterface+ clojure.core/defprotocol + potemkin.types/extend-protocol+ clojure.core/extend-protocol + potemkin.types/def-abstract-type clj-kondo.lint-as/def-catch-all + + potemkin.utils/doit clojure.core/doseq + potemkin.utils/doary clojure.core/doseq + potemkin.utils/condp-case clojure.core/condp + potemkin.utils/fast-bound-fn clojure.core/bound-fn + + potemkin.walk/prewalk clojure.walk/prewalk + potemkin.walk/postwalk clojure.walk/postwalk + potemkin.walk/walk clojure.walk/walk + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + ;;;; top-level from import-vars + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + + ;; Have hooks + ;;potemkin/import-fn potemkin.namespaces/import-fn + ;;potemkin/import-macro potemkin.namespaces/import-macro + ;;potemkin/import-def potemkin.namespaces/import-def + + ;; Internal, not transitive + ;;potemkin/unify-gensyms potemkin.macros/unify-gensyms + ;;potemkin/normalize-gensyms potemkin.macros/normalize-gensyms + ;;potemkin/equivalent? potemkin.macros/equivalent? + + potemkin/condp-case clojure.core/condp + potemkin/doit potemkin.utils/doit + potemkin/doary potemkin.utils/doary + + potemkin/def-abstract-type clj-kondo.lint-as/def-catch-all + potemkin/reify+ clojure.core/reify + potemkin/defprotocol+ clojure.core/defprotocol + potemkin/deftype+ clojure.core/deftype + potemkin/defrecord+ clojure.core/defrecord + potemkin/definterface+ clojure.core/defprotocol + potemkin/extend-protocol+ clojure.core/extend-protocol + + potemkin/reify-map-type clojure.core/reify + potemkin/def-derived-map clj-kondo.lint-as/def-catch-all + potemkin/def-map-type clj-kondo.lint-as/def-catch-all} + + ;; leave import-vars alone, kondo special-cases it + :hooks {:macroexpand {#_#_potemkin.namespaces/import-vars potemkin.namespaces/import-vars + potemkin.namespaces/import-fn potemkin.namespaces/import-fn + potemkin.namespaces/import-macro potemkin.namespaces/import-macro + potemkin.namespaces/import-def potemkin.namespaces/import-def + + #_#_potemkin/import-vars potemkin.namespaces/import-vars + potemkin/import-fn potemkin.namespaces/import-fn + potemkin/import-macro potemkin.namespaces/import-macro + potemkin/import-def potemkin.namespaces/import-def}}} diff --git a/.clj-kondo/imports/potemkin/potemkin/potemkin/namespaces.clj b/.clj-kondo/imports/potemkin/potemkin/potemkin/namespaces.clj new file mode 100644 index 0000000..a247af5 --- /dev/null +++ b/.clj-kondo/imports/potemkin/potemkin/potemkin/namespaces.clj @@ -0,0 +1,56 @@ +(ns potemkin.namespaces + (:require [clj-kondo.hooks-api :as api])) + +(defn import-macro* + ([sym] + `(def ~(-> sym name symbol) ~sym)) + ([sym name] + `(def ~name ~sym))) + +(defmacro import-fn + ([sym] + (import-macro* sym)) + ([sym name] + (import-macro* sym name))) + +(defmacro import-macro + ([sym] + (import-macro* sym)) + ([sym name] + (import-macro* sym name))) + +(defmacro import-def + ([sym] + (import-macro* sym)) + ([sym name] + (import-macro* sym name))) + +#_ +(defmacro import-vars + "Imports a list of vars from other namespaces." + [& syms] + (let [unravel (fn unravel [x] + (if (sequential? x) + (->> x + rest + (mapcat unravel) + (map + #(symbol + (str (first x) + (when-let [n (namespace %)] + (str "." n))) + (name %)))) + [x])) + syms (mapcat unravel syms) + result `(do + ~@(map + (fn [sym] + (let [vr (resolve sym) + m (meta vr)] + (cond + (nil? vr) `(throw (ex-info (format "`%s` does not exist" '~sym) {})) + (:macro m) `(def ~(-> sym name symbol) ~sym) + (:arglists m) `(def ~(-> sym name symbol) ~sym) + :else `(def ~(-> sym name symbol) ~sym)))) + syms))] + result)) diff --git a/.clj-kondo/imports/rewrite-clj/rewrite-clj/config.edn b/.clj-kondo/imports/rewrite-clj/rewrite-clj/config.edn new file mode 100644 index 0000000..19ecae9 --- /dev/null +++ b/.clj-kondo/imports/rewrite-clj/rewrite-clj/config.edn @@ -0,0 +1,5 @@ +{:lint-as + {rewrite-clj.zip/subedit-> clojure.core/-> + rewrite-clj.zip/subedit->> clojure.core/->> + rewrite-clj.zip/edit-> clojure.core/-> + rewrite-clj.zip/edit->> clojure.core/->>}} diff --git a/.clj-kondo/imports/taoensso/encore/config.edn b/.clj-kondo/imports/taoensso/encore/config.edn new file mode 100644 index 0000000..975c943 --- /dev/null +++ b/.clj-kondo/imports/taoensso/encore/config.edn @@ -0,0 +1,7 @@ +{:hooks + {:analyze-call + {taoensso.encore/defalias taoensso.encore-hooks/defalias + taoensso.encore/defaliases taoensso.encore-hooks/defaliases + taoensso.encore/defn-cached taoensso.encore-hooks/defn-cached + taoensso.encore/defonce taoensso.encore-hooks/defonce + taoensso.encore/def* taoensso.encore-hooks/def*}}} diff --git a/.clj-kondo/imports/taoensso/encore/taoensso/encore_hooks.clj b/.clj-kondo/imports/taoensso/encore/taoensso/encore_hooks.clj new file mode 100644 index 0000000..d133f73 --- /dev/null +++ b/.clj-kondo/imports/taoensso/encore/taoensso/encore_hooks.clj @@ -0,0 +1,88 @@ +(ns taoensso.encore-hooks + "I don't personally use clj-kondo, so these hooks are + kindly authored and maintained by contributors. + PRs very welcome! - Peter Taoussanis" + (:refer-clojure :exclude [defonce]) + (:require + [clj-kondo.hooks-api :as hooks])) + +(defn defalias + [{:keys [node]}] + (let [[alias src-raw _attrs body] (rest (:children node)) + src (or src-raw alias) + sym (if src-raw (hooks/sexpr alias) (symbol (name (hooks/sexpr src))))] + {:node + (with-meta + (hooks/list-node + [(hooks/token-node 'def) + (hooks/token-node sym) + (if body + (hooks/list-node + ;; use :body in the def to avoid unused import/private var warnings + [(hooks/token-node 'or) body src]) + src)]) + (meta src))})) + +(defn defaliases + [{:keys [node]}] + (let [alias-nodes (rest (:children node))] + {:node + (hooks/list-node + (into + [(hooks/token-node 'do)] + (map + (fn alias->defalias [alias-node] + (cond + (hooks/token-node? alias-node) + (hooks/list-node + [(hooks/token-node 'taoensso.encore/defalias) + alias-node]) + + (hooks/map-node? alias-node) + (let [{:keys [src alias attrs body]} (hooks/sexpr alias-node) + ;; workaround as can't seem to (get) using a token-node + ;; and there's no update-keys (yet) in sci apparently + [& {:as node-as-map}] (:children alias-node) + {:keys [attrs body]} (zipmap (map hooks/sexpr (keys node-as-map)) + (vals node-as-map))] + (hooks/list-node + [(hooks/token-node 'taoensso.encore/defalias) + (or alias src) (hooks/token-node src) attrs body]))))) + alias-nodes))})) + +(defn defn-cached + [{:keys [node]}] + (let [[sym _opts binding-vec & body] (rest (:children node))] + {:node + (hooks/list-node + (list + (hooks/token-node 'def) + sym + (hooks/list-node + (list* + (hooks/token-node 'fn) + binding-vec + body))))})) + +(defn -def-impl + [{:keys [node]} core-macro-sym] + ;; args = [sym doc-string? attr-map? init-expr] + (let [[sym & args] (rest (:children node)) + [doc-string args] (if (and (hooks/string-node? (first args)) (next args)) [(hooks/sexpr (first args)) (next args)] [nil args]) + [attr-map init-expr] (if (and (hooks/map-node? (first args)) (next args)) [(hooks/sexpr (first args)) (fnext args)] [nil (first args)]) + + attr-map (if doc-string (assoc attr-map :doc doc-string) attr-map) + sym+meta (if attr-map (with-meta sym attr-map) sym) + + rewritten + (hooks/list-node + [(hooks/token-node core-macro-sym) + sym+meta + init-expr])] + + #_(println "old node:" node) + #_(println "new node:" rewritten) + {:node rewritten})) + +(defn def* [arg] (-def-impl arg 'def)) +(defn defonce [arg] (-def-impl arg 'clojure.core/defonce)) diff --git a/.gitignore b/.gitignore index a3bf747..52461bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .calva/mcp-server/port +.calva/repl.calva-repl .clj-kondo/.cache .cpcache .DS_Store From db55a9e350b00ab4adab7a3f821a0832ab679b60 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 16:49:17 -0400 Subject: [PATCH 02/17] start work on 0.1.8 Signed-off-by: Sean Corfield --- build.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.clj b/build.clj index 3777433..9d3a2a1 100644 --- a/build.clj +++ b/build.clj @@ -16,7 +16,7 @@ (def lib 'org.clj-commons/durable-queue) (defn- the-version [patch] (format "0.1.%s" patch)) -(def version (the-version "7")) +(def version (the-version "8")) ; unreleased (def snapshot (the-version "99-SNAPSHOT")) (def class-dir "target/classes") From 279ab0a8ff3c742f0b7473f0a3c3fbc480af5c12 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 16:49:56 -0400 Subject: [PATCH 03/17] local clj-kondo config needs to be exported! Signed-off-by: Sean Corfield --- .clj-kondo/config.edn | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .clj-kondo/config.edn diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn new file mode 100644 index 0000000..9e218c2 --- /dev/null +++ b/.clj-kondo/config.edn @@ -0,0 +1,2 @@ +{:lint-as {clj-commons.durable-queue/with-buffer clojure.core/let + durable-queue/with-buffer clojure.core/let}} From b6e31bb8fea2148ce27cd5630113ca0407498bd7 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 16:50:40 -0400 Subject: [PATCH 04/17] address lint issues and clean up code Signed-off-by: Sean Corfield --- src/durable_queue.clj | 97 +++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 55 deletions(-) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index 813748b..05cfd50 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -1,37 +1,29 @@ (ns durable-queue (:require - [clojure.java.io :as io] - [byte-streams :as bs] - [clojure.string :as str] - [primitive-math :as p] - [taoensso.nippy :as nippy]) + [clj-commons.byte-streams :as bs] + [clj-commons.primitive-math :as p] + [clojure.java.io :as io] + [taoensso.nippy :as nippy]) (:import - [java.lang.reflect - Method - Field] - [java.util.concurrent - LinkedBlockingQueue - TimeoutException - TimeUnit] - [java.util.concurrent.atomic + [java.io + File + IOException + RandomAccessFile + Writer] + [java.lang.ref + WeakReference] + [java.lang.reflect + Method] + [java.nio ByteBuffer MappedByteBuffer] + [java.nio.channels + FileChannel$MapMode] + [java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException] + [java.util.concurrent.atomic AtomicLong] - [java.util.zip - CRC32] - [java.util.concurrent.locks + [java.util.concurrent.locks ReentrantReadWriteLock] - [java.io - Writer - File - RandomAccessFile - IOException] - [java.nio.channels - FileChannel - FileChannel$MapMode] - [java.nio - ByteBuffer - MappedByteBuffer] - [java.lang.ref - WeakReference])) + [java.util.zip + CRC32])) ;;; @@ -78,7 +70,7 @@ (^:private sync! [_]) (^:private invalidate [_ offset len]) (^:private ^ByteBuffer buffer [_]) - (^:private append-to-slab! [_ descriptor]) + (^:private append-to-slab! [_ task-descriptor]) (^:private read-write-lock [_])) (defmacro ^:private with-buffer [[buf slab] & body] @@ -139,12 +131,12 @@ (.invoke clean (.invoke cleaner buf nil) nil)) - (catch Throwable e + (catch Throwable _ ;; not much we can do here, sadly ))))) (defn- force-buffer - [^MappedByteBuffer buf offset length] + [^MappedByteBuffer buf _offset _length] (.force buf)) ;;; @@ -232,8 +224,8 @@ (lazy-seq (with-buffer [buf slab] (let [^ByteBuffer buf' (.position buf (p/inc pos)) - status (.get buf') - checksum (.getLong buf') + _status (.get buf') + _checksum (.getLong buf') size (.getInt buf')] ;; this shouldn't be necessary, but let's not gratuitously @@ -249,7 +241,7 @@ (slab->task-seq slab (+ pos header-size size))))))))) - (catch Throwable e + (catch Throwable _ ;; this implies unrecoverable corruption nil ))))) @@ -268,7 +260,7 @@ (read-write-lock [_] lock) - (buffer [this] + (buffer [_] (let [buf (or @buf (swap! buf (fn [buf] @@ -299,9 +291,9 @@ (compare-and-set! dirty [start end] [Integer/MAX_VALUE 0]) nil))))) - (append-to-slab! [this descriptor] + (append-to-slab! [this task-descriptor] (with-buffer [buf this] - (let [ary (nippy/freeze descriptor) + (let [ary (nippy/freeze task-descriptor) cnt (count ary) pos @position ^ByteBuffer buf (.position buf ^Long pos)] @@ -515,10 +507,6 @@ queue-name->current-slab (atom {}) ;; initialize - slabs (->> @queue-name->slabs vals (apply concat)) - slab->count (zipmap - slabs - (map #(atom (count (seq %))) slabs)) create-new-slab (fn [q-name] (let [slab (create-slab directory q-name (queue q-name) slab-size) empty-slabs (->> (@queue-name->slabs q-name) @@ -566,8 +554,7 @@ (fsync q) (let [end (System/currentTimeMillis)] (Thread/sleep (long (max 0 (- fsync-interval (- end start))))))) - (catch Throwable e - ))))))) + (catch Throwable _))))))) ;; populate queues with pre-existing tasks (let [empty-slabs (atom #{})] @@ -620,7 +607,7 @@ IQueues - (delete! [this] + (delete! [_] (doseq [s (->> @queue-name->slabs vals (apply concat))] (unmap s) (delete-slab s))) @@ -654,7 +641,7 @@ (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) ks)))) - (take! [this q-name timeout timeout-val] + (take! [_ q-name timeout timeout-val] (let [q-name (munge (name q-name)) ^LinkedBlockingQueue q (queue q-name)] (try @@ -704,8 +691,8 @@ (when-not task (throw - (IllegalArgumentException. - (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) + (IllegalArgumentException. + (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) (when fsync-put? (sync! slab)) @@ -715,13 +702,13 @@ (if (zero? timeout) (.offer q task) (.offer q task timeout TimeUnit/MILLISECONDS)))] - (if-let [val (locking q - (queue! - (vary-meta (slab!) assoc - ::this this-ref - ::queue-name q-name - ::queue q - ::fsync? fsync-take?)))] + (if (locking q + (queue! + (vary-meta (slab!) assoc + ::this this-ref + ::queue-name q-name + ::queue q + ::fsync? fsync-take?))) (do (populate-stats! q-name) (let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])] From 9776db0b795ec57a6bee869ae1e729e3ba0cccd2 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 16:53:19 -0400 Subject: [PATCH 05/17] export clj-kondo config Signed-off-by: Sean Corfield --- .../clj-kondo.exports/org.clj-commons/durable-queue/config.edn | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 resources/clj-kondo.exports/org.clj-commons/durable-queue/config.edn diff --git a/resources/clj-kondo.exports/org.clj-commons/durable-queue/config.edn b/resources/clj-kondo.exports/org.clj-commons/durable-queue/config.edn new file mode 100644 index 0000000..9e218c2 --- /dev/null +++ b/resources/clj-kondo.exports/org.clj-commons/durable-queue/config.edn @@ -0,0 +1,2 @@ +{:lint-as {clj-commons.durable-queue/with-buffer clojure.core/let + durable-queue/with-buffer clojure.core/let}} From 2ef4072491815f808ec7c8399e2a23833f9e8533 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 17:18:54 -0400 Subject: [PATCH 06/17] remove :refer :all Signed-off-by: Sean Corfield --- CHANGELOG.md | 13 +++++ README.md | 22 ++++----- test/durable_queue_test.clj | 98 ++++++++++++++++++------------------- 3 files changed, 73 insertions(+), 60 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..ca6891f --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,13 @@ +## CHANGES + +v0.1.8 in progress +* Clean up lint issues; export clj-kondo config +* Add a changelog! + +v0.1.7 -- 2025-09-02 +* Bring all dependencies up to date +* Matrix test against Clojure 1.10, 1.11, and 1.12 +* Matrix test against JDK 11, 17, and 21 (also tested on JDK 24 manually) +* Add `deps.edn`, `bb.edn`, `build.clj` +* Add GitHub Actions for test, snapshot, and release +* Remove non-functional CircleCI integration diff --git a/README.md b/README.md index ed223e8..9002434 100644 --- a/README.md +++ b/README.md @@ -15,20 +15,20 @@ This library implements a disk-backed task queue, allowing for queues that can s To interact with queues, first create a `queues` object by specifying a directory in the filesystem and an options map: ```clj -> (require '[durable-queue :refer :all]) +> (require '[durable-queue :as dq]) nil -> (def q (queues "/tmp" {})) +> (def q (dq/queues "/tmp" {})) #'q ``` This allows us to `put!` and `take!` tasks from named queues. `take!` is a blocking read, and will only return once a task is available or, if a timeout is defined (in milliseconds), once the timeout elapses: ```clj -> (take! q :foo 10 :timed-out!) +> (dq/take! q :foo 10 :timed-out!) :timed-out! -> (put! q :foo "a task") +> (dq/put! q :foo "a task") true -> (take! q :foo) +> (dq/take! q :foo) < :in-progress | "a task" > > (deref *1) "a task" @@ -39,20 +39,20 @@ Notice that the task has a value describing its progress, and a value describing Calling `take!` removed the task from the queue, but just because we've taken the task doesn't mean we've completed the action associated with it. In order to make sure the task isn't retried on restart, we must mark it as `complete!`. ```clj -> (put! q :foo "another task") +> (dq/put! q :foo "another task") true -> (take! q :foo) +> (dq/take! q :foo) < :in-progress | "another task" > -> (complete! *1) +> (dq/complete! *1) true ``` -If our task fails and we want to re-enqueue it to be tried again, we can instead call `(retry! task)`. Tasks which are marked for retry are added to the end of the current queue. +If our task fails and we want to re-enqueue it to be tried again, we can instead call `(dq/retry! task)`. Tasks which are marked for retry are added to the end of the current queue. To get a description of the current state of the queue, we can use `stats`, which returns a map of queue names onto various counts: ```clj -> (stats q) +> (dq/stats q) {:enqueued 2, :retried 0, :completed 1, @@ -90,7 +90,7 @@ A complete list of options is as follows: Disabling `:fsync-put?` will risk losing tasks if a process dies. Disabling `:fsync-take?` increases the chance of a task being re-run when a process dies. Disabling both will increase throughput of the queue by at least an order of magnitude (in the default configuration, ~1.5k tasks/sec on rotating disks and ~6k tasks/sec on SSD, with fsync completely disabled ~100k tasks/sec independent of hardware). -Writes can be batched using `fsync-threshold` and/or `fsync-interval`, or by explicitly calling `(durable-queue/fsync q)`. Setting the `fsync-threshold` to 10 will allow for ~25k tasks/sec on SSD, and still enforces a small upper boundary on how much data can be lost when the process dies. An exception will be thrown if both per-task and batch sync options are set. +Writes can be batched using `fsync-threshold` and/or `fsync-interval`, or by explicitly calling `(dq/fsync q)`. Setting the `fsync-threshold` to 10 will allow for ~25k tasks/sec on SSD, and still enforces a small upper boundary on how much data can be lost when the process dies. An exception will be thrown if both per-task and batch sync options are set. ### license diff --git a/test/durable_queue_test.clj b/test/durable_queue_test.clj index 4865f56..0c67f3a 100644 --- a/test/durable_queue_test.clj +++ b/test/durable_queue_test.clj @@ -1,8 +1,8 @@ (ns durable-queue-test (:require [clojure.java.io :as io] - [clojure.test :refer :all] - [durable-queue :refer :all] + [clojure.test :refer [deftest is]] + [durable-queue :as dq] [criterium.core :as c])) (defn clear-tmp-directory [] @@ -13,48 +13,48 @@ (deftest test-basic-put-take (clear-tmp-directory) - (let [q (queues "/tmp" {:slab-size 1024}) + (let [q (dq/queues "/tmp" {:slab-size 1024}) tasks (range 1e4)] (doseq [t tasks] - (put! q :foo t)) - (is (= tasks (map deref (immediate-task-seq q :foo)))) - (delete! q))) + (dq/put! q :foo t)) + (is (= tasks (map deref (dq/immediate-task-seq q :foo)))) + (dq/delete! q))) (deftest test-partial-slab-writes (clear-tmp-directory) (dotimes [i 10] - (put! (queues "/tmp") :foo i)) - (is (= (range 10) (map deref (immediate-task-seq (queues "/tmp") :foo))))) + (dq/put! (dq/queues "/tmp") :foo i)) + (is (= (range 10) (map deref (dq/immediate-task-seq (dq/queues "/tmp") :foo))))) (deftest test-retry (clear-tmp-directory) - (with-open [^java.io.Closeable q (queues "/tmp")] + (with-open [^java.io.Closeable q (dq/queues "/tmp")] (doseq [t (range 10)] - (put! q :foo t)) + (dq/put! q :foo t)) - (let [tasks' (immediate-task-seq q :foo)] + (let [tasks' (dq/immediate-task-seq q :foo)] (is (= (range 10) (map deref tasks'))) (doseq [t (take 5 tasks')] - (complete! t)) + (dq/complete! t)) (doseq [t (range 10 15)] - (put! q :foo t)))) + (dq/put! q :foo t)))) ;; create a new manager, which will mark all in-progress tasks as incomplete - (with-open [^java.io.Closeable q (queues "/tmp")] - (let [tasks' (immediate-task-seq q :foo)] + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] (is (= (range 5 15) (map deref tasks'))) (doseq [t (take 5 tasks')] - (complete! t)))) + (dq/complete! t)))) - (with-open [^java.io.Closeable q (queues "/tmp")] - (let [tasks' (immediate-task-seq q :foo)] + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] (is (= (range 10 15) (map deref tasks'))) (doseq [t (range 15 20)] - (put! q :foo t)))) + (dq/put! q :foo t)))) - (let [q (queues "/tmp" {:complete? even?})] - (is (= (remove even? (range 10 20)) (map deref (immediate-task-seq q :foo)))))) + (let [q (dq/queues "/tmp" {:complete? even?})] + (is (= (remove even? (range 10 20)) (map deref (dq/immediate-task-seq q :foo)))))) ;;; @@ -62,72 +62,72 @@ (clear-tmp-directory) (println "\n\n-- sync both") - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? true})] + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? true})] (c/quick-bench (do - (put! q :foo 1) - (complete! (take! q :foo))))) + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) (println "\n\n-- sync take") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? true})] + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? true})] (c/quick-bench (do - (put! q :foo 1) - (complete! (take! q :foo))))) + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) (println "\n\n-- sync put") - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? false})] + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? false})] (c/quick-bench (do - (put! q :foo 1) - (complete! (take! q :foo))))) + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) (println "\n\n-- sync every 10 writes") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] (c/quick-bench (do - (put! q :foo 1) - (complete! (take! q :foo))))) + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) (println "\n\n-- sync every 100 writes") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] (c/quick-bench (do - (put! q :foo 1) - (complete! (take! q :foo))))) + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) (println "\n\n-- sync every 100ms") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-interval 100})] + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-interval 100})] (c/quick-bench (do - (put! q :foo 1) - (complete! (take! q :foo))))) + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) (println "\n\n-- sync neither") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? false})] + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? false})] (c/quick-bench (do - (put! q :foo 1) - (complete! (take! q :foo)))))) + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo)))))) ;;; (deftest ^:stress stress-queue-size (clear-tmp-directory) - (with-open [^java.io.Closeable q (queues "/tmp")] + (with-open [^java.io.Closeable q (dq/queues "/tmp")] (let [ary (byte-array 1e6)] (dotimes [i 1e6] (aset ary i (byte (rand-int 127)))) (dotimes [_ 1e5] - (put! q :stress ary)))) + (dq/put! q :stress ary)))) - (with-open [^java.io.Closeable q (queues "/tmp" {:complete? (constantly false)})] - (let [s (doall (immediate-task-seq q :stress))] + (with-open [^java.io.Closeable q (dq/queues "/tmp" {:complete? (constantly false)})] + (let [s (doall (dq/immediate-task-seq q :stress))] (doseq [t s] - (retry! t))) - (let [s (immediate-task-seq q :stress)] + (dq/retry! t))) + (let [s (dq/immediate-task-seq q :stress)] (doseq [t s] - (complete! t)))) + (dq/complete! t)))) (clear-tmp-directory)) From e2750b6d9c64a10c93ed22971311e3ad3d3fbf5f Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 17:32:28 -0400 Subject: [PATCH 07/17] fixes #29 by adding clj-commons ns prefix this follows other clj-commons libs Signed-off-by: Sean Corfield --- CHANGELOG.md | 3 +- README.md | 2 +- build.clj | 4 +- project.clj | 2 +- src/clj_commons/durable_queue.clj | 786 ++++++++++++++++++++++++ src/durable_queue.clj | 15 +- test/clj_commons/durable_queue_test.clj | 133 ++++ test/durable_queue_test.clj | 1 + 8 files changed, 934 insertions(+), 12 deletions(-) create mode 100644 src/clj_commons/durable_queue.clj create mode 100644 test/clj_commons/durable_queue_test.clj diff --git a/CHANGELOG.md b/CHANGELOG.md index ca6891f..c8fe949 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## CHANGES -v0.1.8 in progress +v0.2.0 in progress +* Deprecated single-segment `durable-queue` ns; use `clj-commons.durable-queue` instead * Clean up lint issues; export clj-kondo config * Add a changelog! diff --git a/README.md b/README.md index 9002434..dd2028c 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ This library implements a disk-backed task queue, allowing for queues that can s To interact with queues, first create a `queues` object by specifying a directory in the filesystem and an options map: ```clj -> (require '[durable-queue :as dq]) +> (require '[clj-commons.durable-queue :as dq]) nil > (def q (dq/queues "/tmp" {})) #'q diff --git a/build.clj b/build.clj index 9d3a2a1..0ad0438 100644 --- a/build.clj +++ b/build.clj @@ -15,8 +15,8 @@ [deps-deploy.deps-deploy :as dd])) (def lib 'org.clj-commons/durable-queue) -(defn- the-version [patch] (format "0.1.%s" patch)) -(def version (the-version "8")) ; unreleased +(defn- the-version [patch] (format "0.2.%s" patch)) +(def version (the-version "0")) ; 0.2.0 unreleased (def snapshot (the-version "99-SNAPSHOT")) (def class-dir "target/classes") diff --git a/project.clj b/project.clj index a8933aa..453313c 100644 --- a/project.clj +++ b/project.clj @@ -21,5 +21,5 @@ :benchmark :benchmark :stress :stress} :codox {:writer codox-md.writer/write-docs - :include [durable-queue]} + :include [clj-commons.durable-queue]} :jvm-opts ^:replace ["-server" "-Xmx100m"]) diff --git a/src/clj_commons/durable_queue.clj b/src/clj_commons/durable_queue.clj new file mode 100644 index 0000000..472c6c3 --- /dev/null +++ b/src/clj_commons/durable_queue.clj @@ -0,0 +1,786 @@ +(ns clj-commons.durable-queue + (:require + [clj-commons.byte-streams :as bs] + [clj-commons.primitive-math :as p] + [clojure.java.io :as io] + [taoensso.nippy :as nippy]) + (:import + [java.io + File + IOException + RandomAccessFile + Writer] + [java.lang.ref + WeakReference] + [java.lang.reflect + Method] + [java.nio ByteBuffer MappedByteBuffer] + [java.nio.channels + FileChannel$MapMode] + [java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException] + [java.util.concurrent.atomic + AtomicLong] + [java.util.concurrent.locks + ReentrantReadWriteLock] + [java.util.zip + CRC32])) + +;;; + +(defmacro ^:private with-lock [lock & body] + `(let [^ReentrantReadWriteLock lock# ~lock + read-lock# (.readLock lock#)] + (do + (.lock read-lock#) + (try + ~@body + (finally + (.unlock read-lock#)))))) + +(defmacro ^:private with-exclusive-lock [lock & body] + `(let [^ReentrantReadWriteLock lock# ~lock + write-lock# (.writeLock lock#)] + (do + (.lock write-lock#) + (try + ~@body + (finally + (.unlock write-lock#)))))) + +;;; + +(defn- checksum ^long [^long length ^bytes ary] + (let [crc (CRC32.)] + (dotimes [i 4] + (.update crc (p/>> length i))) + (.update crc ary) + (.getValue crc))) + +;;; + +(def ^:private ^:const header-size 14) + +(defprotocol ITask + (^:private status [_] "Returns the task status") + (^:private status! [_ status] "Sets the task status")) + +(defprotocol ITaskSlab + (^:private unmap [_] "Temporarily releases mapped byte buffer until it's needed again.") + (^:private mapped? [_] "Returns true if the slab is actively mapped into memory.") + (^:private sync! [_]) + (^:private invalidate [_ offset len]) + (^:private ^ByteBuffer buffer [_]) + (^:private append-to-slab! [_ task-descriptor]) + (^:private read-write-lock [_])) + +(defmacro ^:private with-buffer [[buf slab] & body] + `(with-lock (read-write-lock ~slab) + (when-let [~buf (buffer ~slab)] + ~@body))) + +;;; + +(defn create-buffer [filename size] + (let [raf (doto (RandomAccessFile. (io/file filename) "rw") + (.setLength size))] + (try + (let [fc (.getChannel raf)] + (try + (let [buf (.map fc FileChannel$MapMode/READ_WRITE 0 size)] + (doto buf + (.put 0 (byte 0)) + .force)) + (finally + (.close fc)))) + (finally + (.close raf))))) + +(defn load-buffer + ([filename] + (load-buffer filename nil nil)) + ([filename offset length] + (let [_ (assert (.exists (io/file filename))) + raf (RandomAccessFile. (io/file filename) "rw")] + (try + (let [fc (.getChannel raf)] + (try + (.map fc + FileChannel$MapMode/READ_WRITE + (or offset 0) + (or length (.length raf))) + (finally + (.close fc)))) + (finally + (.close raf)))))) + +(let [clean (delay + (doto (.getMethod + (Class/forName "sun.misc.Cleaner") + "clean" + nil) + (.setAccessible true)))] + (defn- unmap-buffer + "A delightful endrun on the JVM's mmap GC mechanism" + [^ByteBuffer buf] + (when (.isDirect buf) + (try + + (let [^Method clean @clean + cleaner (doto (.getMethod (class buf) "cleaner" nil) + (.setAccessible true))] + (.invoke clean + (.invoke cleaner buf nil) + nil)) + (catch Throwable _ + ;; not much we can do here, sadly + ))))) + +(defn- force-buffer + [^MappedByteBuffer buf _offset _length] + (.force buf)) + +;;; + +;; a single task within a slab, assumes that the buffer is sliced around +;; the task's boundaries +(defrecord Task + [slab + ^long offset + ^long length + status + deserializer] + clojure.lang.IDeref + (deref [_] + (deserializer)) + ITask + (status [_] + (with-buffer [buf slab] + (or @status + (let [s (case (.get buf (p/+ offset 1)) + 0 :incomplete + 1 :in-progress + 2 :complete)] + (reset! status s) + s)))) + (status! [_ s] + (with-buffer [buf slab] + (reset! status s) + (.put buf (p/+ offset 1) + (byte + (case s + :incomplete 0 + :in-progress 1 + :complete 2))) + (invalidate slab (p/+ offset 1) 1) + nil))) + +(defn- task [slab offset len] + (Task. + slab + offset + len + (atom nil) + (fn [] + (with-buffer [buf slab] + (let [^ByteBuffer buf (-> buf + (.position ^Long offset) + ^ByteBuffer + (.limit ^Long (+ offset len)) + .slice) + checksum' (.getLong buf 2) + ary (bs/to-byte-array (.position buf header-size))] + (when-not (== (checksum (.getInt buf 10) ary) checksum') + (throw (IOException. "checksum mismatch"))) + (nippy/thaw ary)))))) + +(defmethod print-method Task [t ^Writer w] + (.write w + (str "< " (status t) " | " (pr-str @t) " >"))) + +;;; + +;; the byte layout is +;; [ exists? : int8 +;; state : int8 +;; checksum : int64 +;; size : int32 +;; payload : array ] +;; valid values for 'exists' is 0 (no), 1 (yes) +;; valid values for 'state' is 0 (unclaimed), 1 (in progress), 2 (complete) +(defn- slab->task-seq + "Takes a slab, and returns a sequence of the tasks it contains." + ([slab] + (slab->task-seq slab 0)) + ([slab ^long pos] + (with-buffer [buf slab] + (try + (let [^ByteBuffer buf' (.position buf pos)] + + ;; is there a next task, and is there space left in the buffer? + (when (and + (<= header-size (.remaining buf')) + (== 1 (.get buf'))) + + (lazy-seq + (with-buffer [buf slab] + (let [^ByteBuffer buf' (.position buf (p/inc pos)) + _status (.get buf') + _checksum (.getLong buf') + size (.getInt buf')] + + ;; this shouldn't be necessary, but let's not gratuitously + ;; overreach our bounds + (when (< size (.remaining buf')) + (cons + + (task + slab + pos + (+ header-size size)) + + (slab->task-seq + slab + (+ pos header-size size))))))))) + (catch Throwable _ + ;; this implies unrecoverable corruption + nil + ))))) + +(deftype TaskSlab + [filename + q-name + queue + buf ;; a clearable atom holding the buffer + position ;; an atom storing the write position of the slab + lock + dirty ;; an atom containing an interval of dirty bytes + ] + ITaskSlab + + (read-write-lock [_] + lock) + + (buffer [_] + (let [buf (or @buf + (swap! buf + (fn [buf] + (or buf (load-buffer filename)))))] + (.duplicate ^ByteBuffer buf))) + + (mapped? [_] + (boolean @buf)) + + (unmap [_] + (with-exclusive-lock lock + (when-let [b @buf] + (reset! buf nil) + (unmap-buffer b)))) + + (invalidate [_ start' len] + (let [end' (+ start' len)] + (swap! dirty + (fn [[start end]] + [(min start start') (max end end')])))) + + (sync! [this] + (let [[start end] @dirty] + (when (< start end) + (with-buffer [_ this] + (let [buf @buf] + (force-buffer buf start (- end start)) + (compare-and-set! dirty [start end] [Integer/MAX_VALUE 0]) + nil))))) + + (append-to-slab! [this task-descriptor] + (with-buffer [buf this] + (let [ary (nippy/freeze task-descriptor) + cnt (count ary) + pos @position + ^ByteBuffer buf (.position buf ^Long pos)] + + (when (> (.remaining buf) (+ (count ary) header-size)) + ;; write to the buffer + (doto buf + (.position ^Long pos) + (.put (byte 1)) ;; exists + (.put (byte 0)) ;; incomplete + (.putLong (checksum cnt ary)) + (.putInt cnt) + (.put ^bytes ary) + (.put (byte 0))) ;; next doesn't exist + + (swap! position + header-size cnt) + + (invalidate this pos (+ header-size cnt)) + + ;; return a task to enqueue in-memory + (task + this + pos + (+ header-size cnt)))))) + + clojure.lang.Seqable + (seq [this] + (slab->task-seq this)) + + Comparable + (compareTo [_ x] + (assert (instance? TaskSlab x)) + (compare filename (.filename ^TaskSlab x)))) + +(def ^:private fs-monitor (Object.)) + +(defn- delete-slab + [^TaskSlab slab] + (locking fs-monitor + (unmap slab) + (.delete (io/file (.filename slab))))) + +(defn- create-slab + "Creates a new slab file, ensuring a new file name that is lexicographically greater than + any existing files for that queue name." + ([directory q-name queue size] + (locking fs-monitor + (let [pattern (re-pattern (str "^" q-name "_(\\d{6}$)")) + last-number (->> directory + io/file + .listFiles + (map #(.getName ^File %)) + (map #(second (re-find pattern %))) + (remove nil?) + (map #(Long/parseLong %)) + sort + last) + n (if last-number (inc last-number) 0) + f (io/file (str directory "/" q-name "_" (format "%06d" n)))] + + (when-not (.createNewFile f) + (throw (IOException. (str "Could not create new slab file at " (.getAbsolutePath f))))) + + (TaskSlab. + (.getAbsolutePath f) + q-name + queue + (atom (create-buffer f size)) + (atom 0) + (ReentrantReadWriteLock.) + (atom [Integer/MAX_VALUE 0])))))) + +(defn- file->slab + "Transforms a file into a slab representing that file's contents." + [filename q-name queue] + (let [pos (atom 0) + slab (TaskSlab. + filename + q-name + queue + (atom nil) + pos + (ReentrantReadWriteLock.) + (atom [Integer/MAX_VALUE 0])) + len (->> slab + (map :length) + (reduce +))] + (reset! pos len) + (unmap slab) + slab)) + +(defn- directory->queue-name->slab-files + "Returns a map of queue names onto slab files for that queue." + [directory] + (let [queue->file (->> directory + io/file + .listFiles + (filter #(re-find #"^\w+_\d{6}$" (.getName ^File %))) + (group-by #(second (re-find #"^(\w+)_\d{6}$" (.getName ^File %)))))] + (zipmap + (keys queue->file) + (map + (fn [files] + (->> files + (map #(.getAbsolutePath ^File %)) + sort)) + (vals queue->file))))) + +;;; + +(defn- initial-stats [^long count] + {:enqueued (AtomicLong. count) + :retried (AtomicLong. 0) + :completed (AtomicLong. 0)}) + +(defn- immediate-stats [^LinkedBlockingQueue q {:keys [enqueued retried completed]}] + (let [cnt (.size q) + completed (.get ^AtomicLong completed) + enqueued (.get ^AtomicLong enqueued)] + {:enqueued enqueued + :retried (.get ^AtomicLong retried) + :completed completed + :in-progress (- (- enqueued completed) cnt)})) + +;;; + +(defprotocol IQueues + (^:private mark-complete! [_ q-name]) + (^:private mark-retry! [_ q-name]) + (delete! [_] + "Deletes all files associated with the queues.") + (stats [_] + "Returns a map of queue names onto information about the immediate state of the queue.") + (fsync [_] + "Forces an fsync on all modified files.") + (take! + [_ q-name] + [_ q-name timeout timeout-val] + "A blocking dequeue from `name`. If `timeout` is specified, returns `timeout-val` if + no task is available within `timeout` milliseconds.") + (put! + [_ q-name task-descriptor] + [_ q-name task-descriptor timeout] + "A blocking enqueue to `name`. If `timeout` is specified, returns `false` if unable to + enqueue within `timeout` milliseconds, `true` otherwise.")) + +(defn queues + "Creates a point of interaction for queues, backed by disk storage in `directory`. + + The following options can be specified: + + max-queue-size - the maximum number of elements that can be in the queue before `put!` + blocks. Defaults to `Integer/MAX_VALUE`. + + complete? - a predicate that is run on pre-existing tasks to check if they were already + completed. If the tasks in the queue are non-idempotent, this must be + specified for correct behavior. Defaults to always returning false. + + slab-size - The size, in bytes, of the backing files for the queue. Defaults to 16mb. + + fsync-put? - if true, each `put!` will force an fsync. Defaults to true. + + fsync-take? - if true, each `take!` will force an fsync. Defaults to false." + ([directory] + (queues directory nil)) + ([directory + {:keys [max-queue-size + complete? + slab-size + fsync-put? + fsync-take? + fsync-threshold + fsync-interval] + :or {max-queue-size Integer/MAX_VALUE + complete? nil + slab-size (* 64 1024 1024) + fsync-put? true + fsync-take? false}}] + + (assert + (not + (and + (or fsync-threshold fsync-interval) + (or fsync-take? fsync-put?))) + "Both batch and per-task fsync options are enabled, which is probably not what you intended.") + + (.mkdirs (io/file directory)) + + (let [ + + queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) + queue-name->files (directory->queue-name->slab-files directory) + + ;; core state stores + queue-name->slabs (atom + (zipmap + (keys queue-name->files) + (->> queue-name->files + (map + (fn [[queue-name files]] + (map #(file->slab % queue-name (queue queue-name)) files))) + vec))) + + queue-name->stats (atom + (zipmap + (keys queue-name->files) + (map + #(initial-stats (count (queue %))) + (keys queue-name->files)))) + + queue-name->current-slab (atom {}) + + ;; initialize + create-new-slab (fn [q-name] + (let [slab (create-slab directory q-name (queue q-name) slab-size) + empty-slabs (->> (@queue-name->slabs q-name) + (filter (fn [slab] + (->> slab + (remove #(= :complete (status %))) + empty?))) + set)] + + ;; delete empty slabs + (doseq [s empty-slabs] + (delete-slab s)) + + ;; update list of active slabs + (swap! queue-name->slabs update-in [q-name] + #(conj (vec (remove empty-slabs %)) slab)) + + ;; unmap all slabs but the first (which is being consumed) + ;; and the last (which is being written to) + (doseq [s (-> (@queue-name->slabs q-name) rest butlast)] + (unmap s)) + slab)) + + populate-stats! #(when-not (contains? @queue-name->stats %) + (swap! queue-name->stats assoc % (initial-stats 0))) + + this-ref (promise) + + action-counter (AtomicLong. 0) + + mark-action! (if fsync-threshold + (fn [] + (when (zero? (rem (.incrementAndGet action-counter) fsync-threshold)) + (fsync @this-ref))) + (fn []))] + + ;; + (when fsync-interval + (future + (let [ref (WeakReference. @this-ref)] + (while (.get ref) + (when-let [q (.get ref)] + (try + (let [start (System/currentTimeMillis)] + (fsync q) + (let [end (System/currentTimeMillis)] + (Thread/sleep (long (max 0 (- fsync-interval (- end start))))))) + (catch Throwable _))))))) + + ;; populate queues with pre-existing tasks + (let [empty-slabs (atom #{})] + (doseq [[q slabs] @queue-name->slabs] + (let [^LinkedBlockingQueue q' (queue q)] + (doseq [slab slabs] + (let [tasks (->> slab + (map #(vary-meta % assoc + ::this this-ref + ::queue q' + ::queue-name q + ::fsync? fsync-take?)) + (remove #(or (= :complete (status %)) + (and complete? (complete? @%)))))] + + (if (empty? tasks) + + ;; if there aren't any active tasks, just delete the slab + (do + (delete-slab slab) + (swap! empty-slabs conj slab)) + + (do + (doseq [task tasks] + (status! task :incomplete) + (when-not (.offer q' task) + (throw + (IllegalArgumentException. + "'max-queue-size' insufficient to hold existing tasks.")))) + (unmap slab))))) + + (let [^AtomicLong counter (get-in @queue-name->stats [q :enqueued])] + (.addAndGet counter (count (queue q)))))) + + (swap! queue-name->slabs + (fn [m] + (->> m + (map + (fn [[q slabs]] + [q (remove @empty-slabs slabs)])) + (into {}))))) + + (deliver this-ref + (reify + + java.io.Closeable + (close [_] + (doseq [s (->> @queue-name->slabs vals (apply concat))] + (unmap s))) + + IQueues + + (delete! [_] + (doseq [s (->> @queue-name->slabs vals (apply concat))] + (unmap s) + (delete-slab s))) + + (fsync [_] + (doseq [slab (->> @queue-name->slabs vals (apply concat))] + (sync! slab))) + + (mark-retry! [_ q-name] + (mark-action!) + (populate-stats! q-name) + (let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :retried])] + (.incrementAndGet retry-counter))) + + (mark-complete! [_ q-name] + (mark-action!) + (populate-stats! q-name) + (let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :completed])] + (.incrementAndGet retry-counter))) + + (stats [_] + (let [ks (keys @queue-name->stats)] + (zipmap ks + (map + (fn [q-name] + (merge + {:num-slabs (-> @queue-name->slabs (get q-name) count) + :num-active-slabs (->> (get @queue-name->slabs q-name) + (filter mapped?) + count)} + (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) + ks)))) + + (take! [_ q-name timeout timeout-val] + (let [q-name (munge (name q-name)) + ^LinkedBlockingQueue q (queue q-name)] + (try + (if-let [t (if (zero? timeout) + (.poll q) + (.poll q timeout TimeUnit/MILLISECONDS))] + + (let [slab (:slab t)] + + ;; if we've moved onto a new slab, unmap all but the current and + ;; last slabs + (let [old-slab (@queue-name->current-slab q-name)] + (when-not (= slab old-slab) + (swap! queue-name->current-slab assoc q-name slab) + (doseq [s (->> (get @queue-name->slabs q-name) + butlast + (remove #(= slab %)))] + (unmap s)))) + + (status! t :in-progress) + ;; we don't need to fsync here, because in-progress and incomplete + ;; are effectively equivalent on restart + + t) + timeout-val) + (catch TimeoutException _ + timeout-val)))) + + (take! [this q-name] + (take! this q-name Long/MAX_VALUE nil)) + + (put! [_ q-name task-descriptor timeout] + (let [q-name (munge (name q-name)) + ^LinkedBlockingQueue q (queue q-name) + slab! (fn [] + (let [slabs (@queue-name->slabs q-name) + slab (last slabs) + task (when slab + (append-to-slab! slab task-descriptor)) + + ;; if no task was created, we need to create a new slab file + ;; and try again + slab (if task + slab + (create-new-slab q-name)) + task (or task (append-to-slab! slab task-descriptor))] + + (when-not task + (throw + (IllegalArgumentException. + (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) + + (when fsync-put? + (sync! slab)) + task)) + + queue! (fn [task] + (if (zero? timeout) + (.offer q task) + (.offer q task timeout TimeUnit/MILLISECONDS)))] + (if (locking q + (queue! + (vary-meta (slab!) assoc + ::this this-ref + ::queue-name q-name + ::queue q + ::fsync? fsync-take?))) + (do + (populate-stats! q-name) + (let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])] + (.incrementAndGet counter)) + true) + false))) + + (put! [this q-name task-descriptor] + (put! this q-name task-descriptor Long/MAX_VALUE)))) + + @this-ref))) + +;;; + +(defn task-seq + "Returns an infinite lazy sequence of tasks for `q-name`." + [qs q-name] + (lazy-seq + (cons + (take! qs q-name) + (task-seq qs q-name)))) + +(defn immediate-task-seq + "Returns a finite lazy sequence of tasks for `q-name` which terminates once there are + no more tasks immediately available." + [qs q-name] + (lazy-seq + (let [task (take! qs q-name 0 ::none)] + (when-not (= ::none task) + (cons + task + (immediate-task-seq qs q-name)))))) + +(defn interval-task-seq + "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will + terminate after that time has elapsed, even if there are still tasks immediately available." + [qs q-name interval] + (let [now (System/currentTimeMillis)] + (lazy-seq + (let [now' (System/currentTimeMillis) + remaining (- interval (- now' now))] + (when (pos? remaining) + (let [task (take! qs q-name remaining ::none)] + (when-not (= ::none task) + (cons + task + (interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now))))))))))) + +(defn complete! + "Marks a task as complete." + [task] + (if (identical? :complete (status task)) + false + (do + (status! task :complete) + (when (-> task meta ::fsync?) + (sync! (:slab task))) + (mark-complete! @(-> task meta ::this) (-> task meta ::queue-name)) + true))) + +(defn retry! + "Marks a task as available for retry." + [task] + (if (or + (identical? :complete (status task)) + (identical? :incomplete (status task))) + false + (do + (status! task :incomplete) + (when (-> task meta ::fsync?) + (sync! (:slab task))) + (mark-retry! @(-> task meta ::this) (-> task meta ::queue-name)) + (let [^LinkedBlockingQueue q (-> task meta ::queue)] + (.put q task)) + true))) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index 05cfd50..405ecea 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -1,4 +1,5 @@ -(ns durable-queue +(ns ^:no-doc durable-queue + "DEPRECATED in 0.2.0: use clj-commons.durable-queue instead." (:require [clj-commons.byte-streams :as bs] [clj-commons.primitive-math :as p] @@ -11,19 +12,19 @@ RandomAccessFile Writer] [java.lang.ref - WeakReference] + WeakReference] [java.lang.reflect - Method] + Method] [java.nio ByteBuffer MappedByteBuffer] [java.nio.channels - FileChannel$MapMode] + FileChannel$MapMode] [java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException] [java.util.concurrent.atomic - AtomicLong] + AtomicLong] [java.util.concurrent.locks - ReentrantReadWriteLock] + ReentrantReadWriteLock] [java.util.zip - CRC32])) + CRC32])) ;;; diff --git a/test/clj_commons/durable_queue_test.clj b/test/clj_commons/durable_queue_test.clj new file mode 100644 index 0000000..5e460e2 --- /dev/null +++ b/test/clj_commons/durable_queue_test.clj @@ -0,0 +1,133 @@ +(ns clj-commons.durable-queue-test + (:require + [clj-commons.durable-queue :as dq] + [clojure.java.io :as io] + [clojure.test :refer [deftest is]] + [criterium.core :as c])) + +(defn clear-tmp-directory [] + (doseq [f (->> (#'clj-commons.durable-queue/directory->queue-name->slab-files "/tmp") + vals + (apply concat))] + (.delete (io/file f)))) + +(deftest test-basic-put-take + (clear-tmp-directory) + (let [q (dq/queues "/tmp" {:slab-size 1024}) + tasks (range 1e4)] + (doseq [t tasks] + (dq/put! q :foo t)) + (is (= tasks (map deref (dq/immediate-task-seq q :foo)))) + (dq/delete! q))) + +(deftest test-partial-slab-writes + (clear-tmp-directory) + (dotimes [i 10] + (dq/put! (dq/queues "/tmp") :foo i)) + (is (= (range 10) (map deref (dq/immediate-task-seq (dq/queues "/tmp") :foo))))) + +(deftest test-retry + (clear-tmp-directory) + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + + (doseq [t (range 10)] + (dq/put! q :foo t)) + + (let [tasks' (dq/immediate-task-seq q :foo)] + (is (= (range 10) (map deref tasks'))) + (doseq [t (take 5 tasks')] + (dq/complete! t)) + (doseq [t (range 10 15)] + (dq/put! q :foo t)))) + + ;; create a new manager, which will mark all in-progress tasks as incomplete + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] + (is (= (range 5 15) (map deref tasks'))) + (doseq [t (take 5 tasks')] + (dq/complete! t)))) + + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] + (is (= (range 10 15) (map deref tasks'))) + (doseq [t (range 15 20)] + (dq/put! q :foo t)))) + + (let [q (dq/queues "/tmp" {:complete? even?})] + (is (= (remove even? (range 10 20)) (map deref (dq/immediate-task-seq q :foo)))))) + +;;; + +(deftest ^:benchmark benchmark-put-take + (clear-tmp-directory) + + (println "\n\n-- sync both") + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? true})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync take") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? true})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync put") + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? false})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 10 writes") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 100 writes") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 100ms") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-interval 100})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync neither") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? false})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo)))))) + +;;; + +(deftest ^:stress stress-queue-size + (clear-tmp-directory) + + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [ary (byte-array 1e6)] + (dotimes [i 1e6] + (aset ary i (byte (rand-int 127)))) + (dotimes [_ 1e5] + (dq/put! q :stress ary)))) + + (with-open [^java.io.Closeable q (dq/queues "/tmp" {:complete? (constantly false)})] + (let [s (doall (dq/immediate-task-seq q :stress))] + (doseq [t s] + (dq/retry! t))) + (let [s (dq/immediate-task-seq q :stress)] + (doseq [t s] + (dq/complete! t)))) + + (clear-tmp-directory)) diff --git a/test/durable_queue_test.clj b/test/durable_queue_test.clj index 0c67f3a..68da3d4 100644 --- a/test/durable_queue_test.clj +++ b/test/durable_queue_test.clj @@ -1,4 +1,5 @@ (ns durable-queue-test + "This tests the DEPRECATED single-segment namespace." (:require [clojure.java.io :as io] [clojure.test :refer [deftest is]] From 7458f53f81a80629af1f4f80c21a24d3e21ea8e6 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 17:40:24 -0400 Subject: [PATCH 08/17] fixes #21 by using nanoTime Signed-off-by: Sean Corfield --- src/clj_commons/durable_queue.clj | 14 +++++++------- src/durable_queue.clj | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/clj_commons/durable_queue.clj b/src/clj_commons/durable_queue.clj index 472c6c3..3953d42 100644 --- a/src/clj_commons/durable_queue.clj +++ b/src/clj_commons/durable_queue.clj @@ -550,10 +550,10 @@ (while (.get ref) (when-let [q (.get ref)] (try - (let [start (System/currentTimeMillis)] + (let [start (System/nanoTime)] (fsync q) - (let [end (System/currentTimeMillis)] - (Thread/sleep (long (max 0 (- fsync-interval (- end start))))))) + (let [end (System/nanoTime)] + (Thread/sleep (long (max 0 (- (* 1000000 fsync-interval) (- end start))))))) (catch Throwable _))))))) ;; populate queues with pre-existing tasks @@ -746,16 +746,16 @@ "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will terminate after that time has elapsed, even if there are still tasks immediately available." [qs q-name interval] - (let [now (System/currentTimeMillis)] + (let [now (System/nanoTime)] (lazy-seq - (let [now' (System/currentTimeMillis) - remaining (- interval (- now' now))] + (let [now' (System/nanoTime) + remaining (- (* 1000000 interval) (- now' now))] (when (pos? remaining) (let [task (take! qs q-name remaining ::none)] (when-not (= ::none task) (cons task - (interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now))))))))))) + (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) (defn complete! "Marks a task as complete." diff --git a/src/durable_queue.clj b/src/durable_queue.clj index 405ecea..c0823fc 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -551,10 +551,10 @@ (while (.get ref) (when-let [q (.get ref)] (try - (let [start (System/currentTimeMillis)] + (let [start (System/nanoTime)] (fsync q) - (let [end (System/currentTimeMillis)] - (Thread/sleep (long (max 0 (- fsync-interval (- end start))))))) + (let [end (System/nanoTime)] + (Thread/sleep (long (max 0 (- (* 1000000 fsync-interval) (- end start))))))) (catch Throwable _))))))) ;; populate queues with pre-existing tasks @@ -747,16 +747,16 @@ "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will terminate after that time has elapsed, even if there are still tasks immediately available." [qs q-name interval] - (let [now (System/currentTimeMillis)] + (let [now (System/nanoTime)] (lazy-seq - (let [now' (System/currentTimeMillis) - remaining (- interval (- now' now))] + (let [now' (System/nanoTime) + remaining (- (* 1000000 interval) (- now' now))] (when (pos? remaining) (let [task (take! qs q-name remaining ::none)] (when-not (= ::none task) (cons task - (interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now))))))))))) + (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) (defn complete! "Marks a task as complete." From 176336655af03aa796c2fb415986f52cd3474557 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Tue, 2 Sep 2025 17:42:42 -0400 Subject: [PATCH 09/17] fixes #20 by updating docstring Signed-off-by: Sean Corfield --- src/clj_commons/durable_queue.clj | 8 +++++++- src/durable_queue.clj | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/clj_commons/durable_queue.clj b/src/clj_commons/durable_queue.clj index 3953d42..c6ebef3 100644 --- a/src/clj_commons/durable_queue.clj +++ b/src/clj_commons/durable_queue.clj @@ -456,7 +456,13 @@ fsync-put? - if true, each `put!` will force an fsync. Defaults to true. - fsync-take? - if true, each `take!` will force an fsync. Defaults to false." + fsync-take? - if true, each `take!` will force an fsync. Defaults to false. + + fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that + can be performed before an fsync is performed. + + fsync-interval - The maximum amount of time, in milliseconds, that can elapse before + an fsync is performed." ([directory] (queues directory nil)) ([directory diff --git a/src/durable_queue.clj b/src/durable_queue.clj index c0823fc..eb17134 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -457,7 +457,13 @@ fsync-put? - if true, each `put!` will force an fsync. Defaults to true. - fsync-take? - if true, each `take!` will force an fsync. Defaults to false." + fsync-take? - if true, each `take!` will force an fsync. Defaults to false. + + fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that + can be performed before an fsync is performed. + + fsync-interval - The maximum amount of time, in milliseconds, that can elapse before + an fsync is performed." ([directory] (queues directory nil)) ([directory From d146c06cb9de7cfcfaf0612cea8f4eab49230aee Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sat, 6 Sep 2025 16:10:46 -0400 Subject: [PATCH 10/17] ignore rebel history Signed-off-by: Sean Corfield --- .clj-kondo/imports/org.clj-commons/durable-queue/config.edn | 2 ++ .gitignore | 1 + 2 files changed, 3 insertions(+) create mode 100644 .clj-kondo/imports/org.clj-commons/durable-queue/config.edn diff --git a/.clj-kondo/imports/org.clj-commons/durable-queue/config.edn b/.clj-kondo/imports/org.clj-commons/durable-queue/config.edn new file mode 100644 index 0000000..9e218c2 --- /dev/null +++ b/.clj-kondo/imports/org.clj-commons/durable-queue/config.edn @@ -0,0 +1,2 @@ +{:lint-as {clj-commons.durable-queue/with-buffer clojure.core/let + durable-queue/with-buffer clojure.core/let}} diff --git a/.gitignore b/.gitignore index 52461bc..9f7fb2e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ .idea/ .lsp/.cache .portal/vs-code.edn +.rebel_readline_history *.class *.iml *.jar From 2d7c81c970b6c18288768c654b8aa6ef5c61092f Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sat, 6 Sep 2025 17:11:51 -0400 Subject: [PATCH 11/17] fixes #30 by adding delete-queue! and delete-all! and deprecating delete! as dangerous Signed-off-by: Sean Corfield --- CHANGELOG.md | 2 ++ README.md | 14 ++++++++ src/clj_commons/durable_queue.clj | 48 +++++++++++++++++-------- test/clj_commons/durable_queue_test.clj | 3 +- 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8fe949..373cfb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ v0.2.0 in progress * Deprecated single-segment `durable-queue` ns; use `clj-commons.durable-queue` instead +* Deprecate `delete!` because it is dangerous: it leaves queues in a corrupted state (because it deletes files without cleaning up in-memory state). +* Address [#30](https://github.com/clj-commons/durable-queue/issues/30) by adding `delete-queue!` and `delete-all!` to safely delete an individual queue (and its files) and safely delete all queues (and their files). * Clean up lint issues; export clj-kondo config * Add a changelog! diff --git a/README.md b/README.md index dd2028c..eb33a28 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,20 @@ To get a description of the current state of the queue, we can use `stats`, whic | `:num-slabs` | the number of underlying files which are being used to store tasks | | `:num-active-slabs` | the number of underlying files which are currently open and mapped into memory | +As of version 0.2.0, you can delete an entire queue using `delete-queue!`, which will remove all tasks and delete all files associated with the queue: + +```clj +> (dq/delete-queue! q :foo) +{} +``` + +As of version 0.2.0, you can also delete all queues using `delete-all!`, which will remove all tasks and delete all files associated with all queues: + +```clj +> (dq/delete-all! q) +nil +``` + ### configuring the queues `queues` can be given a number of different options, which can affect its performance and correctness. diff --git a/src/clj_commons/durable_queue.clj b/src/clj_commons/durable_queue.clj index c6ebef3..faefa24 100644 --- a/src/clj_commons/durable_queue.clj +++ b/src/clj_commons/durable_queue.clj @@ -424,7 +424,11 @@ (^:private mark-complete! [_ q-name]) (^:private mark-retry! [_ q-name]) (delete! [_] - "Deletes all files associated with the queues.") + "DEPRECATED: Deletes all files associated with all queues. Dangerous!") + (delete-queue! [_ q-name] + "Deletes the specific queue, including all associated files.") + (delete-all! [_] + "Deletes all queues, including all associated files.") (stats [_] "Returns a map of queue names onto information about the immediate state of the queue.") (fsync [_] @@ -432,12 +436,12 @@ (take! [_ q-name] [_ q-name timeout timeout-val] - "A blocking dequeue from `name`. If `timeout` is specified, returns `timeout-val` if + "A blocking dequeue from `q-name`. If `timeout` is specified, returns `timeout-val` if no task is available within `timeout` milliseconds.") (put! [_ q-name task-descriptor] [_ q-name task-descriptor timeout] - "A blocking enqueue to `name`. If `timeout` is specified, returns `false` if unable to + "A blocking enqueue to `q-name`. If `timeout` is specified, returns `false` if unable to enqueue within `timeout` milliseconds, `true` otherwise.")) (defn queues @@ -613,11 +617,25 @@ IQueues - (delete! [_] + (delete! [_] ; DEPRECATED and dangerous since it breaks the queues (doseq [s (->> @queue-name->slabs vals (apply concat))] (unmap s) (delete-slab s))) + (delete-queue! [_ q-name] + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats dissoc q-name) + (swap! queue-name->slabs dissoc q-name) + (swap! queue-name->current-slab dissoc q-name))) + + (delete-all! [this] + (doseq [q (keys @queue-name->stats )] + (delete-queue! this q))) + (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] (sync! slab))) @@ -637,15 +655,15 @@ (stats [_] (let [ks (keys @queue-name->stats)] (zipmap ks - (map - (fn [q-name] - (merge - {:num-slabs (-> @queue-name->slabs (get q-name) count) - :num-active-slabs (->> (get @queue-name->slabs q-name) - (filter mapped?) - count)} - (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) - ks)))) + (map + (fn [q-name] + (merge + {:num-slabs (-> @queue-name->slabs (get q-name) count) + :num-active-slabs (->> (get @queue-name->slabs q-name) + (filter mapped?) + count)} + (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) + ks)))) (take! [_ q-name timeout timeout-val] (let [q-name (munge (name q-name)) @@ -663,8 +681,8 @@ (when-not (= slab old-slab) (swap! queue-name->current-slab assoc q-name slab) (doseq [s (->> (get @queue-name->slabs q-name) - butlast - (remove #(= slab %)))] + butlast + (remove #(= slab %)))] (unmap s)))) (status! t :in-progress) diff --git a/test/clj_commons/durable_queue_test.clj b/test/clj_commons/durable_queue_test.clj index 5e460e2..b2f36d4 100644 --- a/test/clj_commons/durable_queue_test.clj +++ b/test/clj_commons/durable_queue_test.clj @@ -17,8 +17,7 @@ tasks (range 1e4)] (doseq [t tasks] (dq/put! q :foo t)) - (is (= tasks (map deref (dq/immediate-task-seq q :foo)))) - (dq/delete! q))) + (is (= tasks (map deref (dq/immediate-task-seq q :foo)))))) (deftest test-partial-slab-writes (clear-tmp-directory) From f46adf257f81e48e0a634c77b10ef4bc3483a102 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sun, 7 Sep 2025 15:36:30 -0400 Subject: [PATCH 12/17] minor cleanup no more leiningen; sonatype snapshots not needed; log4j2 config not needed document extra cli options for bb testing tasks Signed-off-by: Sean Corfield --- .github/CODEOWNERS | 1 + bb.edn | 6 ++++++ deps.edn | 7 ++----- project.clj | 25 ------------------------- 4 files changed, 9 insertions(+), 30 deletions(-) delete mode 100644 project.clj diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 46e71d0..7c82509 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1,2 @@ * @slipset +* @seancorfield diff --git a/bb.edn b/bb.edn index 2a2f3f8..88d4ba3 100644 --- a/bb.edn +++ b/bb.edn @@ -1,3 +1,9 @@ +;; all tasks that run tests can also accept: +;; all - run tests against all supported Clojure versions +;; bench - run only benchmarks +;; stress - run only stress tests +;; 1.10/1.11/1.12 - run only against the specified Clojure version +;; jdk11/jdk17/jdk21/jdk24 - indicate which JDK you are using (defaults to jdk24) {:tasks {:requires [[clojure.string :as str]] diff --git a/deps.edn b/deps.edn index cdb1bc8..291f70f 100644 --- a/deps.edn +++ b/deps.edn @@ -1,6 +1,4 @@ -{:mvn/repos {"sonatype" {:url "https://oss.sonatype.org/content/repositories/snapshots/"} - "ossrh-snapshots" {:url "https://s01.oss.sonatype.org/content/repositories/snapshots"}} - :paths ["src"] +{:paths ["src"] :deps {org.clojure/clojure {:mvn/version "1.10.3"} com.taoensso/nippy {:mvn/version "3.6.0"} org.clj-commons/primitive-math {:mvn/version "1.0.1"} @@ -21,8 +19,7 @@ :test {:extra-paths ["test"] :extra-deps {io.github.cognitect-labs/test-runner {:git/tag "v0.5.1" :git/sha "dfb30dd"} - criterium/criterium {:mvn/version "0.4.6"}} - :jvm-opts ["-Dlog4j2.configurationFile=log4j2-info.properties"]} + criterium/criterium {:mvn/version "0.4.6"}}} :runner {:main-opts ["-m" "cognitect.test-runner"]} :jdk11 {} :jdk17 {} diff --git a/project.clj b/project.clj deleted file mode 100644 index 453313c..0000000 --- a/project.clj +++ /dev/null @@ -1,25 +0,0 @@ -(defproject org.clj-commons/durable-queue - (or (System/getenv "PROJECT_VERSION") "0.1.7") - :description "a in-process task-queue that is backed by disk." - :url "https://github.com/clj-commons/durable-queue" - :license {:name "Eclipse Public License" - :url "http://www.eclipse.org/legal/epl-v10.html"} - :deploy-repositories [["clojars" {:url "https://repo.clojars.org" - :username :env/clojars_username - :password :env/clojars_password - :sign-releases true}]] - :dependencies [[com.taoensso/nippy "3.6.0"] - [org.clj-commons/primitive-math "1.0.1"] - [org.clj-commons/byte-streams "0.3.4"]] - :profiles {:dev {:dependencies [[org.clojure/clojure "1.10.3"] - [criterium/criterium "0.4.6"]]} - :1.10 {:dependencies [[org.clojure/clojure "1.10.3"]]} - :1.11 {:dependencies [[org.clojure/clojure "1.11.4"]]} - :1.12 {:dependencies [[org.clojure/clojure "1.12.2"]]}} - :global-vars {*warn-on-reflection* true} - :test-selectors {:default #(not (some #{:benchmark :stress} (keys %))) - :benchmark :benchmark - :stress :stress} - :codox {:writer codox-md.writer/write-docs - :include [clj-commons.durable-queue]} - :jvm-opts ^:replace ["-server" "-Xmx100m"]) From 99f252169046c7c9b4f2d10c05113393c4d0004e Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sun, 7 Sep 2025 15:43:00 -0400 Subject: [PATCH 13/17] clarify default clojure version Signed-off-by: Sean Corfield --- bb.edn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bb.edn b/bb.edn index 88d4ba3..4405620 100644 --- a/bb.edn +++ b/bb.edn @@ -2,7 +2,7 @@ ;; all - run tests against all supported Clojure versions ;; bench - run only benchmarks ;; stress - run only stress tests -;; 1.10/1.11/1.12 - run only against the specified Clojure version +;; 1.10/1.11/1.12 - run only against the specified Clojure version (defaults to 1.10) ;; jdk11/jdk17/jdk21/jdk24 - indicate which JDK you are using (defaults to jdk24) {:tasks {:requires [[clojure.string :as str]] From 4842acfafca2f5ffd7752b698f77402ed3c50163 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sun, 21 Sep 2025 15:05:46 -0400 Subject: [PATCH 14/17] remove old ns (to avoid conflicts) Signed-off-by: Sean Corfield --- CHANGELOG.md | 2 +- src/durable_queue.clj | 793 ------------------------------------ test/durable_queue_test.clj | 134 ------ 3 files changed, 1 insertion(+), 928 deletions(-) delete mode 100644 src/durable_queue.clj delete mode 100644 test/durable_queue_test.clj diff --git a/CHANGELOG.md b/CHANGELOG.md index 373cfb3..2afcecc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## CHANGES v0.2.0 in progress -* Deprecated single-segment `durable-queue` ns; use `clj-commons.durable-queue` instead +* Removed single-segment `durable-queue` ns; use `clj-commons.durable-queue` instead * Deprecate `delete!` because it is dangerous: it leaves queues in a corrupted state (because it deletes files without cleaning up in-memory state). * Address [#30](https://github.com/clj-commons/durable-queue/issues/30) by adding `delete-queue!` and `delete-all!` to safely delete an individual queue (and its files) and safely delete all queues (and their files). * Clean up lint issues; export clj-kondo config diff --git a/src/durable_queue.clj b/src/durable_queue.clj deleted file mode 100644 index eb17134..0000000 --- a/src/durable_queue.clj +++ /dev/null @@ -1,793 +0,0 @@ -(ns ^:no-doc durable-queue - "DEPRECATED in 0.2.0: use clj-commons.durable-queue instead." - (:require - [clj-commons.byte-streams :as bs] - [clj-commons.primitive-math :as p] - [clojure.java.io :as io] - [taoensso.nippy :as nippy]) - (:import - [java.io - File - IOException - RandomAccessFile - Writer] - [java.lang.ref - WeakReference] - [java.lang.reflect - Method] - [java.nio ByteBuffer MappedByteBuffer] - [java.nio.channels - FileChannel$MapMode] - [java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException] - [java.util.concurrent.atomic - AtomicLong] - [java.util.concurrent.locks - ReentrantReadWriteLock] - [java.util.zip - CRC32])) - -;;; - -(defmacro ^:private with-lock [lock & body] - `(let [^ReentrantReadWriteLock lock# ~lock - read-lock# (.readLock lock#)] - (do - (.lock read-lock#) - (try - ~@body - (finally - (.unlock read-lock#)))))) - -(defmacro ^:private with-exclusive-lock [lock & body] - `(let [^ReentrantReadWriteLock lock# ~lock - write-lock# (.writeLock lock#)] - (do - (.lock write-lock#) - (try - ~@body - (finally - (.unlock write-lock#)))))) - -;;; - -(defn- checksum ^long [^long length ^bytes ary] - (let [crc (CRC32.)] - (dotimes [i 4] - (.update crc (p/>> length i))) - (.update crc ary) - (.getValue crc))) - -;;; - -(def ^:private ^:const header-size 14) - -(defprotocol ITask - (^:private status [_] "Returns the task status") - (^:private status! [_ status] "Sets the task status")) - -(defprotocol ITaskSlab - (^:private unmap [_] "Temporarily releases mapped byte buffer until it's needed again.") - (^:private mapped? [_] "Returns true if the slab is actively mapped into memory.") - (^:private sync! [_]) - (^:private invalidate [_ offset len]) - (^:private ^ByteBuffer buffer [_]) - (^:private append-to-slab! [_ task-descriptor]) - (^:private read-write-lock [_])) - -(defmacro ^:private with-buffer [[buf slab] & body] - `(with-lock (read-write-lock ~slab) - (when-let [~buf (buffer ~slab)] - ~@body))) - -;;; - -(defn create-buffer [filename size] - (let [raf (doto (RandomAccessFile. (io/file filename) "rw") - (.setLength size))] - (try - (let [fc (.getChannel raf)] - (try - (let [buf (.map fc FileChannel$MapMode/READ_WRITE 0 size)] - (doto buf - (.put 0 (byte 0)) - .force)) - (finally - (.close fc)))) - (finally - (.close raf))))) - -(defn load-buffer - ([filename] - (load-buffer filename nil nil)) - ([filename offset length] - (let [_ (assert (.exists (io/file filename))) - raf (RandomAccessFile. (io/file filename) "rw")] - (try - (let [fc (.getChannel raf)] - (try - (.map fc - FileChannel$MapMode/READ_WRITE - (or offset 0) - (or length (.length raf))) - (finally - (.close fc)))) - (finally - (.close raf)))))) - -(let [clean (delay - (doto (.getMethod - (Class/forName "sun.misc.Cleaner") - "clean" - nil) - (.setAccessible true)))] - (defn- unmap-buffer - "A delightful endrun on the JVM's mmap GC mechanism" - [^ByteBuffer buf] - (when (.isDirect buf) - (try - - (let [^Method clean @clean - cleaner (doto (.getMethod (class buf) "cleaner" nil) - (.setAccessible true))] - (.invoke clean - (.invoke cleaner buf nil) - nil)) - (catch Throwable _ - ;; not much we can do here, sadly - ))))) - -(defn- force-buffer - [^MappedByteBuffer buf _offset _length] - (.force buf)) - -;;; - -;; a single task within a slab, assumes that the buffer is sliced around -;; the task's boundaries -(defrecord Task - [slab - ^long offset - ^long length - status - deserializer] - clojure.lang.IDeref - (deref [_] - (deserializer)) - ITask - (status [_] - (with-buffer [buf slab] - (or @status - (let [s (case (.get buf (p/+ offset 1)) - 0 :incomplete - 1 :in-progress - 2 :complete)] - (reset! status s) - s)))) - (status! [_ s] - (with-buffer [buf slab] - (reset! status s) - (.put buf (p/+ offset 1) - (byte - (case s - :incomplete 0 - :in-progress 1 - :complete 2))) - (invalidate slab (p/+ offset 1) 1) - nil))) - -(defn- task [slab offset len] - (Task. - slab - offset - len - (atom nil) - (fn [] - (with-buffer [buf slab] - (let [^ByteBuffer buf (-> buf - (.position ^Long offset) - ^ByteBuffer - (.limit ^Long (+ offset len)) - .slice) - checksum' (.getLong buf 2) - ary (bs/to-byte-array (.position buf header-size))] - (when-not (== (checksum (.getInt buf 10) ary) checksum') - (throw (IOException. "checksum mismatch"))) - (nippy/thaw ary)))))) - -(defmethod print-method Task [t ^Writer w] - (.write w - (str "< " (status t) " | " (pr-str @t) " >"))) - -;;; - -;; the byte layout is -;; [ exists? : int8 -;; state : int8 -;; checksum : int64 -;; size : int32 -;; payload : array ] -;; valid values for 'exists' is 0 (no), 1 (yes) -;; valid values for 'state' is 0 (unclaimed), 1 (in progress), 2 (complete) -(defn- slab->task-seq - "Takes a slab, and returns a sequence of the tasks it contains." - ([slab] - (slab->task-seq slab 0)) - ([slab ^long pos] - (with-buffer [buf slab] - (try - (let [^ByteBuffer buf' (.position buf pos)] - - ;; is there a next task, and is there space left in the buffer? - (when (and - (<= header-size (.remaining buf')) - (== 1 (.get buf'))) - - (lazy-seq - (with-buffer [buf slab] - (let [^ByteBuffer buf' (.position buf (p/inc pos)) - _status (.get buf') - _checksum (.getLong buf') - size (.getInt buf')] - - ;; this shouldn't be necessary, but let's not gratuitously - ;; overreach our bounds - (when (< size (.remaining buf')) - (cons - - (task - slab - pos - (+ header-size size)) - - (slab->task-seq - slab - (+ pos header-size size))))))))) - (catch Throwable _ - ;; this implies unrecoverable corruption - nil - ))))) - -(deftype TaskSlab - [filename - q-name - queue - buf ;; a clearable atom holding the buffer - position ;; an atom storing the write position of the slab - lock - dirty ;; an atom containing an interval of dirty bytes - ] - ITaskSlab - - (read-write-lock [_] - lock) - - (buffer [_] - (let [buf (or @buf - (swap! buf - (fn [buf] - (or buf (load-buffer filename)))))] - (.duplicate ^ByteBuffer buf))) - - (mapped? [_] - (boolean @buf)) - - (unmap [_] - (with-exclusive-lock lock - (when-let [b @buf] - (reset! buf nil) - (unmap-buffer b)))) - - (invalidate [_ start' len] - (let [end' (+ start' len)] - (swap! dirty - (fn [[start end]] - [(min start start') (max end end')])))) - - (sync! [this] - (let [[start end] @dirty] - (when (< start end) - (with-buffer [_ this] - (let [buf @buf] - (force-buffer buf start (- end start)) - (compare-and-set! dirty [start end] [Integer/MAX_VALUE 0]) - nil))))) - - (append-to-slab! [this task-descriptor] - (with-buffer [buf this] - (let [ary (nippy/freeze task-descriptor) - cnt (count ary) - pos @position - ^ByteBuffer buf (.position buf ^Long pos)] - - (when (> (.remaining buf) (+ (count ary) header-size)) - ;; write to the buffer - (doto buf - (.position ^Long pos) - (.put (byte 1)) ;; exists - (.put (byte 0)) ;; incomplete - (.putLong (checksum cnt ary)) - (.putInt cnt) - (.put ^bytes ary) - (.put (byte 0))) ;; next doesn't exist - - (swap! position + header-size cnt) - - (invalidate this pos (+ header-size cnt)) - - ;; return a task to enqueue in-memory - (task - this - pos - (+ header-size cnt)))))) - - clojure.lang.Seqable - (seq [this] - (slab->task-seq this)) - - Comparable - (compareTo [_ x] - (assert (instance? TaskSlab x)) - (compare filename (.filename ^TaskSlab x)))) - -(def ^:private fs-monitor (Object.)) - -(defn- delete-slab - [^TaskSlab slab] - (locking fs-monitor - (unmap slab) - (.delete (io/file (.filename slab))))) - -(defn- create-slab - "Creates a new slab file, ensuring a new file name that is lexicographically greater than - any existing files for that queue name." - ([directory q-name queue size] - (locking fs-monitor - (let [pattern (re-pattern (str "^" q-name "_(\\d{6}$)")) - last-number (->> directory - io/file - .listFiles - (map #(.getName ^File %)) - (map #(second (re-find pattern %))) - (remove nil?) - (map #(Long/parseLong %)) - sort - last) - n (if last-number (inc last-number) 0) - f (io/file (str directory "/" q-name "_" (format "%06d" n)))] - - (when-not (.createNewFile f) - (throw (IOException. (str "Could not create new slab file at " (.getAbsolutePath f))))) - - (TaskSlab. - (.getAbsolutePath f) - q-name - queue - (atom (create-buffer f size)) - (atom 0) - (ReentrantReadWriteLock.) - (atom [Integer/MAX_VALUE 0])))))) - -(defn- file->slab - "Transforms a file into a slab representing that file's contents." - [filename q-name queue] - (let [pos (atom 0) - slab (TaskSlab. - filename - q-name - queue - (atom nil) - pos - (ReentrantReadWriteLock.) - (atom [Integer/MAX_VALUE 0])) - len (->> slab - (map :length) - (reduce +))] - (reset! pos len) - (unmap slab) - slab)) - -(defn- directory->queue-name->slab-files - "Returns a map of queue names onto slab files for that queue." - [directory] - (let [queue->file (->> directory - io/file - .listFiles - (filter #(re-find #"^\w+_\d{6}$" (.getName ^File %))) - (group-by #(second (re-find #"^(\w+)_\d{6}$" (.getName ^File %)))))] - (zipmap - (keys queue->file) - (map - (fn [files] - (->> files - (map #(.getAbsolutePath ^File %)) - sort)) - (vals queue->file))))) - -;;; - -(defn- initial-stats [^long count] - {:enqueued (AtomicLong. count) - :retried (AtomicLong. 0) - :completed (AtomicLong. 0)}) - -(defn- immediate-stats [^LinkedBlockingQueue q {:keys [enqueued retried completed]}] - (let [cnt (.size q) - completed (.get ^AtomicLong completed) - enqueued (.get ^AtomicLong enqueued)] - {:enqueued enqueued - :retried (.get ^AtomicLong retried) - :completed completed - :in-progress (- (- enqueued completed) cnt)})) - -;;; - -(defprotocol IQueues - (^:private mark-complete! [_ q-name]) - (^:private mark-retry! [_ q-name]) - (delete! [_] - "Deletes all files associated with the queues.") - (stats [_] - "Returns a map of queue names onto information about the immediate state of the queue.") - (fsync [_] - "Forces an fsync on all modified files.") - (take! - [_ q-name] - [_ q-name timeout timeout-val] - "A blocking dequeue from `name`. If `timeout` is specified, returns `timeout-val` if - no task is available within `timeout` milliseconds.") - (put! - [_ q-name task-descriptor] - [_ q-name task-descriptor timeout] - "A blocking enqueue to `name`. If `timeout` is specified, returns `false` if unable to - enqueue within `timeout` milliseconds, `true` otherwise.")) - -(defn queues - "Creates a point of interaction for queues, backed by disk storage in `directory`. - - The following options can be specified: - - max-queue-size - the maximum number of elements that can be in the queue before `put!` - blocks. Defaults to `Integer/MAX_VALUE`. - - complete? - a predicate that is run on pre-existing tasks to check if they were already - completed. If the tasks in the queue are non-idempotent, this must be - specified for correct behavior. Defaults to always returning false. - - slab-size - The size, in bytes, of the backing files for the queue. Defaults to 16mb. - - fsync-put? - if true, each `put!` will force an fsync. Defaults to true. - - fsync-take? - if true, each `take!` will force an fsync. Defaults to false. - - fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that - can be performed before an fsync is performed. - - fsync-interval - The maximum amount of time, in milliseconds, that can elapse before - an fsync is performed." - ([directory] - (queues directory nil)) - ([directory - {:keys [max-queue-size - complete? - slab-size - fsync-put? - fsync-take? - fsync-threshold - fsync-interval] - :or {max-queue-size Integer/MAX_VALUE - complete? nil - slab-size (* 64 1024 1024) - fsync-put? true - fsync-take? false}}] - - (assert - (not - (and - (or fsync-threshold fsync-interval) - (or fsync-take? fsync-put?))) - "Both batch and per-task fsync options are enabled, which is probably not what you intended.") - - (.mkdirs (io/file directory)) - - (let [ - - queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) - queue-name->files (directory->queue-name->slab-files directory) - - ;; core state stores - queue-name->slabs (atom - (zipmap - (keys queue-name->files) - (->> queue-name->files - (map - (fn [[queue-name files]] - (map #(file->slab % queue-name (queue queue-name)) files))) - vec))) - - queue-name->stats (atom - (zipmap - (keys queue-name->files) - (map - #(initial-stats (count (queue %))) - (keys queue-name->files)))) - - queue-name->current-slab (atom {}) - - ;; initialize - create-new-slab (fn [q-name] - (let [slab (create-slab directory q-name (queue q-name) slab-size) - empty-slabs (->> (@queue-name->slabs q-name) - (filter (fn [slab] - (->> slab - (remove #(= :complete (status %))) - empty?))) - set)] - - ;; delete empty slabs - (doseq [s empty-slabs] - (delete-slab s)) - - ;; update list of active slabs - (swap! queue-name->slabs update-in [q-name] - #(conj (vec (remove empty-slabs %)) slab)) - - ;; unmap all slabs but the first (which is being consumed) - ;; and the last (which is being written to) - (doseq [s (-> (@queue-name->slabs q-name) rest butlast)] - (unmap s)) - slab)) - - populate-stats! #(when-not (contains? @queue-name->stats %) - (swap! queue-name->stats assoc % (initial-stats 0))) - - this-ref (promise) - - action-counter (AtomicLong. 0) - - mark-action! (if fsync-threshold - (fn [] - (when (zero? (rem (.incrementAndGet action-counter) fsync-threshold)) - (fsync @this-ref))) - (fn []))] - - ;; - (when fsync-interval - (future - (let [ref (WeakReference. @this-ref)] - (while (.get ref) - (when-let [q (.get ref)] - (try - (let [start (System/nanoTime)] - (fsync q) - (let [end (System/nanoTime)] - (Thread/sleep (long (max 0 (- (* 1000000 fsync-interval) (- end start))))))) - (catch Throwable _))))))) - - ;; populate queues with pre-existing tasks - (let [empty-slabs (atom #{})] - (doseq [[q slabs] @queue-name->slabs] - (let [^LinkedBlockingQueue q' (queue q)] - (doseq [slab slabs] - (let [tasks (->> slab - (map #(vary-meta % assoc - ::this this-ref - ::queue q' - ::queue-name q - ::fsync? fsync-take?)) - (remove #(or (= :complete (status %)) - (and complete? (complete? @%)))))] - - (if (empty? tasks) - - ;; if there aren't any active tasks, just delete the slab - (do - (delete-slab slab) - (swap! empty-slabs conj slab)) - - (do - (doseq [task tasks] - (status! task :incomplete) - (when-not (.offer q' task) - (throw - (IllegalArgumentException. - "'max-queue-size' insufficient to hold existing tasks.")))) - (unmap slab))))) - - (let [^AtomicLong counter (get-in @queue-name->stats [q :enqueued])] - (.addAndGet counter (count (queue q)))))) - - (swap! queue-name->slabs - (fn [m] - (->> m - (map - (fn [[q slabs]] - [q (remove @empty-slabs slabs)])) - (into {}))))) - - (deliver this-ref - (reify - - java.io.Closeable - (close [_] - (doseq [s (->> @queue-name->slabs vals (apply concat))] - (unmap s))) - - IQueues - - (delete! [_] - (doseq [s (->> @queue-name->slabs vals (apply concat))] - (unmap s) - (delete-slab s))) - - (fsync [_] - (doseq [slab (->> @queue-name->slabs vals (apply concat))] - (sync! slab))) - - (mark-retry! [_ q-name] - (mark-action!) - (populate-stats! q-name) - (let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :retried])] - (.incrementAndGet retry-counter))) - - (mark-complete! [_ q-name] - (mark-action!) - (populate-stats! q-name) - (let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :completed])] - (.incrementAndGet retry-counter))) - - (stats [_] - (let [ks (keys @queue-name->stats)] - (zipmap ks - (map - (fn [q-name] - (merge - {:num-slabs (-> @queue-name->slabs (get q-name) count) - :num-active-slabs (->> (get @queue-name->slabs q-name) - (filter mapped?) - count)} - (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) - ks)))) - - (take! [_ q-name timeout timeout-val] - (let [q-name (munge (name q-name)) - ^LinkedBlockingQueue q (queue q-name)] - (try - (if-let [t (if (zero? timeout) - (.poll q) - (.poll q timeout TimeUnit/MILLISECONDS))] - - (let [slab (:slab t)] - - ;; if we've moved onto a new slab, unmap all but the current and - ;; last slabs - (let [old-slab (@queue-name->current-slab q-name)] - (when-not (= slab old-slab) - (swap! queue-name->current-slab assoc q-name slab) - (doseq [s (->> (get @queue-name->slabs q-name) - butlast - (remove #(= slab %)))] - (unmap s)))) - - (status! t :in-progress) - ;; we don't need to fsync here, because in-progress and incomplete - ;; are effectively equivalent on restart - - t) - timeout-val) - (catch TimeoutException _ - timeout-val)))) - - (take! [this q-name] - (take! this q-name Long/MAX_VALUE nil)) - - (put! [_ q-name task-descriptor timeout] - (let [q-name (munge (name q-name)) - ^LinkedBlockingQueue q (queue q-name) - slab! (fn [] - (let [slabs (@queue-name->slabs q-name) - slab (last slabs) - task (when slab - (append-to-slab! slab task-descriptor)) - - ;; if no task was created, we need to create a new slab file - ;; and try again - slab (if task - slab - (create-new-slab q-name)) - task (or task (append-to-slab! slab task-descriptor))] - - (when-not task - (throw - (IllegalArgumentException. - (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) - - (when fsync-put? - (sync! slab)) - task)) - - queue! (fn [task] - (if (zero? timeout) - (.offer q task) - (.offer q task timeout TimeUnit/MILLISECONDS)))] - (if (locking q - (queue! - (vary-meta (slab!) assoc - ::this this-ref - ::queue-name q-name - ::queue q - ::fsync? fsync-take?))) - (do - (populate-stats! q-name) - (let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])] - (.incrementAndGet counter)) - true) - false))) - - (put! [this q-name task-descriptor] - (put! this q-name task-descriptor Long/MAX_VALUE)))) - - @this-ref))) - -;;; - -(defn task-seq - "Returns an infinite lazy sequence of tasks for `q-name`." - [qs q-name] - (lazy-seq - (cons - (take! qs q-name) - (task-seq qs q-name)))) - -(defn immediate-task-seq - "Returns a finite lazy sequence of tasks for `q-name` which terminates once there are - no more tasks immediately available." - [qs q-name] - (lazy-seq - (let [task (take! qs q-name 0 ::none)] - (when-not (= ::none task) - (cons - task - (immediate-task-seq qs q-name)))))) - -(defn interval-task-seq - "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will - terminate after that time has elapsed, even if there are still tasks immediately available." - [qs q-name interval] - (let [now (System/nanoTime)] - (lazy-seq - (let [now' (System/nanoTime) - remaining (- (* 1000000 interval) (- now' now))] - (when (pos? remaining) - (let [task (take! qs q-name remaining ::none)] - (when-not (= ::none task) - (cons - task - (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) - -(defn complete! - "Marks a task as complete." - [task] - (if (identical? :complete (status task)) - false - (do - (status! task :complete) - (when (-> task meta ::fsync?) - (sync! (:slab task))) - (mark-complete! @(-> task meta ::this) (-> task meta ::queue-name)) - true))) - -(defn retry! - "Marks a task as available for retry." - [task] - (if (or - (identical? :complete (status task)) - (identical? :incomplete (status task))) - false - (do - (status! task :incomplete) - (when (-> task meta ::fsync?) - (sync! (:slab task))) - (mark-retry! @(-> task meta ::this) (-> task meta ::queue-name)) - (let [^LinkedBlockingQueue q (-> task meta ::queue)] - (.put q task)) - true))) diff --git a/test/durable_queue_test.clj b/test/durable_queue_test.clj deleted file mode 100644 index 68da3d4..0000000 --- a/test/durable_queue_test.clj +++ /dev/null @@ -1,134 +0,0 @@ -(ns durable-queue-test - "This tests the DEPRECATED single-segment namespace." - (:require - [clojure.java.io :as io] - [clojure.test :refer [deftest is]] - [durable-queue :as dq] - [criterium.core :as c])) - -(defn clear-tmp-directory [] - (doseq [f (->> (#'durable-queue/directory->queue-name->slab-files "/tmp") - vals - (apply concat))] - (.delete (io/file f)))) - -(deftest test-basic-put-take - (clear-tmp-directory) - (let [q (dq/queues "/tmp" {:slab-size 1024}) - tasks (range 1e4)] - (doseq [t tasks] - (dq/put! q :foo t)) - (is (= tasks (map deref (dq/immediate-task-seq q :foo)))) - (dq/delete! q))) - -(deftest test-partial-slab-writes - (clear-tmp-directory) - (dotimes [i 10] - (dq/put! (dq/queues "/tmp") :foo i)) - (is (= (range 10) (map deref (dq/immediate-task-seq (dq/queues "/tmp") :foo))))) - -(deftest test-retry - (clear-tmp-directory) - (with-open [^java.io.Closeable q (dq/queues "/tmp")] - - (doseq [t (range 10)] - (dq/put! q :foo t)) - - (let [tasks' (dq/immediate-task-seq q :foo)] - (is (= (range 10) (map deref tasks'))) - (doseq [t (take 5 tasks')] - (dq/complete! t)) - (doseq [t (range 10 15)] - (dq/put! q :foo t)))) - - ;; create a new manager, which will mark all in-progress tasks as incomplete - (with-open [^java.io.Closeable q (dq/queues "/tmp")] - (let [tasks' (dq/immediate-task-seq q :foo)] - (is (= (range 5 15) (map deref tasks'))) - (doseq [t (take 5 tasks')] - (dq/complete! t)))) - - (with-open [^java.io.Closeable q (dq/queues "/tmp")] - (let [tasks' (dq/immediate-task-seq q :foo)] - (is (= (range 10 15) (map deref tasks'))) - (doseq [t (range 15 20)] - (dq/put! q :foo t)))) - - (let [q (dq/queues "/tmp" {:complete? even?})] - (is (= (remove even? (range 10 20)) (map deref (dq/immediate-task-seq q :foo)))))) - -;;; - -(deftest ^:benchmark benchmark-put-take - (clear-tmp-directory) - - (println "\n\n-- sync both") - (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? true})] - (c/quick-bench - (do - (dq/put! q :foo 1) - (dq/complete! (dq/take! q :foo))))) - - (println "\n\n-- sync take") - (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? true})] - (c/quick-bench - (do - (dq/put! q :foo 1) - (dq/complete! (dq/take! q :foo))))) - - (println "\n\n-- sync put") - (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? false})] - (c/quick-bench - (do - (dq/put! q :foo 1) - (dq/complete! (dq/take! q :foo))))) - - (println "\n\n-- sync every 10 writes") - (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] - (c/quick-bench - (do - (dq/put! q :foo 1) - (dq/complete! (dq/take! q :foo))))) - - (println "\n\n-- sync every 100 writes") - (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] - (c/quick-bench - (do - (dq/put! q :foo 1) - (dq/complete! (dq/take! q :foo))))) - - (println "\n\n-- sync every 100ms") - (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-interval 100})] - (c/quick-bench - (do - (dq/put! q :foo 1) - (dq/complete! (dq/take! q :foo))))) - - (println "\n\n-- sync neither") - (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? false})] - (c/quick-bench - (do - (dq/put! q :foo 1) - (dq/complete! (dq/take! q :foo)))))) - -;;; - -(deftest ^:stress stress-queue-size - (clear-tmp-directory) - - (with-open [^java.io.Closeable q (dq/queues "/tmp")] - (let [ary (byte-array 1e6)] - (dotimes [i 1e6] - (aset ary i (byte (rand-int 127)))) - (dotimes [_ 1e5] - (dq/put! q :stress ary)))) - - (with-open [^java.io.Closeable q (dq/queues "/tmp" {:complete? (constantly false)})] - (let [s (doall (dq/immediate-task-seq q :stress))] - (doseq [t s] - (dq/retry! t))) - (let [s (dq/immediate-task-seq q :stress)] - (doseq [t s] - (dq/complete! t)))) - - (clear-tmp-directory)) From ff3cc82046edfb4097c5fd46e9730be93c181db8 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sun, 21 Sep 2025 17:23:28 -0400 Subject: [PATCH 15/17] update jdk default to 25 Signed-off-by: Sean Corfield --- bb.edn | 4 ++-- deps.edn | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bb.edn b/bb.edn index 4405620..ca2da21 100644 --- a/bb.edn +++ b/bb.edn @@ -3,7 +3,7 @@ ;; bench - run only benchmarks ;; stress - run only stress tests ;; 1.10/1.11/1.12 - run only against the specified Clojure version (defaults to 1.10) -;; jdk11/jdk17/jdk21/jdk24 - indicate which JDK you are using (defaults to jdk24) +;; jdk11/jdk17/jdk21/jdk25 - indicate which JDK you are using (defaults to jdk25) {:tasks {:requires [[clojure.string :as str]] @@ -19,7 +19,7 @@ *command-line-args*)) jdk (or base-jdk ;; sean's local default: - "jdk24") + "jdk25") test-selectors (cond bench? ["-i" ":benchmark"] stress? ["-i" ":stress"] diff --git a/deps.edn b/deps.edn index 291f70f..4cb0801 100644 --- a/deps.edn +++ b/deps.edn @@ -24,4 +24,4 @@ :jdk11 {} :jdk17 {} :jdk21 {} - :jdk24 {:jvm-opts ["--enable-native-access=ALL-UNNAMED"]}}} + :jdk25 {:jvm-opts ["--enable-native-access=ALL-UNNAMED"]}}} From 5cb9fbaca1858b1ef0556d33262d07168017f6ef Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sun, 2 Nov 2025 17:50:14 -0500 Subject: [PATCH 16/17] prep for 0.2.0 Signed-off-by: Sean Corfield --- CHANGELOG.md | 4 ++-- README.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2afcecc..1462b88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## CHANGES -v0.2.0 in progress -* Removed single-segment `durable-queue` ns; use `clj-commons.durable-queue` instead +v0.2.0 -- 2025-11-02 +* **Removed single-segment `durable-queue` ns; use `clj-commons.durable-queue` instead** * Deprecate `delete!` because it is dangerous: it leaves queues in a corrupted state (because it deletes files without cleaning up in-memory state). * Address [#30](https://github.com/clj-commons/durable-queue/issues/30) by adding `delete-queue!` and `delete-all!` to safely delete an individual queue (and its files) and safely delete all queues (and their files). * Clean up lint issues; export clj-kondo config diff --git a/README.md b/README.md index eb33a28..fac6287 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ This library implements a disk-backed task queue, allowing for queues that can s ### usage ```clj -[org.clj-commons/durable-queue "0.1.7"] +[org.clj-commons/durable-queue "0.2.0"] ``` To interact with queues, first create a `queues` object by specifying a directory in the filesystem and an options map: From 7278db6415b0aa209a33b30ac6ba6bf904dd4e7a Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sun, 2 Nov 2025 17:55:09 -0500 Subject: [PATCH 17/17] update deps Signed-off-by: Sean Corfield --- .github/workflows/test-and-release.yml | 2 +- .github/workflows/test-and-snapshot.yml | 4 ++-- .github/workflows/test.yml | 2 +- CHANGELOG.md | 1 + deps.edn | 4 ++-- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.github/workflows/test-and-release.yml b/.github/workflows/test-and-release.yml index 9d65d63..ca3ffb5 100644 --- a/.github/workflows/test-and-release.yml +++ b/.github/workflows/test-and-release.yml @@ -19,7 +19,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 diff --git a/.github/workflows/test-and-snapshot.yml b/.github/workflows/test-and-snapshot.yml index 56c8ae7..a565da0 100644 --- a/.github/workflows/test-and-snapshot.yml +++ b/.github/workflows/test-and-snapshot.yml @@ -17,7 +17,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 @@ -49,7 +49,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 49e7b55..52d4a90 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 1462b88..26ba838 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ v0.2.0 -- 2025-11-02 * Deprecate `delete!` because it is dangerous: it leaves queues in a corrupted state (because it deletes files without cleaning up in-memory state). * Address [#30](https://github.com/clj-commons/durable-queue/issues/30) by adding `delete-queue!` and `delete-all!` to safely delete an individual queue (and its files) and safely delete all queues (and their files). * Clean up lint issues; export clj-kondo config +* Update dependencies (again) * Add a changelog! v0.1.7 -- 2025-09-02 diff --git a/deps.edn b/deps.edn index 4cb0801..eff3287 100644 --- a/deps.edn +++ b/deps.edn @@ -6,14 +6,14 @@ :aliases {;; for help: clojure -T:deps:build help/doc - :build {:deps {io.github.clojure/tools.build {:mvn/version "0.10.10"} + :build {:deps {io.github.clojure/tools.build {:mvn/version "0.10.11"} slipset/deps-deploy {:mvn/version "0.2.2"}} :ns-default build} ;; versions to test against: :1.10 {:override-deps {org.clojure/clojure {:mvn/version "1.10.3"}}} :1.11 {:override-deps {org.clojure/clojure {:mvn/version "1.11.4"}}} - :1.12 {:override-deps {org.clojure/clojure {:mvn/version "1.12.2"}}} + :1.12 {:override-deps {org.clojure/clojure {:mvn/version "1.12.3"}}} ;; running tests/checks of various kinds: :test {:extra-paths ["test"]