Skip to content

Commit

Permalink
wip: Embed ref seqs if :embed-reference? is set to true
Browse files Browse the repository at this point in the history
  • Loading branch information
athos committed Oct 9, 2024
1 parent 66b4d3e commit 918aa86
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 141 deletions.
65 changes: 33 additions & 32 deletions src/cljam/io/cram/data_series.clj
Original file line number Diff line number Diff line change
Expand Up @@ -147,42 +147,43 @@

(def ^{:doc "Default encodings for all the data series"}
default-data-series-encodings
{:BF {:content-id 1, :codec :external, :compressor :gzip}
:CF {:content-id 2, :codec :external, :compressor :gzip}
:RI {:content-id 3, :codec :external, :compressor :gzip}
:RL {:content-id 4, :codec :external, :compressor :gzip}
:AP {:content-id 5, :codec :external, :compressor :gzip}
:RG {:content-id 6, :codec :external, :compressor :gzip}
:RN {:content-id 7, :codec :byte-array-stop, :stop-byte (int \tab), :compressor :gzip}
:MF {:content-id 8, :codec :external, :compressor :gzip}
:NS {:content-id 9, :codec :external, :compressor :gzip}
:NP {:content-id 10, :codec :external, :compressor :gzip}
:TS {:content-id 11, :codec :external, :compressor :gzip}
:NF {:content-id 12, :codec :external, :compressor :gzip}
:TL {:content-id 13, :codec :external, :compressor :gzip}
:FN {:content-id 14, :codec :external, :compressor :gzip}
:FC {:content-id 15, :codec :external, :compressor :gzip}
:FP {:content-id 16, :codec :external, :compressor :gzip}
:DL {:content-id 17, :codec :external, :compressor :gzip}
{:embedded-ref {::content-id 1, :codec :external, :compressor :gzip}
:BF {:content-id 2, :codec :external, :compressor :gzip}
:CF {:content-id 3, :codec :external, :compressor :gzip}
:RI {:content-id 4, :codec :external, :compressor :gzip}
:RL {:content-id 5, :codec :external, :compressor :gzip}
:AP {:content-id 6, :codec :external, :compressor :gzip}
:RG {:content-id 7, :codec :external, :compressor :gzip}
:RN {:content-id 8, :codec :byte-array-stop, :stop-byte (int \tab), :compressor :gzip}
:MF {:content-id 9, :codec :external, :compressor :gzip}
:NS {:content-id 10, :codec :external, :compressor :gzip}
:NP {:content-id 11, :codec :external, :compressor :gzip}
:TS {:content-id 12, :codec :external, :compressor :gzip}
:NF {:content-id 13, :codec :external, :compressor :gzip}
:TL {:content-id 14, :codec :external, :compressor :gzip}
:FN {:content-id 15, :codec :external, :compressor :gzip}
:FC {:content-id 16, :codec :external, :compressor :gzip}
:FP {:content-id 17, :codec :external, :compressor :gzip}
:DL {:content-id 18, :codec :external, :compressor :gzip}
:BB {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 18, :compressor :gzip}
:val-encoding {:codec :external, :content-id 19, :compressor :gzip}}
:len-encoding {:codec :external, :content-id 19, :compressor :gzip}
:val-encoding {:codec :external, :content-id 20, :compressor :gzip}}
:QQ {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 20, :compressor :gzip}
:val-encoding {:codec :external, :content-id 21, :compressor :gzip}}
:BS {:content-id 22, :codec :external, :compressor :gzip}
:len-encoding {:codec :external, :content-id 21, :compressor :gzip}
:val-encoding {:codec :external, :content-id 22, :compressor :gzip}}
:BS {:content-id 23, :codec :external, :compressor :gzip}
:IN {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 23, :compressor :gzip}
:val-encoding {:codec :external, :content-id 24, :compressor :gzip}}
:RS {:content-id 25, :codec :external, :compressor :gzip}
:PD {:content-id 26, :codec :external, :compressor :gzip}
:HC {:content-id 27, :codec :external, :compressor :gzip}
:len-encoding {:codec :external, :content-id 24, :compressor :gzip}
:val-encoding {:codec :external, :content-id 25, :compressor :gzip}}
:RS {:content-id 26, :codec :external, :compressor :gzip}
:PD {:content-id 27, :codec :external, :compressor :gzip}
:HC {:content-id 28, :codec :external, :compressor :gzip}
:SC {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 28, :compressor :gzip}
:val-encoding {:codec :external, :content-id 29, :compressor :gzip}}
:MQ {:content-id 30, :codec :external, :compressor :gzip}
:BA {:content-id 31, :codec :external, :compressor :gzip}
:QS {:content-id 32, :codec :external, :compressor :gzip}})
:len-encoding {:codec :external, :content-id 29, :compressor :gzip}
:val-encoding {:codec :external, :content-id 30, :compressor :gzip}}
:MQ {:content-id 31, :codec :external, :compressor :gzip}
:BA {:content-id 32, :codec :external, :compressor :gzip}
:QS {:content-id 33, :codec :external, :compressor :gzip}})

(defn- build-codec-encoder
[{:keys [codec content-id compressor] :as params} data-type content-id->state]
Expand Down
13 changes: 8 additions & 5 deletions src/cljam/io/cram/encode/context.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

(defn make-container-context
"Creates a new container context."
[cram-header seq-resolver]
[cram-header seq-resolver options]
(let [rname->idx (into {}
(map-indexed (fn [i {:keys [SN]}] [SN i]))
(:SQ cram-header))
Expand All @@ -24,14 +24,17 @@
:preservation-map preservation-map
:subst-mat subst-mat
:seq-resolver seq-resolver
:tag-dict-builder tag-dict-builder}))
:tag-dict-builder tag-dict-builder
:options options}))

(defn finalize-container-context
"Finalizes the builders in the container context and returns a new container
context containing those builders' results. This operation must be done before
creating a slice context."
[container-ctx alignment-stats ds-compressor-overrides tag-compressor-overrides]
(let [ds-encodings (-> ds/default-data-series-encodings
[container-ctx alignment-stats]
(let [{:keys [ds-compressor-overrides
tag-compressor-overrides]} (:options container-ctx)
ds-encodings (-> ds/default-data-series-encodings
(ds/apply-ds-compressor-overrides ds-compressor-overrides))
tag-dict (tag-dict/build-tag-dict (:tag-dict-builder container-ctx))
tag-encodings (-> (tag-dict/build-tag-encodings tag-dict)
Expand All @@ -46,7 +49,7 @@
"Creates a slice context for the ith slice from the given container context.
Note that the container context must be finalized with `finalize-container-context`."
[{:keys [alignment-stats ds-encodings tag-encodings] :as container-ctx} i]
(let [ds-encoders (ds/build-data-series-encoders ds-encodings)
(let [ds-encoders (ds/build-data-series-encoders (dissoc ds-encodings :embedded-ref))
tag-encoders (ds/build-tag-encoders tag-encodings)]
(assoc container-ctx
:alignment-stats (nth alignment-stats i)
Expand Down
7 changes: 5 additions & 2 deletions src/cljam/io/cram/encode/structure.clj
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,13 @@
(defn encode-compression-header-block
"Encodes a compression header block to the given OutputStream."
[out preservation-map subst-mat tag-dict ds-encodings tag-encodings]
(let [bs (with-out-byte-array
;; ensure that ds-encodings does not contain an encoding for :embedded-ref
;; pseudo data series
(let [ds-encodings' (dissoc ds-encodings :embedded-ref)
bs (with-out-byte-array
(fn [out']
(encode-preservation-map out' preservation-map subst-mat tag-dict)
(encode-data-series-encodings out' ds-encodings)
(encode-data-series-encodings out' ds-encodings')
(encode-tag-encoding-map out' tag-encodings)))]
(encode-block out :raw 1 0 (alength bs) bs)))

Expand Down
97 changes: 64 additions & 33 deletions src/cljam/io/cram/writer.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns cljam.io.cram.writer
(:require [cljam.io.crai :as crai]
[cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.compressor :as compressor]
[cljam.io.cram.encode.context :as context]
[cljam.io.cram.encode.partitioning :as partition]
[cljam.io.cram.encode.record :as record]
Expand Down Expand Up @@ -51,36 +52,51 @@

(defn- preprocess-records
[cram-header seq-resolver options ^objects container-records]
(let [container-ctx (context/make-container-context cram-header seq-resolver)
{:keys [ds-compressor-overrides tag-compressor-overrides]} options
(let [container-ctx (context/make-container-context cram-header seq-resolver options)
stats (mapv (partial record/preprocess-slice-records container-ctx)
container-records)]
(context/finalize-container-context container-ctx
stats
ds-compressor-overrides
tag-compressor-overrides)))

(defn- generate-blocks [slice-ctx]
(->> (context/encoding-results slice-ctx)
(keep (fn [{:keys [^long raw-size] :as block}]
(when (pos? raw-size)
(update block :data
(fn [^bytes data]
(struct/generate-block (:compressor block) 4
(:content-id block) raw-size
data))))))
;; sort + dedupe by :content-id
(into (sorted-map) (map (juxt :content-id identity)))
vals
(cons {:content-id 0
:data (struct/generate-block :raw 5 0 0 (byte-array 0))})))

(defn- reference-md5
[{:keys [seq-resolver cram-header]} {:keys [^long ri ^long start ^long end]}]
(context/finalize-container-context container-ctx stats)))

(def ^:private ^:const embedded-ref-content-id 1)

(defn- generate-embedded-ref-block [slice-ctx ^bytes embedded-ref]
(let [raw-size (alength embedded-ref)
compr (compressor/compressor (get-in slice-ctx [:ds-encodings :embedded-ref :compressor]))
_ (with-open [^OutputStream os (compressor/compressor-output-stream compr)]
(.write os embedded-ref))
{:keys [compressor data]} (compressor/->compressed-result compr)]
{:content-id embedded-ref-content-id
:data (struct/generate-block compressor 4 embedded-ref-content-id raw-size data)}))

(defn- generate-blocks [slice-ctx ^bytes embedded-ref]
(letfn [(inject-embedded-ref-block [blocks]
(cond->> blocks
embedded-ref
(cons (generate-embedded-ref-block slice-ctx embedded-ref))))]
(->> (context/encoding-results slice-ctx)
(keep (fn [{:keys [^long raw-size] :as block}]
(when (pos? raw-size)
(update block :data
(fn [^bytes data]
(struct/generate-block (:compressor block) 4
(:content-id block) raw-size
data))))))
;; sort + dedupe by :content-id
(into (sorted-map) (map (juxt :content-id identity)))
vals
inject-embedded-ref-block
(cons {:content-id 0
:data (struct/generate-block :raw 5 0 0 (byte-array 0))}))))

(defn- resolve-reference-seq
^bytes [{:keys [seq-resolver cram-header]} {:keys [^long ri ^long start ^long end]}]
(let [chr (:SN (nth (:SQ cram-header) ri))]
(resolver/resolve-sequence seq-resolver chr start end)))

(defn- reference-md5 [slice-ctx ^bytes embedded-ref {:keys [^long ri] :as stats}]
(if (neg? ri)
(byte-array 16)
(let [chr (:SN (nth (:SQ cram-header) ri))
ref-bases (resolver/resolve-sequence seq-resolver chr start end)
(let [ref-bases (or embedded-ref (resolve-reference-seq slice-ctx stats))
md5 (MessageDigest/getInstance "md5")]
(.digest md5 ref-bases))))

Expand All @@ -94,11 +110,14 @@
(defn- generate-slice [slice-ctx counter slice-records]
(record/encode-slice-records slice-ctx slice-records)
(let [stats (:alignment-stats slice-ctx)
blocks (generate-blocks slice-ctx)
ref-md5 (reference-md5 slice-ctx stats)
embedded-ref (when (and (:embed-reference? (:options slice-ctx))
(>= (long (:ri stats)) 0))
(resolve-reference-seq slice-ctx stats))
blocks (generate-blocks slice-ctx embedded-ref)
ref-md5 (reference-md5 slice-ctx embedded-ref stats)
header (assoc (stats->header-base stats)
:counter counter
:embedded-reference -1
:embedded-reference (if embedded-ref embedded-ref-content-id -1)
:reference-md5 ref-md5)
header-block (struct/generate-slice-header-block header blocks)
block-data (mapv :data blocks)]
Expand All @@ -119,9 +138,20 @@
acc)))

(defn- generate-compression-header-block
^bytes [{:keys [preservation-map subst-mat tag-dict ds-encodings tag-encodings]}]
(struct/generate-compression-header-block preservation-map subst-mat tag-dict
ds-encodings tag-encodings))
^bytes
[{:keys [preservation-map subst-mat tag-dict ds-encodings tag-encodings]} slices]
(let [preservation-map' (cond-> preservation-map
;; A reference file is not required when either of
;; the following holds for every slice in the container:
;; - the slice has an embedded reference block
;; - the slice only contains unmapped records
(every? (fn [{:keys [header]}]
(or (>= (long (:embedded-reference header)) 0)
(= (long (:ref-seq-id header)) -1)))
slices)
(assoc :RR false))]
(struct/generate-compression-header-block preservation-map' subst-mat tag-dict
ds-encodings tag-encodings)))

(defn- generate-container-header [container-ctx ^bytes compression-header-block slices]
(let [stats (stats/merge-stats (:alignment-stats container-ctx))
Expand Down Expand Up @@ -175,7 +205,8 @@
(let [container-ctx (preprocess-records cram-header (.-seq-resolver wtr)
(.-options wtr) container-records)
slices (generate-slices container-ctx counter container-records)
compression-header-block (generate-compression-header-block container-ctx)
compression-header-block (generate-compression-header-block container-ctx
slices)
container-header (generate-container-header container-ctx
compression-header-block
slices)
Expand Down
Loading

0 comments on commit 918aa86

Please sign in to comment.