Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fine-grained partition of CRAM containers and slices #322

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/cljam/io/cram.clj
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
a sequence reader that reads sequences from the reference file.
This may be omitted only when the CRAM file to be read does not require
a reference file.
- records-per-slice: The maximum number of records a slice may contain.
Defaults to 10000.
- slices-per-container: The maximum number of slices a container may contain.
Defaults to 1.
- min-single-ref-slice-size: The minimum number of records required to emit
a single-reference slice. Defaults to 1000.
- ds-compressor-overrides: A function to override data series compressors.
Given a data series keyword, returns a keyword or a set of keywords
representing compression method. It may return another function to add
Expand Down
89 changes: 89 additions & 0 deletions src/cljam/io/cram/encode/partitioning.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
(ns cljam.io.cram.encode.partitioning
(:require [cljam.io.sam.util.header :as sam.header])
(:import [java.util ArrayList List]))

(defn- slice-record-collector
[header {:keys [^long records-per-slice ^long min-single-ref-slice-size]
:or {records-per-slice 10000, min-single-ref-slice-size 1000}}]
(let [sort-by-coord? (= (sam.header/sort-order header) sam.header/order-coordinate)
;; CRAM files sorted by coord can easily get back from a multi-ref state
;; to a single-ref state. To support this, multi-ref slices should be kept
;; as small as possible
multi-upper-bound (if sort-by-coord?
min-single-ref-slice-size
records-per-slice)]
(fn [empty-container? alns]
(let [slice-records (ArrayList.)]
(loop [ref-state nil, alns alns]
(if (seq alns)
(let [[{:keys [rname] :as aln} & more] alns
n-records (.size slice-records)]
(case ref-state
nil
(let [ref-state' (if (= rname "*") :unmapped rname)]
(.add slice-records aln)
(recur ref-state' more))

:unmapped
(if (< n-records records-per-slice)
(let [ref-state' (cond (= rname "*") ref-state
sort-by-coord? (throw
(ex-info
(str "Unmapped records "
"must be last")
{}))
:else :multi-ref)]
(.add slice-records aln)
(recur ref-state' more))
[ref-state slice-records alns])

:multi-ref
(if (< n-records multi-upper-bound)
(do (.add slice-records aln)
(recur ref-state more))
[ref-state slice-records alns])

(if (= ref-state rname)
(if (< n-records records-per-slice)
(do (.add slice-records aln)
(recur ref-state more))
[ref-state slice-records alns])
;; If the container already contains one or more single-ref
;; slices, instead of creating and adding a new multi-ref slice
;; to that container, add the accumulated single-ref slice
;; to the container and add the current record to the next slice
(if (and empty-container? (< n-records min-single-ref-slice-size))
(do (.add slice-records aln)
(recur :multi-ref more))
[ref-state slice-records alns]))))
[ref-state slice-records alns]))))))

(defn with-each-container
"Partitions the given alignment records into containers, which are represented
as a List of Lists of record maps, and calls the function f with them."
[header options alns f]
(let [{:keys [^long slices-per-container] :or {slices-per-container 1}} options
collect-fn (slice-record-collector header options)
container-records (ArrayList.)]
(loop [ref-state nil, written 0, ready 0, alns alns]
(if (seq alns)
(let [n-slices (.size container-records)
[ref-state' ^List slice-records alns'] (collect-fn (zero? n-slices) alns)]
(if (or (= n-slices slices-per-container)
(= ref-state' :multi-ref)
(and (some? ref-state) (not= ref-state ref-state')))
(let [written' (+ written ready)]
(when-not (.isEmpty container-records)
(f written container-records))
(.clear container-records)
(if (= ref-state' :multi-ref)
(do (f written' (doto (ArrayList.) (.add slice-records)))
(recur nil (+ written' (.size slice-records)) 0 alns'))
(let [ready' (.size slice-records)]
(.add container-records slice-records)
(recur ref-state' written' ready' alns'))))
(let [ready' (+ ready (.size slice-records))]
(.add container-records slice-records)
(recur ref-state' written ready' alns'))))
(when-not (.isEmpty container-records)
(f written container-records))))))
16 changes: 8 additions & 8 deletions src/cljam/io/cram/encode/record.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
(ns cljam.io.cram.encode.record
(:require [cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.tag-dict :as tag-dict]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.sam.util.cigar :as sam.cigar]
[cljam.io.sam.util.flag :as sam.flag]
[cljam.io.sam.util.option :as sam.option]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.cram.encode.tag-dict :as tag-dict])
(:import [java.util Arrays]))
[cljam.io.sam.util.option :as sam.option])
(:import [java.util Arrays List]))

(defn- ref-index [rname->idx rname]
(if (= rname "*")
Expand Down Expand Up @@ -202,9 +202,9 @@
"Preprocesses slice records to calculate some record fields prior to record
encoding that are necessary for the CRAM writer to generate some header
components."
[{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^objects records]
(dotimes [i (alength records)]
(let [record (aget records i)
[{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^List records]
(dotimes [i (.size records)]
(let [record (.get records i)
;; these flag bits of CF are hard-coded at the moment:
;; - 0x01: quality scores stored as array (true)
;; - 0x02: detached (true)
Expand All @@ -217,4 +217,4 @@
record' (assoc record
::flag cf ::ref-index ri ::end end
::features fs ::tags-index tags-id)]
(aset records i record'))))
(.set records i record'))))
24 changes: 8 additions & 16 deletions src/cljam/io/cram/writer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [cljam.io.crai :as crai]
[cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.context :as context]
[cljam.io.cram.encode.partitioning :as partition]
[cljam.io.cram.encode.record :as record]
[cljam.io.cram.encode.structure :as struct]
[cljam.io.cram.seq-resolver.protocol :as resolver]
Expand Down Expand Up @@ -54,9 +55,7 @@
preservation-map
seq-resolver)
{:keys [ds-compressor-overrides tag-compressor-overrides]} options]
(dotimes [i (alength container-records)]
(let [slice-records (aget container-records i)]
(record/preprocess-slice-records container-ctx slice-records)))
(run! (partial record/preprocess-slice-records container-ctx) container-records)
(context/finalize-container-context container-ctx
ds-compressor-overrides
tag-compressor-overrides)))
Expand Down Expand Up @@ -180,8 +179,7 @@
compression-header-block (generate-compression-header-block container-ctx)
container-header (generate-container-header compression-header-block slices)
^DataOutputStream out (.-stream wtr)
container-offset (.size out)
counter' (:counter (peek slices))]
container-offset (.size out)]
(struct/encode-container-header out (assoc container-header :counter counter))
(.write out compression-header-block)
(run! (fn [{:keys [^bytes header-block data-blocks]}]
Expand All @@ -190,17 +188,11 @@
slices)
(when-let [index-writer (.-index-writer wtr)]
(write-index-entries index-writer container-offset container-header
slices container-records))
counter'))
Copy link
Member Author

@athos athos Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that write-container now doesn't return the next counter value.

Previously, the CRAM writer needed to count the number of container records as writing out the container since the CRAM writer couldn't tell in advance how many records the container had in it.

Now the CRAM writer can tell in advance how many records are contained in each container by counting the number of records and packing them into slices/containers before writing them out.


(defn- partition-alignments [slices-per-container records-per-slice alns]
(->> alns
(partition-all records-per-slice)
(map object-array)
(partition-all slices-per-container)
(map object-array)))
slices container-records))))

(defn write-alignments
"Writes all the given alignments, which is a sequence of alignment maps."
[wtr alns header]
(reduce (partial write-container wtr header) 0 (partition-alignments 1 10000 alns)))
[^CRAMWriter wtr alns header]
(partition/with-each-container header (.-options wtr) alns
(fn [counter container-records]
(write-container wtr header counter container-records))))
201 changes: 201 additions & 0 deletions test/cljam/io/cram/encode/partitioning_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
(ns cljam.io.cram.encode.partitioning-test
(:require [cljam.io.cram.encode.partitioning :as partition]
[clojure.test :refer [are deftest testing]]))

(defn- partition-alignments [header options alns]
(let [acc (volatile! [])]
(partition/with-each-container header options alns
(fn [counter container-records]
(vswap! acc conj [counter (mapv vec container-records)])))
@acc))

(deftest with-each-container-test
(let [options {:slices-per-container 2
:records-per-slice 3
:min-single-ref-slice-size 2}]
(testing "sorted by coord"
(let [header {:HD {:SO "coordinate"}}]
(are [input expected]
(= expected
(partition-alignments header options input))
[]
[]

[{:qname "q1", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}]]]]

(map (fn [i] {:qname (str \q (inc (long i))), :rname "chr1"}) (range 6))
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}
{:qname "q5", :rname "chr1"}
{:qname "q6", :rname "chr1"}]]]]

(map (fn [i] {:qname (str \q (inc (long i))), :rname "chr1"}) (range 7))
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}
{:qname "q5", :rname "chr1"}
{:qname "q6", :rname "chr1"}]]]
[6 [[{:qname "q7", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]]]
[3 [[{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]]]
[2 [[{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr1"}
{:qname "q5", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}]]]
[4 [[{:qname "q5", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr3"}
{:qname "q5", :rname "chr4"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]]]
[2 [[{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr3"}]]]
[4 [[{:qname "q5", :rname "chr4"}]]]]

[{:qname "q1", :rname "*"}]
[[0 [[{:qname "q1", :rname "*"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "*"}]
[{:qname "q4", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}]]]
[2 [[{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]]]])
(are [input] (thrown? Exception (partition-alignments header options input))
[{:qname "q1", :rname "*"}
{:qname "q2", :rname "chr1"}]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "chr1"}])))
(testing "unsorted"
(are [input expected]
(= expected (partition-alignments {:HD {:SO "unsorted"}} options input))
[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}]]]
[3 [[{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "chr1"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "chr1"}]]]]))))
Loading