From 11cf60fd0ae0a35af6a1c585ee4ec3d1d711c4a4 Mon Sep 17 00:00:00 2001 From: Shogo Ohta Date: Thu, 22 Aug 2024 18:25:53 +0900 Subject: [PATCH 1/2] Fine-tune CRAM container/slice partitioning --- src/cljam/io/cram.clj | 6 ++ src/cljam/io/cram/encode/partitioning.clj | 89 +++++++++++++++++++++++ src/cljam/io/cram/encode/record.clj | 16 ++-- src/cljam/io/cram/writer.clj | 24 ++---- 4 files changed, 111 insertions(+), 24 deletions(-) create mode 100644 src/cljam/io/cram/encode/partitioning.clj diff --git a/src/cljam/io/cram.clj b/src/cljam/io/cram.clj index e557b4d0..f618eb65 100644 --- a/src/cljam/io/cram.clj +++ b/src/cljam/io/cram.clj @@ -58,6 +58,12 @@ a sequence reader that reads sequences from the reference file. This may be omitted only when the CRAM file to be read does not require a reference file. + - records-per-slice: The maximum number of records a slice may contain. + Defaults to 10000. + - slices-per-container: The maximum number of slices a container may contain. + Defaults to 1. + - min-single-ref-slice-size: The minimum number of records required to emit + a single-reference slice. Defaults to 1000. - ds-compressor-overrides: A function to override data series compressors. Given a data series keyword, returns a keyword or a set of keywords representing compression method. It may return another function to add diff --git a/src/cljam/io/cram/encode/partitioning.clj b/src/cljam/io/cram/encode/partitioning.clj new file mode 100644 index 00000000..3cffa4b7 --- /dev/null +++ b/src/cljam/io/cram/encode/partitioning.clj @@ -0,0 +1,89 @@ +(ns cljam.io.cram.encode.partitioning + (:require [cljam.io.sam.util.header :as sam.header]) + (:import [java.util ArrayList List])) + +(defn- slice-record-collector + [header {:keys [^long records-per-slice ^long min-single-ref-slice-size] + :or {records-per-slice 10000, min-single-ref-slice-size 1000}}] + (let [sort-by-coord? (= (sam.header/sort-order header) sam.header/order-coordinate) + ;; CRAM files sorted by coord can easily get back from a multi-ref state + ;; to a single-ref state. To support this, multi-ref slices should be kept + ;; as small as possible + multi-upper-bound (if sort-by-coord? + min-single-ref-slice-size + records-per-slice)] + (fn [empty-container? alns] + (let [slice-records (ArrayList.)] + (loop [ref-state nil, alns alns] + (if (seq alns) + (let [[{:keys [rname] :as aln} & more] alns + n-records (.size slice-records)] + (case ref-state + nil + (let [ref-state' (if (= rname "*") :unmapped rname)] + (.add slice-records aln) + (recur ref-state' more)) + + :unmapped + (if (< n-records records-per-slice) + (let [ref-state' (cond (= rname "*") ref-state + sort-by-coord? (throw + (ex-info + (str "Unmapped records " + "must be last") + {})) + :else :multi-ref)] + (.add slice-records aln) + (recur ref-state' more)) + [ref-state slice-records alns]) + + :multi-ref + (if (< n-records multi-upper-bound) + (do (.add slice-records aln) + (recur ref-state more)) + [ref-state slice-records alns]) + + (if (= ref-state rname) + (if (< n-records records-per-slice) + (do (.add slice-records aln) + (recur ref-state more)) + [ref-state slice-records alns]) + ;; If the container already contains one or more single-ref + ;; slices, instead of creating and adding a new multi-ref slice + ;; to that container, add the accumulated single-ref slice + ;; to the container and add the current record to the next slice + (if (and empty-container? (< n-records min-single-ref-slice-size)) + (do (.add slice-records aln) + (recur :multi-ref more)) + [ref-state slice-records alns])))) + [ref-state slice-records alns])))))) + +(defn with-each-container + "Partitions the given alignment records into containers, which are represented + as a List of Lists of record maps, and calls the function f with them." + [header options alns f] + (let [{:keys [^long slices-per-container] :or {slices-per-container 1}} options + collect-fn (slice-record-collector header options) + container-records (ArrayList.)] + (loop [ref-state nil, written 0, ready 0, alns alns] + (if (seq alns) + (let [n-slices (.size container-records) + [ref-state' ^List slice-records alns'] (collect-fn (zero? n-slices) alns)] + (if (or (= n-slices slices-per-container) + (= ref-state' :multi-ref) + (and (some? ref-state) (not= ref-state ref-state'))) + (let [written' (+ written ready)] + (when-not (.isEmpty container-records) + (f written container-records)) + (.clear container-records) + (if (= ref-state' :multi-ref) + (do (f written' (doto (ArrayList.) (.add slice-records))) + (recur nil (+ written' (.size slice-records)) 0 alns')) + (let [ready' (.size slice-records)] + (.add container-records slice-records) + (recur ref-state' written' ready' alns')))) + (let [ready' (+ ready (.size slice-records))] + (.add container-records slice-records) + (recur ref-state' written ready' alns')))) + (when-not (.isEmpty container-records) + (f written container-records)))))) diff --git a/src/cljam/io/cram/encode/record.clj b/src/cljam/io/cram/encode/record.clj index 64685d40..bcd10e4a 100644 --- a/src/cljam/io/cram/encode/record.clj +++ b/src/cljam/io/cram/encode/record.clj @@ -1,11 +1,11 @@ (ns cljam.io.cram.encode.record (:require [cljam.io.cram.encode.alignment-stats :as stats] + [cljam.io.cram.encode.tag-dict :as tag-dict] + [cljam.io.cram.seq-resolver.protocol :as resolver] [cljam.io.sam.util.cigar :as sam.cigar] [cljam.io.sam.util.flag :as sam.flag] - [cljam.io.sam.util.option :as sam.option] - [cljam.io.cram.seq-resolver.protocol :as resolver] - [cljam.io.cram.encode.tag-dict :as tag-dict]) - (:import [java.util Arrays])) + [cljam.io.sam.util.option :as sam.option]) + (:import [java.util Arrays List])) (defn- ref-index [rname->idx rname] (if (= rname "*") @@ -202,9 +202,9 @@ "Preprocesses slice records to calculate some record fields prior to record encoding that are necessary for the CRAM writer to generate some header components." - [{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^objects records] - (dotimes [i (alength records)] - (let [record (aget records i) + [{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^List records] + (dotimes [i (.size records)] + (let [record (.get records i) ;; these flag bits of CF are hard-coded at the moment: ;; - 0x01: quality scores stored as array (true) ;; - 0x02: detached (true) @@ -217,4 +217,4 @@ record' (assoc record ::flag cf ::ref-index ri ::end end ::features fs ::tags-index tags-id)] - (aset records i record')))) + (.set records i record')))) diff --git a/src/cljam/io/cram/writer.clj b/src/cljam/io/cram/writer.clj index 02aafc4c..66b2dd92 100644 --- a/src/cljam/io/cram/writer.clj +++ b/src/cljam/io/cram/writer.clj @@ -2,6 +2,7 @@ (:require [cljam.io.crai :as crai] [cljam.io.cram.encode.alignment-stats :as stats] [cljam.io.cram.encode.context :as context] + [cljam.io.cram.encode.partitioning :as partition] [cljam.io.cram.encode.record :as record] [cljam.io.cram.encode.structure :as struct] [cljam.io.cram.seq-resolver.protocol :as resolver] @@ -54,9 +55,7 @@ preservation-map seq-resolver) {:keys [ds-compressor-overrides tag-compressor-overrides]} options] - (dotimes [i (alength container-records)] - (let [slice-records (aget container-records i)] - (record/preprocess-slice-records container-ctx slice-records))) + (run! (partial record/preprocess-slice-records container-ctx) container-records) (context/finalize-container-context container-ctx ds-compressor-overrides tag-compressor-overrides))) @@ -180,8 +179,7 @@ compression-header-block (generate-compression-header-block container-ctx) container-header (generate-container-header compression-header-block slices) ^DataOutputStream out (.-stream wtr) - container-offset (.size out) - counter' (:counter (peek slices))] + container-offset (.size out)] (struct/encode-container-header out (assoc container-header :counter counter)) (.write out compression-header-block) (run! (fn [{:keys [^bytes header-block data-blocks]}] @@ -190,17 +188,11 @@ slices) (when-let [index-writer (.-index-writer wtr)] (write-index-entries index-writer container-offset container-header - slices container-records)) - counter')) - -(defn- partition-alignments [slices-per-container records-per-slice alns] - (->> alns - (partition-all records-per-slice) - (map object-array) - (partition-all slices-per-container) - (map object-array))) + slices container-records)))) (defn write-alignments "Writes all the given alignments, which is a sequence of alignment maps." - [wtr alns header] - (reduce (partial write-container wtr header) 0 (partition-alignments 1 10000 alns))) + [^CRAMWriter wtr alns header] + (partition/with-each-container header (.-options wtr) alns + (fn [counter container-records] + (write-container wtr header counter container-records)))) From 7c93d239b048fe0598b7111900be0f9f0d080a97 Mon Sep 17 00:00:00 2001 From: Shogo Ohta Date: Tue, 27 Aug 2024 11:00:31 +0900 Subject: [PATCH 2/2] Add tests for CRAM container/slice partitioning --- .../io/cram/encode/partitioning_test.clj | 201 ++++++++++++++++++ test/cljam/io/cram/encode/record_test.clj | 11 +- 2 files changed, 207 insertions(+), 5 deletions(-) create mode 100644 test/cljam/io/cram/encode/partitioning_test.clj diff --git a/test/cljam/io/cram/encode/partitioning_test.clj b/test/cljam/io/cram/encode/partitioning_test.clj new file mode 100644 index 00000000..a6c9e2fd --- /dev/null +++ b/test/cljam/io/cram/encode/partitioning_test.clj @@ -0,0 +1,201 @@ +(ns cljam.io.cram.encode.partitioning-test + (:require [cljam.io.cram.encode.partitioning :as partition] + [clojure.test :refer [are deftest testing]])) + +(defn- partition-alignments [header options alns] + (let [acc (volatile! [])] + (partition/with-each-container header options alns + (fn [counter container-records] + (vswap! acc conj [counter (mapv vec container-records)]))) + @acc)) + +(deftest with-each-container-test + (let [options {:slices-per-container 2 + :records-per-slice 3 + :min-single-ref-slice-size 2}] + (testing "sorted by coord" + (let [header {:HD {:SO "coordinate"}}] + (are [input expected] + (= expected + (partition-alignments header options input)) + [] + [] + + [{:qname "q1", :rname "chr1"}] + [[0 [[{:qname "q1", :rname "chr1"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"} + {:qname "q4", :rname "chr1"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"}] + [{:qname "q4", :rname "chr1"}]]]] + + (map (fn [i] {:qname (str \q (inc (long i))), :rname "chr1"}) (range 6)) + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"}] + [{:qname "q4", :rname "chr1"} + {:qname "q5", :rname "chr1"} + {:qname "q6", :rname "chr1"}]]]] + + (map (fn [i] {:qname (str \q (inc (long i))), :rname "chr1"}) (range 7)) + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"}] + [{:qname "q4", :rname "chr1"} + {:qname "q5", :rname "chr1"} + {:qname "q6", :rname "chr1"}]]] + [6 [[{:qname "q7", :rname "chr1"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr2"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"}]]] + [2 [[{:qname "q3", :rname "chr2"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"} + {:qname "q4", :rname "chr2"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"}]]] + [3 [[{:qname "q4", :rname "chr2"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr2"} + {:qname "q4", :rname "chr2"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"}]]] + [2 [[{:qname "q3", :rname "chr2"} + {:qname "q4", :rname "chr2"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"} + {:qname "q3", :rname "chr2"} + {:qname "q4", :rname "chr2"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"}]]] + [2 [[{:qname "q3", :rname "chr2"} + {:qname "q4", :rname "chr2"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"} + {:qname "q4", :rname "chr1"} + {:qname "q5", :rname "chr2"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "chr1"}] + [{:qname "q4", :rname "chr1"}]]] + [4 [[{:qname "q5", :rname "chr2"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"} + {:qname "q3", :rname "chr2"} + {:qname "q4", :rname "chr3"} + {:qname "q5", :rname "chr4"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"}]]] + [2 [[{:qname "q3", :rname "chr2"} + {:qname "q4", :rname "chr3"}]]] + [4 [[{:qname "q5", :rname "chr4"}]]]] + + [{:qname "q1", :rname "*"}] + [[0 [[{:qname "q1", :rname "*"}]]]] + + [{:qname "q1", :rname "*"} + {:qname "q2", :rname "*"}] + [[0 [[{:qname "q1", :rname "*"} + {:qname "q2", :rname "*"}]]]] + + [{:qname "q1", :rname "*"} + {:qname "q2", :rname "*"} + {:qname "q3", :rname "*"} + {:qname "q4", :rname "*"}] + [[0 [[{:qname "q1", :rname "*"} + {:qname "q2", :rname "*"} + {:qname "q3", :rname "*"}] + [{:qname "q4", :rname "*"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "*"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "*"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "*"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"}]]] + [2 [[{:qname "q3", :rname "*"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"} + {:qname "q3", :rname "*"} + {:qname "q4", :rname "*"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr1"}]]] + [2 [[{:qname "q3", :rname "*"} + {:qname "q4", :rname "*"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "*"} + {:qname "q3", :rname "*"} + {:qname "q4", :rname "*"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "*"}]]] + [2 [[{:qname "q3", :rname "*"} + {:qname "q4", :rname "*"}]]]]) + (are [input] (thrown? Exception (partition-alignments header options input)) + [{:qname "q1", :rname "*"} + {:qname "q2", :rname "chr1"}] + + [{:qname "q1", :rname "*"} + {:qname "q2", :rname "*"} + {:qname "q3", :rname "chr1"}]))) + (testing "unsorted" + (are [input expected] + (= expected (partition-alignments {:HD {:SO "unsorted"}} options input)) + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"} + {:qname "q3", :rname "chr1"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"} + {:qname "q3", :rname "chr1"}]]]] + + [{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"} + {:qname "q3", :rname "chr1"} + {:qname "q4", :rname "chr2"}] + [[0 [[{:qname "q1", :rname "chr1"} + {:qname "q2", :rname "chr2"} + {:qname "q3", :rname "chr1"}]]] + [3 [[{:qname "q4", :rname "chr2"}]]]] + + [{:qname "q1", :rname "*"} + {:qname "q2", :rname "chr1"}] + [[0 [[{:qname "q1", :rname "*"} + {:qname "q2", :rname "chr1"}]]]] + + [{:qname "q1", :rname "*"} + {:qname "q2", :rname "*"} + {:qname "q3", :rname "chr1"}] + [[0 [[{:qname "q1", :rname "*"} + {:qname "q2", :rname "*"} + {:qname "q3", :rname "chr1"}]]]])))) diff --git a/test/cljam/io/cram/encode/record_test.clj b/test/cljam/io/cram/encode/record_test.clj index 3fda2a61..b00f0503 100644 --- a/test/cljam/io/cram/encode/record_test.clj +++ b/test/cljam/io/cram/encode/record_test.clj @@ -6,7 +6,8 @@ [cljam.io.sequence :as cseq] [cljam.test-common :as common] [clojure.test :refer [are deftest is testing]] - [clojure.walk :as walk])) + [clojure.walk :as walk]) + (:import [java.util ArrayList])) (def ^:private test-seq-resolver (let [seqs (with-open [r (cseq/reader common/test-fa-file)] @@ -78,7 +79,7 @@ (deftest preprocess-slice-records-test (let [cram-header {:SQ [{:SN "ref"}]} - records (object-array + records (ArrayList. [{:rname "ref", :pos 1, :cigar "5M", :seq "AGAAT", :qual "HFHHH" :options [{:RG {:type "Z", :value "rg001"}} {:MD {:type "Z", :value "2C2"}} @@ -132,7 +133,7 @@ ::record/flag 0x0b, ::record/ref-index -1, ::record/end 10, ::record/tags-index 1 ::record/features []}] (walk/prewalk #(if (.isArray (class %)) (vec %) %) - records))) + (vec records)))) (is (= [[{:tag :MD, :type \Z} {:tag :NM, :type \c}] []] (:tag-dict container-ctx))) @@ -158,7 +159,7 @@ :RG [{:ID "rg001"} {:ID "rg002"}]} - records (object-array + records (ArrayList. [{:qname "q001", :flag 99, :rname "ref", :pos 1, :end 5, :mapq 0, :cigar "5M", :rnext "=", :pnext 151, :tlen 150, :seq "AGAAT", :qual "HFHHH" :options [{:RG {:type "Z", :value "rg001"}} @@ -337,7 +338,7 @@ (let [cram-header {:SQ [{:SN "ref"} {:SN "ref2"}]} - records (object-array + records (ArrayList. [{:qname "q001", :flag 77, :rname "*", :pos 0, :end 0, :mapq 0, :cigar "*", :rnext "*", :pnext 0, :tlen 0, :seq "AATCC", :qual "CCFFF" :options []}