-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
344 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,344 @@ | ||
(ns cljam.io.cram | ||
{:clj-kondo/ignore [:missing-docstring]} | ||
(:require [cljam.io.cram.codecs.rans4x8 :as rans] | ||
[cljam.io.cram.itf8 :as itf8] | ||
[cljam.io.util.byte-buffer :as bb]) | ||
(:import [java.io ByteArrayInputStream IOException] | ||
[java.nio Buffer ByteBuffer ByteOrder] | ||
[java.util Arrays] | ||
[org.apache.commons.compress.compressors CompressorStreamFactory])) | ||
|
||
(def ^:const cram-magic "CRAM") | ||
|
||
(defn decode-itf8-array [bb] | ||
(let [n (itf8/decode-itf8 bb)] | ||
(loop [i n, acc (transient [])] | ||
(if (zero? i) | ||
(persistent! acc) | ||
(recur (dec i) (conj! acc (itf8/decode-itf8 bb))))))) | ||
|
||
(defn decode-encoding [^ByteBuffer bb] | ||
(let [codec-id (itf8/decode-itf8 bb) | ||
_n (itf8/decode-itf8 bb)] | ||
(case codec-id | ||
0 {:codec :null} | ||
1 (let [content-id (itf8/decode-itf8 bb)] | ||
{:codec :external | ||
:content-id content-id}) | ||
3 (let [alphabet (decode-itf8-array bb) | ||
bit-len (decode-itf8-array bb)] | ||
{:codec :huffman, :alphabet alphabet, :bit-len bit-len}) | ||
4 (let [len-encoding (decode-encoding bb) | ||
val-encoding (decode-encoding bb)] | ||
{:codec :byte-array-len, :len-encoding len-encoding, :val-encoding val-encoding}) | ||
5 (let [stop-byte (.get bb) | ||
external-id (itf8/decode-itf8 bb)] | ||
{:codec :byte-array-stop, :stop-byte stop-byte, :external-id external-id}) | ||
6 (let [offset (itf8/decode-itf8 bb) | ||
length (itf8/decode-itf8 bb)] | ||
{:codec :beta, :offset offset, :length length}) | ||
7 (let [offset (itf8/decode-itf8 bb) | ||
k (itf8/decode-itf8 bb)] | ||
{:codec :subexp, :offset offset, :k k}) | ||
9 (let [offset (itf8/decode-itf8 bb)] | ||
{:codec :gamma, :offset offset}) | ||
(throw (ex-info (str "codec " codec-id " not supported") {}))))) | ||
|
||
(defn decode-file-definition [bb] | ||
(when-not (Arrays/equals ^bytes (bb/read-bytes bb 4) (.getBytes cram-magic)) | ||
(throw (IOException. "Invalid CRAM file"))) | ||
(let [major (bb/read-ubyte bb) | ||
minor (bb/read-ubyte bb) | ||
file-id (bb/read-bytes bb 20)] | ||
{:version {:major major :minor minor}, :id file-id})) | ||
|
||
(defn decode-container-header [^ByteBuffer bb] | ||
(let [len (.getInt bb) | ||
ref-seq-id (itf8/decode-itf8 bb) | ||
start-pos (itf8/decode-itf8 bb) | ||
span (itf8/decode-itf8 bb) | ||
n-records (itf8/decode-itf8 bb) | ||
counter (itf8/decode-ltf8 bb) | ||
n-bases (itf8/decode-ltf8 bb) | ||
n-blocks (itf8/decode-itf8 bb) | ||
landmarks (decode-itf8-array bb) | ||
crc (bb/read-bytes bb 4)] | ||
{:length len | ||
:ref ref-seq-id | ||
:start start-pos | ||
:span span | ||
:records n-records | ||
:counter counter | ||
:bases n-bases | ||
:blocks n-blocks | ||
:landmarks landmarks | ||
:crc crc})) | ||
|
||
(defn split-buffer ^ByteBuffer [^ByteBuffer bb size] | ||
(let [^Buffer bb' (.order (.slice bb) ByteOrder/LITTLE_ENDIAN)] | ||
(bb/skip bb size) | ||
(.limit bb' size))) | ||
|
||
(def decode-block-data | ||
(let [factory (CompressorStreamFactory.)] | ||
(fn [^ByteBuffer bb method size raw-size] | ||
;; TODO: decompress data with specified method | ||
(case (long method) | ||
0 (split-buffer bb size) | ||
4 (->> (split-buffer bb size) | ||
rans/decode | ||
bb/make-lsb-byte-buffer) | ||
(let [compressed (bb/read-bytes bb size) | ||
bais (ByteArrayInputStream. compressed) | ||
uncompressed (byte-array raw-size) | ||
bb' (bb/make-lsb-byte-buffer uncompressed)] | ||
(with-open [is (.createCompressorInputStream factory bais)] | ||
(.read is uncompressed) | ||
bb')))))) | ||
|
||
(defn decode-block [bb] | ||
(let [method (bb/read-ubyte bb) | ||
content-type-id (bb/read-ubyte bb) | ||
content-id (itf8/decode-itf8 bb) | ||
size (itf8/decode-itf8 bb) | ||
raw-size (itf8/decode-itf8 bb) | ||
data (decode-block-data bb method size raw-size) | ||
crc (bb/read-bytes bb 4)] | ||
{:method method | ||
:content-type content-type-id | ||
:content-id content-id | ||
:size size | ||
:raw-size raw-size | ||
:data data | ||
:crc crc})) | ||
|
||
(defn decode-cram-header-block [bb] | ||
(let [{bb' :data} (decode-block bb) | ||
size (bb/read-uint bb')] | ||
(bb/read-bytes bb' size))) | ||
|
||
(defn decode-tag-dictionary [^ByteBuffer bb] | ||
(let [n (itf8/decode-itf8 bb) | ||
bb' (split-buffer bb n) | ||
decode-tags (fn [bb] | ||
(loop [acc (transient [])] | ||
(let [c1 (long (bb/read-ubyte bb))] | ||
(if (zero? c1) | ||
(persistent! acc) | ||
(let [c2 (bb/read-ubyte bb) | ||
t (bb/read-ubyte bb) | ||
tag (keyword (str (char c1) (char c2)))] | ||
(recur (conj! acc {:tag tag, :type (char t)})))))))] | ||
(loop [acc (transient [])] | ||
(if (.hasRemaining bb') | ||
(recur (conj! acc (decode-tags bb'))) | ||
(persistent! acc))))) | ||
|
||
(defn decode-preservation-map [^ByteBuffer bb] | ||
(let [_size (itf8/decode-itf8 bb) | ||
n (itf8/decode-itf8 bb)] | ||
(loop [i n, acc (transient {:RN true, :AP true, :RR true})] | ||
(if (zero? i) | ||
(persistent! acc) | ||
(let [k (keyword (String. ^bytes (bb/read-bytes bb 2))) | ||
v (case k | ||
(:RN :AP :RR) (pos? (.get bb)) | ||
:SM (bb/read-bytes bb 5) | ||
:TD (decode-tag-dictionary bb))] | ||
(recur (dec i) (assoc! acc k v))))))) | ||
|
||
(defn decode-data-series-encodings [bb] | ||
(let [_size (itf8/decode-itf8 bb) | ||
n (itf8/decode-itf8 bb)] | ||
(loop [n n, acc (transient {})] | ||
(if (zero? n) | ||
(persistent! acc) | ||
(let [k (keyword (String. ^bytes (bb/read-bytes bb 2))) | ||
v (decode-encoding bb)] | ||
(recur (dec n) (assoc! acc k v))))))) | ||
|
||
(defn decode-tag-encoding-map [bb] | ||
(let [_size (itf8/decode-itf8 bb) | ||
n (itf8/decode-itf8 bb)] | ||
(loop [i n, acc (transient {})] | ||
(if (zero? i) | ||
(persistent! acc) | ||
(let [k (itf8/decode-itf8 bb) | ||
c1 (char (bit-and (bit-shift-right k 16) 0xff)) | ||
c2 (char (bit-and (bit-shift-right k 8) 0xff)) | ||
t (char (bit-and k 0xff)) | ||
v (decode-encoding bb) | ||
tag (keyword (str c1 c2))] | ||
(recur (dec i) (assoc! acc tag {:type t :encoding v}))))))) | ||
|
||
(defn decode-compression-header-block [bb] | ||
(let [{bb' :data} (decode-block bb) | ||
preservation-map (decode-preservation-map bb') | ||
data-series-encodings (decode-data-series-encodings bb') | ||
tag-encoding-map (decode-tag-encoding-map bb')] | ||
{:preservation-map preservation-map | ||
:data-series data-series-encodings | ||
:tags tag-encoding-map})) | ||
|
||
(defn decode-slice-header-block [bb] | ||
(let [{bb' :data} (decode-block bb) | ||
ref-seq-id (itf8/decode-itf8 bb') | ||
start (itf8/decode-itf8 bb') | ||
span (itf8/decode-itf8 bb') | ||
n-records (itf8/decode-itf8 bb') | ||
counter (itf8/decode-ltf8 bb') | ||
n-blocks (itf8/decode-itf8 bb') | ||
content-ids (into [] (map (fn [_] (itf8/decode-itf8 bb'))) | ||
(range n-blocks)) | ||
embedded-reference (itf8/decode-itf8 bb') | ||
reference-md5 (bb/read-bytes bb' 16) | ||
tags []] | ||
{:ref-seq-id ref-seq-id | ||
:start start | ||
:span span | ||
:records n-records | ||
:counter counter | ||
:blocks n-blocks | ||
:content-ids content-ids | ||
:embedded-reference embedded-reference | ||
:reference-md5 reference-md5 | ||
:tags tags})) | ||
|
||
(defn data-series-type [ds] | ||
(case ds | ||
(:BF :CF :RI :RL :AP :RG :MF :NS :NP :TS :NF :TL :FN :FP :DL :RS :PD :HC :MQ) | ||
:int | ||
|
||
(:FC :BS :BA :QS) | ||
:byte | ||
|
||
(:RN :BB :QQ :IN :SC) | ||
:bytes)) | ||
|
||
(defn build-data-series-decoder | ||
[{:keys [codec] :as params} data-type content-id->block-data] | ||
(case codec | ||
:external | ||
(let [^ByteBuffer block (get content-id->block-data (:content-id params))] | ||
(case data-type | ||
:byte #(.get block) | ||
:int #(itf8/decode-itf8 block))) | ||
|
||
:huffman | ||
(let [{:keys [alphabet bit-len]} params] | ||
(assert (and (= (count alphabet) 1) | ||
(zero? (long (first bit-len)))) | ||
"Huffman coding for more than one word is not supported yet.") | ||
(constantly (first alphabet))) | ||
|
||
:byte-array-len | ||
(let [{:keys [len-encoding val-encoding]} params | ||
len-decoder (build-data-series-decoder len-encoding :int content-id->block-data) | ||
val-decoder (build-data-series-decoder val-encoding :byte content-id->block-data)] | ||
(fn [] | ||
(let [len (len-decoder) | ||
bb (bb/allocate-lsb-byte-buffer len)] | ||
(dotimes [_ len] | ||
(.put bb (byte (val-decoder)))) | ||
(.array bb)))) | ||
|
||
:byte-array-stop | ||
(let [{:keys [stop-byte external-id]} params | ||
^ByteBuffer block (get content-id->block-data external-id)] | ||
(fn [] | ||
(.mark ^Buffer block) | ||
(let [start (.position block) | ||
end (do (while (not= (.get block) (byte stop-byte))) | ||
(.position block)) | ||
len (dec (- end start)) | ||
_ (.reset ^Buffer block) | ||
ret (bb/read-bytes block len)] | ||
(.get block) | ||
ret))))) | ||
|
||
(defn build-data-series-decoders [ds-encodings blocks] | ||
(let [content-id->block-data (into {} (map (juxt :content-id :data)) blocks)] | ||
(reduce-kv (fn [decoders ds params] | ||
(let [dt (data-series-type ds) | ||
decoder (build-data-series-decoder params dt content-id->block-data)] | ||
(assoc decoders ds decoder))) | ||
{} ds-encodings))) | ||
|
||
(defn build-positional-data-decoder | ||
[_compression-header slice-header {:keys [RI RL AP RG]}] | ||
(fn [record] | ||
(let [rg (RG)] | ||
(assoc record | ||
:rname (if (= (:ref-seq-id slice-header) -2) | ||
(RI) | ||
(:ref-seq-id slice-header)) | ||
::len (RL) | ||
:pos (AP) | ||
:options (cond-> [] | ||
(not= rg -1) | ||
(conj {:RG {:type "Z" :value rg}})))))) | ||
|
||
(defn build-read-name-decoder [{:keys [preservation-map]} {:keys [RN]}] | ||
(if (:RN preservation-map) | ||
#(assoc % :qname (RN)) | ||
(throw (ex-info "Omitted read names are not supported yet." {})))) | ||
|
||
(defn build-read-feature-decoder | ||
[{:keys [FN MQ QS FC FP BA BS IN SC HC PD DL RS BB QQ]}] | ||
(fn [record] | ||
(loop [i (long (FN)) | ||
acc (transient [])] | ||
(if (zero? i) | ||
(let [record' (assoc record | ||
:mapq (MQ) | ||
:features (persistent! acc)) | ||
qual (when (pos? (bit-and (long (::flag record)) 0x01)) | ||
(let [len (::len record) | ||
bb (bb/allocate-lsb-byte-buffer len)] | ||
(dotimes [_ len] | ||
(.put bb (byte (+ 33 (long (QS)))))) | ||
(.array bb)))] | ||
(cond-> record' qual (assoc :qual qual))) | ||
(let [fc (char (FC)) | ||
fp (FP) | ||
data (case fc | ||
\B {:base (BA) :qual (QS)} | ||
\X {:subst (BS)} | ||
\I {:inserted (IN)} | ||
\S {:softclip (SC)} | ||
\H {:hardclip (HC)} | ||
\P {:pad (PD)} | ||
\D {:del (DL)} | ||
\N {:skip (RS)} | ||
\i {:base (BA)} | ||
\b {:bases (BB)} | ||
\q {:quals (QQ)} | ||
\Q {:qual (QS)})] | ||
(recur (dec i) | ||
(conj! acc (assoc data :fc fc :fp fp)))))))) | ||
|
||
(defn build-cram-record-decoder | ||
[compression-header slice-header {:keys [BF CF] :as decoders}] | ||
(let [pd-decoder (build-positional-data-decoder compression-header | ||
slice-header | ||
decoders) | ||
rn-decoder (build-read-name-decoder compression-header decoders) | ||
rf-decoder (build-read-feature-decoder decoders)] | ||
(fn [] | ||
(let [bf (BF) | ||
cf (CF)] | ||
(->> {:flag bf ::flag cf} | ||
(pd-decoder) | ||
(rn-decoder) | ||
(rf-decoder)))))) | ||
|
||
(defn decode-slice-records [compression-header slice-header blocks] | ||
(let [decoders (build-data-series-decoders (:data-series compression-header) blocks) | ||
record-decoder (build-cram-record-decoder compression-header | ||
slice-header | ||
decoders)] | ||
(letfn [(step [^long i] | ||
(when (pos? i) | ||
(cons (record-decoder) (lazy-seq (step (dec i))))))] | ||
(step (:records slice-header))))) | ||
|