diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn new file mode 100644 index 0000000..9e218c2 --- /dev/null +++ b/.clj-kondo/config.edn @@ -0,0 +1,2 @@ +{:lint-as {clj-commons.durable-queue/with-buffer clojure.core/let + durable-queue/with-buffer clojure.core/let}} diff --git a/.clj-kondo/imports/babashka/fs/config.edn b/.clj-kondo/imports/babashka/fs/config.edn new file mode 100644 index 0000000..23f3609 --- /dev/null +++ b/.clj-kondo/imports/babashka/fs/config.edn @@ -0,0 +1 @@ +{:lint-as {babashka.fs/with-temp-dir clojure.core/let}} diff --git a/.clj-kondo/imports/http-kit/http-kit/config.edn b/.clj-kondo/imports/http-kit/http-kit/config.edn new file mode 100644 index 0000000..e9dbcd8 --- /dev/null +++ b/.clj-kondo/imports/http-kit/http-kit/config.edn @@ -0,0 +1,3 @@ + +{:hooks + {:analyze-call {org.httpkit.server/with-channel httpkit.with-channel/with-channel}}} diff --git a/.clj-kondo/imports/http-kit/http-kit/httpkit/with_channel.clj b/.clj-kondo/imports/http-kit/http-kit/httpkit/with_channel.clj new file mode 100644 index 0000000..b429de8 --- /dev/null +++ b/.clj-kondo/imports/http-kit/http-kit/httpkit/with_channel.clj @@ -0,0 +1,16 @@ +(ns httpkit.with-channel + (:require [clj-kondo.hooks-api :as api])) + +(defn with-channel [{node :node}] + (let [[request channel & body] (rest (:children node))] + (when-not (and request channel) (throw (ex-info "No request or channel provided" {}))) + (when-not (api/token-node? channel) (throw (ex-info "Missing channel argument" {}))) + (let [new-node + (api/list-node + (list* + (api/token-node 'let) + (api/vector-node [channel (api/vector-node [])]) + request + body))] + + {:node new-node}))) diff --git a/.clj-kondo/imports/org.clj-commons/byte-streams/clj_commons/byte_streams.clj_kondo b/.clj-kondo/imports/org.clj-commons/byte-streams/clj_commons/byte_streams.clj_kondo new file mode 100644 index 0000000..588e47e --- /dev/null +++ b/.clj-kondo/imports/org.clj-commons/byte-streams/clj_commons/byte_streams.clj_kondo @@ -0,0 +1,11 @@ +(ns clj-commons.byte-streams) + +;; TODO: propagate type info from src/dst +(defmacro def-conversion + "Kondo hook" + [[src dst :as conversion] params & body] + `(fn [~(first params) + ~(if-let [options (second params)] + options + `_#)] + ~@body)) diff --git a/.clj-kondo/imports/org.clj-commons/byte-streams/config.edn b/.clj-kondo/imports/org.clj-commons/byte-streams/config.edn new file mode 100644 index 0000000..ae60fe4 --- /dev/null +++ b/.clj-kondo/imports/org.clj-commons/byte-streams/config.edn @@ -0,0 +1,9 @@ +{:lint-as {byte-streams.utils/defprotocol+ clojure.core/defprotocol + byte-streams.utils/deftype+ clojure.core/deftype + byte-streams.utils/defrecord+ clojure.core/defrecord + byte-streams.utils/definterface+ clojure.core/definterface + clj-commons.byte-streams.utils/defprotocol+ clojure.core/defprotocol + clj-commons.byte-streams.utils/deftype+ clojure.core/deftype + clj-commons.byte-streams.utils/defrecord+ clojure.core/defrecord + clj-commons.byte-streams.utils/definterface+ clojure.core/definterface} + :hooks {:macroexpand {clj-commons.byte-streams/def-conversion clj-commons.byte-streams/def-conversion}}} diff --git a/.clj-kondo/imports/org.clj-commons/durable-queue/config.edn b/.clj-kondo/imports/org.clj-commons/durable-queue/config.edn new file mode 100644 index 0000000..9e218c2 --- /dev/null +++ b/.clj-kondo/imports/org.clj-commons/durable-queue/config.edn @@ -0,0 +1,2 @@ +{:lint-as {clj-commons.durable-queue/with-buffer clojure.core/let + durable-queue/with-buffer clojure.core/let}} diff --git a/.clj-kondo/imports/potemkin/potemkin/config.edn b/.clj-kondo/imports/potemkin/potemkin/config.edn new file mode 100644 index 0000000..3f59f3e --- /dev/null +++ b/.clj-kondo/imports/potemkin/potemkin/config.edn @@ -0,0 +1,62 @@ +{:lint-as {potemkin.collections/compile-if clojure.core/if + potemkin.collections/reify-map-type clojure.core/reify + potemkin.collections/def-map-type clj-kondo.lint-as/def-catch-all + potemkin.collections/def-derived-map clj-kondo.lint-as/def-catch-all + + potemkin.types/reify+ clojure.core/reify + potemkin.types/defprotocol+ clojure.core/defprotocol + potemkin.types/deftype+ clojure.core/deftype + potemkin.types/defrecord+ clojure.core/defrecord + potemkin.types/definterface+ clojure.core/defprotocol + potemkin.types/extend-protocol+ clojure.core/extend-protocol + potemkin.types/def-abstract-type clj-kondo.lint-as/def-catch-all + + potemkin.utils/doit clojure.core/doseq + potemkin.utils/doary clojure.core/doseq + potemkin.utils/condp-case clojure.core/condp + potemkin.utils/fast-bound-fn clojure.core/bound-fn + + potemkin.walk/prewalk clojure.walk/prewalk + potemkin.walk/postwalk clojure.walk/postwalk + potemkin.walk/walk clojure.walk/walk + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + ;;;; top-level from import-vars + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + + ;; Have hooks + ;;potemkin/import-fn potemkin.namespaces/import-fn + ;;potemkin/import-macro potemkin.namespaces/import-macro + ;;potemkin/import-def potemkin.namespaces/import-def + + ;; Internal, not transitive + ;;potemkin/unify-gensyms potemkin.macros/unify-gensyms + ;;potemkin/normalize-gensyms potemkin.macros/normalize-gensyms + ;;potemkin/equivalent? potemkin.macros/equivalent? + + potemkin/condp-case clojure.core/condp + potemkin/doit potemkin.utils/doit + potemkin/doary potemkin.utils/doary + + potemkin/def-abstract-type clj-kondo.lint-as/def-catch-all + potemkin/reify+ clojure.core/reify + potemkin/defprotocol+ clojure.core/defprotocol + potemkin/deftype+ clojure.core/deftype + potemkin/defrecord+ clojure.core/defrecord + potemkin/definterface+ clojure.core/defprotocol + potemkin/extend-protocol+ clojure.core/extend-protocol + + potemkin/reify-map-type clojure.core/reify + potemkin/def-derived-map clj-kondo.lint-as/def-catch-all + potemkin/def-map-type clj-kondo.lint-as/def-catch-all} + + ;; leave import-vars alone, kondo special-cases it + :hooks {:macroexpand {#_#_potemkin.namespaces/import-vars potemkin.namespaces/import-vars + potemkin.namespaces/import-fn potemkin.namespaces/import-fn + potemkin.namespaces/import-macro potemkin.namespaces/import-macro + potemkin.namespaces/import-def potemkin.namespaces/import-def + + #_#_potemkin/import-vars potemkin.namespaces/import-vars + potemkin/import-fn potemkin.namespaces/import-fn + potemkin/import-macro potemkin.namespaces/import-macro + potemkin/import-def potemkin.namespaces/import-def}}} diff --git a/.clj-kondo/imports/potemkin/potemkin/potemkin/namespaces.clj b/.clj-kondo/imports/potemkin/potemkin/potemkin/namespaces.clj new file mode 100644 index 0000000..a247af5 --- /dev/null +++ b/.clj-kondo/imports/potemkin/potemkin/potemkin/namespaces.clj @@ -0,0 +1,56 @@ +(ns potemkin.namespaces + (:require [clj-kondo.hooks-api :as api])) + +(defn import-macro* + ([sym] + `(def ~(-> sym name symbol) ~sym)) + ([sym name] + `(def ~name ~sym))) + +(defmacro import-fn + ([sym] + (import-macro* sym)) + ([sym name] + (import-macro* sym name))) + +(defmacro import-macro + ([sym] + (import-macro* sym)) + ([sym name] + (import-macro* sym name))) + +(defmacro import-def + ([sym] + (import-macro* sym)) + ([sym name] + (import-macro* sym name))) + +#_ +(defmacro import-vars + "Imports a list of vars from other namespaces." + [& syms] + (let [unravel (fn unravel [x] + (if (sequential? x) + (->> x + rest + (mapcat unravel) + (map + #(symbol + (str (first x) + (when-let [n (namespace %)] + (str "." n))) + (name %)))) + [x])) + syms (mapcat unravel syms) + result `(do + ~@(map + (fn [sym] + (let [vr (resolve sym) + m (meta vr)] + (cond + (nil? vr) `(throw (ex-info (format "`%s` does not exist" '~sym) {})) + (:macro m) `(def ~(-> sym name symbol) ~sym) + (:arglists m) `(def ~(-> sym name symbol) ~sym) + :else `(def ~(-> sym name symbol) ~sym)))) + syms))] + result)) diff --git a/.clj-kondo/imports/rewrite-clj/rewrite-clj/config.edn b/.clj-kondo/imports/rewrite-clj/rewrite-clj/config.edn new file mode 100644 index 0000000..19ecae9 --- /dev/null +++ b/.clj-kondo/imports/rewrite-clj/rewrite-clj/config.edn @@ -0,0 +1,5 @@ +{:lint-as + {rewrite-clj.zip/subedit-> clojure.core/-> + rewrite-clj.zip/subedit->> clojure.core/->> + rewrite-clj.zip/edit-> clojure.core/-> + rewrite-clj.zip/edit->> clojure.core/->>}} diff --git a/.clj-kondo/imports/taoensso/encore/config.edn b/.clj-kondo/imports/taoensso/encore/config.edn new file mode 100644 index 0000000..975c943 --- /dev/null +++ b/.clj-kondo/imports/taoensso/encore/config.edn @@ -0,0 +1,7 @@ +{:hooks + {:analyze-call + {taoensso.encore/defalias taoensso.encore-hooks/defalias + taoensso.encore/defaliases taoensso.encore-hooks/defaliases + taoensso.encore/defn-cached taoensso.encore-hooks/defn-cached + taoensso.encore/defonce taoensso.encore-hooks/defonce + taoensso.encore/def* taoensso.encore-hooks/def*}}} diff --git a/.clj-kondo/imports/taoensso/encore/taoensso/encore_hooks.clj b/.clj-kondo/imports/taoensso/encore/taoensso/encore_hooks.clj new file mode 100644 index 0000000..d133f73 --- /dev/null +++ b/.clj-kondo/imports/taoensso/encore/taoensso/encore_hooks.clj @@ -0,0 +1,88 @@ +(ns taoensso.encore-hooks + "I don't personally use clj-kondo, so these hooks are + kindly authored and maintained by contributors. + PRs very welcome! - Peter Taoussanis" + (:refer-clojure :exclude [defonce]) + (:require + [clj-kondo.hooks-api :as hooks])) + +(defn defalias + [{:keys [node]}] + (let [[alias src-raw _attrs body] (rest (:children node)) + src (or src-raw alias) + sym (if src-raw (hooks/sexpr alias) (symbol (name (hooks/sexpr src))))] + {:node + (with-meta + (hooks/list-node + [(hooks/token-node 'def) + (hooks/token-node sym) + (if body + (hooks/list-node + ;; use :body in the def to avoid unused import/private var warnings + [(hooks/token-node 'or) body src]) + src)]) + (meta src))})) + +(defn defaliases + [{:keys [node]}] + (let [alias-nodes (rest (:children node))] + {:node + (hooks/list-node + (into + [(hooks/token-node 'do)] + (map + (fn alias->defalias [alias-node] + (cond + (hooks/token-node? alias-node) + (hooks/list-node + [(hooks/token-node 'taoensso.encore/defalias) + alias-node]) + + (hooks/map-node? alias-node) + (let [{:keys [src alias attrs body]} (hooks/sexpr alias-node) + ;; workaround as can't seem to (get) using a token-node + ;; and there's no update-keys (yet) in sci apparently + [& {:as node-as-map}] (:children alias-node) + {:keys [attrs body]} (zipmap (map hooks/sexpr (keys node-as-map)) + (vals node-as-map))] + (hooks/list-node + [(hooks/token-node 'taoensso.encore/defalias) + (or alias src) (hooks/token-node src) attrs body]))))) + alias-nodes))})) + +(defn defn-cached + [{:keys [node]}] + (let [[sym _opts binding-vec & body] (rest (:children node))] + {:node + (hooks/list-node + (list + (hooks/token-node 'def) + sym + (hooks/list-node + (list* + (hooks/token-node 'fn) + binding-vec + body))))})) + +(defn -def-impl + [{:keys [node]} core-macro-sym] + ;; args = [sym doc-string? attr-map? init-expr] + (let [[sym & args] (rest (:children node)) + [doc-string args] (if (and (hooks/string-node? (first args)) (next args)) [(hooks/sexpr (first args)) (next args)] [nil args]) + [attr-map init-expr] (if (and (hooks/map-node? (first args)) (next args)) [(hooks/sexpr (first args)) (fnext args)] [nil (first args)]) + + attr-map (if doc-string (assoc attr-map :doc doc-string) attr-map) + sym+meta (if attr-map (with-meta sym attr-map) sym) + + rewritten + (hooks/list-node + [(hooks/token-node core-macro-sym) + sym+meta + init-expr])] + + #_(println "old node:" node) + #_(println "new node:" rewritten) + {:node rewritten})) + +(defn def* [arg] (-def-impl arg 'def)) +(defn defonce [arg] (-def-impl arg 'clojure.core/defonce)) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 46e71d0..7c82509 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1,2 @@ * @slipset +* @seancorfield diff --git a/.github/workflows/test-and-release.yml b/.github/workflows/test-and-release.yml index 9d65d63..ca3ffb5 100644 --- a/.github/workflows/test-and-release.yml +++ b/.github/workflows/test-and-release.yml @@ -19,7 +19,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 diff --git a/.github/workflows/test-and-snapshot.yml b/.github/workflows/test-and-snapshot.yml index 56c8ae7..a565da0 100644 --- a/.github/workflows/test-and-snapshot.yml +++ b/.github/workflows/test-and-snapshot.yml @@ -17,7 +17,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 @@ -49,7 +49,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 49e7b55..52d4a90 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: - name: Setup Clojure uses: DeLaGuardo/setup-clojure@master with: - cli: '1.12.2.1565' + cli: '1.12.3.1577' bb: 'latest' - name: Cache All The Things uses: actions/cache@v4 diff --git a/.gitignore b/.gitignore index a3bf747..9f7fb2e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,12 @@ .calva/mcp-server/port +.calva/repl.calva-repl .clj-kondo/.cache .cpcache .DS_Store .idea/ .lsp/.cache .portal/vs-code.edn +.rebel_readline_history *.class *.iml *.jar diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..26ba838 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,17 @@ +## CHANGES + +v0.2.0 -- 2025-11-02 +* **Removed single-segment `durable-queue` ns; use `clj-commons.durable-queue` instead** +* Deprecate `delete!` because it is dangerous: it leaves queues in a corrupted state (because it deletes files without cleaning up in-memory state). +* Address [#30](https://github.com/clj-commons/durable-queue/issues/30) by adding `delete-queue!` and `delete-all!` to safely delete an individual queue (and its files) and safely delete all queues (and their files). +* Clean up lint issues; export clj-kondo config +* Update dependencies (again) +* Add a changelog! + +v0.1.7 -- 2025-09-02 +* Bring all dependencies up to date +* Matrix test against Clojure 1.10, 1.11, and 1.12 +* Matrix test against JDK 11, 17, and 21 (also tested on JDK 24 manually) +* Add `deps.edn`, `bb.edn`, `build.clj` +* Add GitHub Actions for test, snapshot, and release +* Remove non-functional CircleCI integration diff --git a/README.md b/README.md index ed223e8..fac6287 100644 --- a/README.md +++ b/README.md @@ -9,26 +9,26 @@ This library implements a disk-backed task queue, allowing for queues that can s ### usage ```clj -[org.clj-commons/durable-queue "0.1.7"] +[org.clj-commons/durable-queue "0.2.0"] ``` To interact with queues, first create a `queues` object by specifying a directory in the filesystem and an options map: ```clj -> (require '[durable-queue :refer :all]) +> (require '[clj-commons.durable-queue :as dq]) nil -> (def q (queues "/tmp" {})) +> (def q (dq/queues "/tmp" {})) #'q ``` This allows us to `put!` and `take!` tasks from named queues. `take!` is a blocking read, and will only return once a task is available or, if a timeout is defined (in milliseconds), once the timeout elapses: ```clj -> (take! q :foo 10 :timed-out!) +> (dq/take! q :foo 10 :timed-out!) :timed-out! -> (put! q :foo "a task") +> (dq/put! q :foo "a task") true -> (take! q :foo) +> (dq/take! q :foo) < :in-progress | "a task" > > (deref *1) "a task" @@ -39,20 +39,20 @@ Notice that the task has a value describing its progress, and a value describing Calling `take!` removed the task from the queue, but just because we've taken the task doesn't mean we've completed the action associated with it. In order to make sure the task isn't retried on restart, we must mark it as `complete!`. ```clj -> (put! q :foo "another task") +> (dq/put! q :foo "another task") true -> (take! q :foo) +> (dq/take! q :foo) < :in-progress | "another task" > -> (complete! *1) +> (dq/complete! *1) true ``` -If our task fails and we want to re-enqueue it to be tried again, we can instead call `(retry! task)`. Tasks which are marked for retry are added to the end of the current queue. +If our task fails and we want to re-enqueue it to be tried again, we can instead call `(dq/retry! task)`. Tasks which are marked for retry are added to the end of the current queue. To get a description of the current state of the queue, we can use `stats`, which returns a map of queue names onto various counts: ```clj -> (stats q) +> (dq/stats q) {:enqueued 2, :retried 0, :completed 1, @@ -70,6 +70,20 @@ To get a description of the current state of the queue, we can use `stats`, whic | `:num-slabs` | the number of underlying files which are being used to store tasks | | `:num-active-slabs` | the number of underlying files which are currently open and mapped into memory | +As of version 0.2.0, you can delete an entire queue using `delete-queue!`, which will remove all tasks and delete all files associated with the queue: + +```clj +> (dq/delete-queue! q :foo) +{} +``` + +As of version 0.2.0, you can also delete all queues using `delete-all!`, which will remove all tasks and delete all files associated with all queues: + +```clj +> (dq/delete-all! q) +nil +``` + ### configuring the queues `queues` can be given a number of different options, which can affect its performance and correctness. @@ -90,7 +104,7 @@ A complete list of options is as follows: Disabling `:fsync-put?` will risk losing tasks if a process dies. Disabling `:fsync-take?` increases the chance of a task being re-run when a process dies. Disabling both will increase throughput of the queue by at least an order of magnitude (in the default configuration, ~1.5k tasks/sec on rotating disks and ~6k tasks/sec on SSD, with fsync completely disabled ~100k tasks/sec independent of hardware). -Writes can be batched using `fsync-threshold` and/or `fsync-interval`, or by explicitly calling `(durable-queue/fsync q)`. Setting the `fsync-threshold` to 10 will allow for ~25k tasks/sec on SSD, and still enforces a small upper boundary on how much data can be lost when the process dies. An exception will be thrown if both per-task and batch sync options are set. +Writes can be batched using `fsync-threshold` and/or `fsync-interval`, or by explicitly calling `(dq/fsync q)`. Setting the `fsync-threshold` to 10 will allow for ~25k tasks/sec on SSD, and still enforces a small upper boundary on how much data can be lost when the process dies. An exception will be thrown if both per-task and batch sync options are set. ### license diff --git a/bb.edn b/bb.edn index 2a2f3f8..ca2da21 100644 --- a/bb.edn +++ b/bb.edn @@ -1,3 +1,9 @@ +;; all tasks that run tests can also accept: +;; all - run tests against all supported Clojure versions +;; bench - run only benchmarks +;; stress - run only stress tests +;; 1.10/1.11/1.12 - run only against the specified Clojure version (defaults to 1.10) +;; jdk11/jdk17/jdk21/jdk25 - indicate which JDK you are using (defaults to jdk25) {:tasks {:requires [[clojure.string :as str]] @@ -13,7 +19,7 @@ *command-line-args*)) jdk (or base-jdk ;; sean's local default: - "jdk24") + "jdk25") test-selectors (cond bench? ["-i" ":benchmark"] stress? ["-i" ":stress"] diff --git a/build.clj b/build.clj index 3777433..0ad0438 100644 --- a/build.clj +++ b/build.clj @@ -15,8 +15,8 @@ [deps-deploy.deps-deploy :as dd])) (def lib 'org.clj-commons/durable-queue) -(defn- the-version [patch] (format "0.1.%s" patch)) -(def version (the-version "7")) +(defn- the-version [patch] (format "0.2.%s" patch)) +(def version (the-version "0")) ; 0.2.0 unreleased (def snapshot (the-version "99-SNAPSHOT")) (def class-dir "target/classes") diff --git a/deps.edn b/deps.edn index cdb1bc8..eff3287 100644 --- a/deps.edn +++ b/deps.edn @@ -1,6 +1,4 @@ -{:mvn/repos {"sonatype" {:url "https://oss.sonatype.org/content/repositories/snapshots/"} - "ossrh-snapshots" {:url "https://s01.oss.sonatype.org/content/repositories/snapshots"}} - :paths ["src"] +{:paths ["src"] :deps {org.clojure/clojure {:mvn/version "1.10.3"} com.taoensso/nippy {:mvn/version "3.6.0"} org.clj-commons/primitive-math {:mvn/version "1.0.1"} @@ -8,23 +6,22 @@ :aliases {;; for help: clojure -T:deps:build help/doc - :build {:deps {io.github.clojure/tools.build {:mvn/version "0.10.10"} + :build {:deps {io.github.clojure/tools.build {:mvn/version "0.10.11"} slipset/deps-deploy {:mvn/version "0.2.2"}} :ns-default build} ;; versions to test against: :1.10 {:override-deps {org.clojure/clojure {:mvn/version "1.10.3"}}} :1.11 {:override-deps {org.clojure/clojure {:mvn/version "1.11.4"}}} - :1.12 {:override-deps {org.clojure/clojure {:mvn/version "1.12.2"}}} + :1.12 {:override-deps {org.clojure/clojure {:mvn/version "1.12.3"}}} ;; running tests/checks of various kinds: :test {:extra-paths ["test"] :extra-deps {io.github.cognitect-labs/test-runner {:git/tag "v0.5.1" :git/sha "dfb30dd"} - criterium/criterium {:mvn/version "0.4.6"}} - :jvm-opts ["-Dlog4j2.configurationFile=log4j2-info.properties"]} + criterium/criterium {:mvn/version "0.4.6"}}} :runner {:main-opts ["-m" "cognitect.test-runner"]} :jdk11 {} :jdk17 {} :jdk21 {} - :jdk24 {:jvm-opts ["--enable-native-access=ALL-UNNAMED"]}}} + :jdk25 {:jvm-opts ["--enable-native-access=ALL-UNNAMED"]}}} diff --git a/project.clj b/project.clj deleted file mode 100644 index a8933aa..0000000 --- a/project.clj +++ /dev/null @@ -1,25 +0,0 @@ -(defproject org.clj-commons/durable-queue - (or (System/getenv "PROJECT_VERSION") "0.1.7") - :description "a in-process task-queue that is backed by disk." - :url "https://github.com/clj-commons/durable-queue" - :license {:name "Eclipse Public License" - :url "http://www.eclipse.org/legal/epl-v10.html"} - :deploy-repositories [["clojars" {:url "https://repo.clojars.org" - :username :env/clojars_username - :password :env/clojars_password - :sign-releases true}]] - :dependencies [[com.taoensso/nippy "3.6.0"] - [org.clj-commons/primitive-math "1.0.1"] - [org.clj-commons/byte-streams "0.3.4"]] - :profiles {:dev {:dependencies [[org.clojure/clojure "1.10.3"] - [criterium/criterium "0.4.6"]]} - :1.10 {:dependencies [[org.clojure/clojure "1.10.3"]]} - :1.11 {:dependencies [[org.clojure/clojure "1.11.4"]]} - :1.12 {:dependencies [[org.clojure/clojure "1.12.2"]]}} - :global-vars {*warn-on-reflection* true} - :test-selectors {:default #(not (some #{:benchmark :stress} (keys %))) - :benchmark :benchmark - :stress :stress} - :codox {:writer codox-md.writer/write-docs - :include [durable-queue]} - :jvm-opts ^:replace ["-server" "-Xmx100m"]) diff --git a/resources/clj-kondo.exports/org.clj-commons/durable-queue/config.edn b/resources/clj-kondo.exports/org.clj-commons/durable-queue/config.edn new file mode 100644 index 0000000..9e218c2 --- /dev/null +++ b/resources/clj-kondo.exports/org.clj-commons/durable-queue/config.edn @@ -0,0 +1,2 @@ +{:lint-as {clj-commons.durable-queue/with-buffer clojure.core/let + durable-queue/with-buffer clojure.core/let}} diff --git a/src/durable_queue.clj b/src/clj_commons/durable_queue.clj similarity index 85% rename from src/durable_queue.clj rename to src/clj_commons/durable_queue.clj index 813748b..faefa24 100644 --- a/src/durable_queue.clj +++ b/src/clj_commons/durable_queue.clj @@ -1,37 +1,29 @@ -(ns durable-queue +(ns clj-commons.durable-queue (:require - [clojure.java.io :as io] - [byte-streams :as bs] - [clojure.string :as str] - [primitive-math :as p] - [taoensso.nippy :as nippy]) + [clj-commons.byte-streams :as bs] + [clj-commons.primitive-math :as p] + [clojure.java.io :as io] + [taoensso.nippy :as nippy]) (:import - [java.lang.reflect - Method - Field] - [java.util.concurrent - LinkedBlockingQueue - TimeoutException - TimeUnit] - [java.util.concurrent.atomic + [java.io + File + IOException + RandomAccessFile + Writer] + [java.lang.ref + WeakReference] + [java.lang.reflect + Method] + [java.nio ByteBuffer MappedByteBuffer] + [java.nio.channels + FileChannel$MapMode] + [java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException] + [java.util.concurrent.atomic AtomicLong] - [java.util.zip - CRC32] - [java.util.concurrent.locks + [java.util.concurrent.locks ReentrantReadWriteLock] - [java.io - Writer - File - RandomAccessFile - IOException] - [java.nio.channels - FileChannel - FileChannel$MapMode] - [java.nio - ByteBuffer - MappedByteBuffer] - [java.lang.ref - WeakReference])) + [java.util.zip + CRC32])) ;;; @@ -78,7 +70,7 @@ (^:private sync! [_]) (^:private invalidate [_ offset len]) (^:private ^ByteBuffer buffer [_]) - (^:private append-to-slab! [_ descriptor]) + (^:private append-to-slab! [_ task-descriptor]) (^:private read-write-lock [_])) (defmacro ^:private with-buffer [[buf slab] & body] @@ -139,12 +131,12 @@ (.invoke clean (.invoke cleaner buf nil) nil)) - (catch Throwable e + (catch Throwable _ ;; not much we can do here, sadly ))))) (defn- force-buffer - [^MappedByteBuffer buf offset length] + [^MappedByteBuffer buf _offset _length] (.force buf)) ;;; @@ -232,8 +224,8 @@ (lazy-seq (with-buffer [buf slab] (let [^ByteBuffer buf' (.position buf (p/inc pos)) - status (.get buf') - checksum (.getLong buf') + _status (.get buf') + _checksum (.getLong buf') size (.getInt buf')] ;; this shouldn't be necessary, but let's not gratuitously @@ -249,7 +241,7 @@ (slab->task-seq slab (+ pos header-size size))))))))) - (catch Throwable e + (catch Throwable _ ;; this implies unrecoverable corruption nil ))))) @@ -268,7 +260,7 @@ (read-write-lock [_] lock) - (buffer [this] + (buffer [_] (let [buf (or @buf (swap! buf (fn [buf] @@ -299,9 +291,9 @@ (compare-and-set! dirty [start end] [Integer/MAX_VALUE 0]) nil))))) - (append-to-slab! [this descriptor] + (append-to-slab! [this task-descriptor] (with-buffer [buf this] - (let [ary (nippy/freeze descriptor) + (let [ary (nippy/freeze task-descriptor) cnt (count ary) pos @position ^ByteBuffer buf (.position buf ^Long pos)] @@ -432,7 +424,11 @@ (^:private mark-complete! [_ q-name]) (^:private mark-retry! [_ q-name]) (delete! [_] - "Deletes all files associated with the queues.") + "DEPRECATED: Deletes all files associated with all queues. Dangerous!") + (delete-queue! [_ q-name] + "Deletes the specific queue, including all associated files.") + (delete-all! [_] + "Deletes all queues, including all associated files.") (stats [_] "Returns a map of queue names onto information about the immediate state of the queue.") (fsync [_] @@ -440,12 +436,12 @@ (take! [_ q-name] [_ q-name timeout timeout-val] - "A blocking dequeue from `name`. If `timeout` is specified, returns `timeout-val` if + "A blocking dequeue from `q-name`. If `timeout` is specified, returns `timeout-val` if no task is available within `timeout` milliseconds.") (put! [_ q-name task-descriptor] [_ q-name task-descriptor timeout] - "A blocking enqueue to `name`. If `timeout` is specified, returns `false` if unable to + "A blocking enqueue to `q-name`. If `timeout` is specified, returns `false` if unable to enqueue within `timeout` milliseconds, `true` otherwise.")) (defn queues @@ -464,7 +460,13 @@ fsync-put? - if true, each `put!` will force an fsync. Defaults to true. - fsync-take? - if true, each `take!` will force an fsync. Defaults to false." + fsync-take? - if true, each `take!` will force an fsync. Defaults to false. + + fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that + can be performed before an fsync is performed. + + fsync-interval - The maximum amount of time, in milliseconds, that can elapse before + an fsync is performed." ([directory] (queues directory nil)) ([directory @@ -515,10 +517,6 @@ queue-name->current-slab (atom {}) ;; initialize - slabs (->> @queue-name->slabs vals (apply concat)) - slab->count (zipmap - slabs - (map #(atom (count (seq %))) slabs)) create-new-slab (fn [q-name] (let [slab (create-slab directory q-name (queue q-name) slab-size) empty-slabs (->> (@queue-name->slabs q-name) @@ -562,12 +560,11 @@ (while (.get ref) (when-let [q (.get ref)] (try - (let [start (System/currentTimeMillis)] + (let [start (System/nanoTime)] (fsync q) - (let [end (System/currentTimeMillis)] - (Thread/sleep (long (max 0 (- fsync-interval (- end start))))))) - (catch Throwable e - ))))))) + (let [end (System/nanoTime)] + (Thread/sleep (long (max 0 (- (* 1000000 fsync-interval) (- end start))))))) + (catch Throwable _))))))) ;; populate queues with pre-existing tasks (let [empty-slabs (atom #{})] @@ -620,11 +617,25 @@ IQueues - (delete! [this] + (delete! [_] ; DEPRECATED and dangerous since it breaks the queues (doseq [s (->> @queue-name->slabs vals (apply concat))] (unmap s) (delete-slab s))) + (delete-queue! [_ q-name] + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats dissoc q-name) + (swap! queue-name->slabs dissoc q-name) + (swap! queue-name->current-slab dissoc q-name))) + + (delete-all! [this] + (doseq [q (keys @queue-name->stats )] + (delete-queue! this q))) + (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] (sync! slab))) @@ -644,17 +655,17 @@ (stats [_] (let [ks (keys @queue-name->stats)] (zipmap ks - (map - (fn [q-name] - (merge - {:num-slabs (-> @queue-name->slabs (get q-name) count) - :num-active-slabs (->> (get @queue-name->slabs q-name) - (filter mapped?) - count)} - (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) - ks)))) - - (take! [this q-name timeout timeout-val] + (map + (fn [q-name] + (merge + {:num-slabs (-> @queue-name->slabs (get q-name) count) + :num-active-slabs (->> (get @queue-name->slabs q-name) + (filter mapped?) + count)} + (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) + ks)))) + + (take! [_ q-name timeout timeout-val] (let [q-name (munge (name q-name)) ^LinkedBlockingQueue q (queue q-name)] (try @@ -670,8 +681,8 @@ (when-not (= slab old-slab) (swap! queue-name->current-slab assoc q-name slab) (doseq [s (->> (get @queue-name->slabs q-name) - butlast - (remove #(= slab %)))] + butlast + (remove #(= slab %)))] (unmap s)))) (status! t :in-progress) @@ -704,8 +715,8 @@ (when-not task (throw - (IllegalArgumentException. - (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) + (IllegalArgumentException. + (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) (when fsync-put? (sync! slab)) @@ -715,13 +726,13 @@ (if (zero? timeout) (.offer q task) (.offer q task timeout TimeUnit/MILLISECONDS)))] - (if-let [val (locking q - (queue! - (vary-meta (slab!) assoc - ::this this-ref - ::queue-name q-name - ::queue q - ::fsync? fsync-take?)))] + (if (locking q + (queue! + (vary-meta (slab!) assoc + ::this this-ref + ::queue-name q-name + ::queue q + ::fsync? fsync-take?))) (do (populate-stats! q-name) (let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])] @@ -759,16 +770,16 @@ "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will terminate after that time has elapsed, even if there are still tasks immediately available." [qs q-name interval] - (let [now (System/currentTimeMillis)] + (let [now (System/nanoTime)] (lazy-seq - (let [now' (System/currentTimeMillis) - remaining (- interval (- now' now))] + (let [now' (System/nanoTime) + remaining (- (* 1000000 interval) (- now' now))] (when (pos? remaining) (let [task (take! qs q-name remaining ::none)] (when-not (= ::none task) (cons task - (interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now))))))))))) + (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) (defn complete! "Marks a task as complete." diff --git a/test/clj_commons/durable_queue_test.clj b/test/clj_commons/durable_queue_test.clj new file mode 100644 index 0000000..b2f36d4 --- /dev/null +++ b/test/clj_commons/durable_queue_test.clj @@ -0,0 +1,132 @@ +(ns clj-commons.durable-queue-test + (:require + [clj-commons.durable-queue :as dq] + [clojure.java.io :as io] + [clojure.test :refer [deftest is]] + [criterium.core :as c])) + +(defn clear-tmp-directory [] + (doseq [f (->> (#'clj-commons.durable-queue/directory->queue-name->slab-files "/tmp") + vals + (apply concat))] + (.delete (io/file f)))) + +(deftest test-basic-put-take + (clear-tmp-directory) + (let [q (dq/queues "/tmp" {:slab-size 1024}) + tasks (range 1e4)] + (doseq [t tasks] + (dq/put! q :foo t)) + (is (= tasks (map deref (dq/immediate-task-seq q :foo)))))) + +(deftest test-partial-slab-writes + (clear-tmp-directory) + (dotimes [i 10] + (dq/put! (dq/queues "/tmp") :foo i)) + (is (= (range 10) (map deref (dq/immediate-task-seq (dq/queues "/tmp") :foo))))) + +(deftest test-retry + (clear-tmp-directory) + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + + (doseq [t (range 10)] + (dq/put! q :foo t)) + + (let [tasks' (dq/immediate-task-seq q :foo)] + (is (= (range 10) (map deref tasks'))) + (doseq [t (take 5 tasks')] + (dq/complete! t)) + (doseq [t (range 10 15)] + (dq/put! q :foo t)))) + + ;; create a new manager, which will mark all in-progress tasks as incomplete + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] + (is (= (range 5 15) (map deref tasks'))) + (doseq [t (take 5 tasks')] + (dq/complete! t)))) + + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] + (is (= (range 10 15) (map deref tasks'))) + (doseq [t (range 15 20)] + (dq/put! q :foo t)))) + + (let [q (dq/queues "/tmp" {:complete? even?})] + (is (= (remove even? (range 10 20)) (map deref (dq/immediate-task-seq q :foo)))))) + +;;; + +(deftest ^:benchmark benchmark-put-take + (clear-tmp-directory) + + (println "\n\n-- sync both") + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? true})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync take") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? true})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync put") + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? false})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 10 writes") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 100 writes") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 100ms") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-interval 100})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync neither") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? false})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo)))))) + +;;; + +(deftest ^:stress stress-queue-size + (clear-tmp-directory) + + (with-open [^java.io.Closeable q (dq/queues "/tmp")] + (let [ary (byte-array 1e6)] + (dotimes [i 1e6] + (aset ary i (byte (rand-int 127)))) + (dotimes [_ 1e5] + (dq/put! q :stress ary)))) + + (with-open [^java.io.Closeable q (dq/queues "/tmp" {:complete? (constantly false)})] + (let [s (doall (dq/immediate-task-seq q :stress))] + (doseq [t s] + (dq/retry! t))) + (let [s (dq/immediate-task-seq q :stress)] + (doseq [t s] + (dq/complete! t)))) + + (clear-tmp-directory)) diff --git a/test/durable_queue_test.clj b/test/durable_queue_test.clj deleted file mode 100644 index 4865f56..0000000 --- a/test/durable_queue_test.clj +++ /dev/null @@ -1,133 +0,0 @@ -(ns durable-queue-test - (:require - [clojure.java.io :as io] - [clojure.test :refer :all] - [durable-queue :refer :all] - [criterium.core :as c])) - -(defn clear-tmp-directory [] - (doseq [f (->> (#'durable-queue/directory->queue-name->slab-files "/tmp") - vals - (apply concat))] - (.delete (io/file f)))) - -(deftest test-basic-put-take - (clear-tmp-directory) - (let [q (queues "/tmp" {:slab-size 1024}) - tasks (range 1e4)] - (doseq [t tasks] - (put! q :foo t)) - (is (= tasks (map deref (immediate-task-seq q :foo)))) - (delete! q))) - -(deftest test-partial-slab-writes - (clear-tmp-directory) - (dotimes [i 10] - (put! (queues "/tmp") :foo i)) - (is (= (range 10) (map deref (immediate-task-seq (queues "/tmp") :foo))))) - -(deftest test-retry - (clear-tmp-directory) - (with-open [^java.io.Closeable q (queues "/tmp")] - - (doseq [t (range 10)] - (put! q :foo t)) - - (let [tasks' (immediate-task-seq q :foo)] - (is (= (range 10) (map deref tasks'))) - (doseq [t (take 5 tasks')] - (complete! t)) - (doseq [t (range 10 15)] - (put! q :foo t)))) - - ;; create a new manager, which will mark all in-progress tasks as incomplete - (with-open [^java.io.Closeable q (queues "/tmp")] - (let [tasks' (immediate-task-seq q :foo)] - (is (= (range 5 15) (map deref tasks'))) - (doseq [t (take 5 tasks')] - (complete! t)))) - - (with-open [^java.io.Closeable q (queues "/tmp")] - (let [tasks' (immediate-task-seq q :foo)] - (is (= (range 10 15) (map deref tasks'))) - (doseq [t (range 15 20)] - (put! q :foo t)))) - - (let [q (queues "/tmp" {:complete? even?})] - (is (= (remove even? (range 10 20)) (map deref (immediate-task-seq q :foo)))))) - -;;; - -(deftest ^:benchmark benchmark-put-take - (clear-tmp-directory) - - (println "\n\n-- sync both") - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? true})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync take") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? true})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync put") - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? false})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync every 10 writes") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync every 100 writes") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync every 100ms") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-interval 100})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync neither") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? false})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo)))))) - -;;; - -(deftest ^:stress stress-queue-size - (clear-tmp-directory) - - (with-open [^java.io.Closeable q (queues "/tmp")] - (let [ary (byte-array 1e6)] - (dotimes [i 1e6] - (aset ary i (byte (rand-int 127)))) - (dotimes [_ 1e5] - (put! q :stress ary)))) - - (with-open [^java.io.Closeable q (queues "/tmp" {:complete? (constantly false)})] - (let [s (doall (immediate-task-seq q :stress))] - (doseq [t s] - (retry! t))) - (let [s (immediate-task-seq q :stress)] - (doseq [t s] - (complete! t)))) - - (clear-tmp-directory))