diff --git a/src/cljam/io/cram.clj b/src/cljam/io/cram.clj new file mode 100644 index 00000000..20ba474a --- /dev/null +++ b/src/cljam/io/cram.clj @@ -0,0 +1,330 @@ +(ns cljam.io.cram + {:clj-kondo/ignore [:missing-docstring]} + (:require [cljam.io.cram.codecs.rans4x8 :as rans] + [cljam.io.cram.itf8 :as itf8] + [cljam.io.util.byte-buffer :as bb]) + (:import [java.io ByteArrayInputStream IOException] + [java.nio Buffer ByteBuffer ByteOrder] + [java.util Arrays] + [org.apache.commons.compress.compressors CompressorStreamFactory])) + +(def ^:const cram-magic "CRAM") + +(defn decode-itf8-array [bb] + (let [n (itf8/decode-itf8 bb)] + (loop [i n, acc (transient [])] + (if (zero? i) + (persistent! acc) + (recur (dec i) (conj! acc (itf8/decode-itf8 bb))))))) + +(defn decode-encoding [^ByteBuffer bb] + (let [codec-id (itf8/decode-itf8 bb) + _n (itf8/decode-itf8 bb)] + (case codec-id + 0 {:codec :null} + 1 (let [content-id (itf8/decode-itf8 bb)] + {:codec :external + :content-id content-id}) + 3 (let [alphabet (decode-itf8-array bb) + bit-len (decode-itf8-array bb)] + {:codec :huffman, :alphabet alphabet, :bit-len bit-len}) + 4 (let [len-encoding (decode-encoding bb) + val-encoding (decode-encoding bb)] + {:codec :byte-array-len, :len-encoding len-encoding, :val-encoding val-encoding}) + 5 (let [stop-byte (.get bb) + external-id (itf8/decode-itf8 bb)] + {:codec :byte-array-stop, :stop-byte stop-byte, :external-id external-id}) + 6 (let [offset (itf8/decode-itf8 bb) + length (itf8/decode-itf8 bb)] + {:codec :beta, :offset offset, :length length}) + 7 (let [offset (itf8/decode-itf8 bb) + k (itf8/decode-itf8 bb)] + {:codec :subexp, :offset offset, :k k}) + 9 (let [offset (itf8/decode-itf8 bb)] + {:codec :gamma, :offset offset}) + (throw (ex-info (str "codec " codec-id " not supported") {}))))) + +(defn decode-file-definition [bb] + (when-not (Arrays/equals ^bytes (bb/read-bytes bb 4) (.getBytes cram-magic)) + (throw (IOException. "Invalid CRAM file"))) + (let [major (bb/read-ubyte bb) + minor (bb/read-ubyte bb) + file-id (bb/read-bytes bb 20)] + {:version {:major major :minor minor}, :id file-id})) + +(defn decode-container-header [^ByteBuffer bb] + (let [len (.getInt bb) + ref-seq-id (itf8/decode-itf8 bb) + start-pos (itf8/decode-itf8 bb) + span (itf8/decode-itf8 bb) + n-records (itf8/decode-itf8 bb) + counter (itf8/decode-ltf8 bb) + n-bases (itf8/decode-ltf8 bb) + n-blocks (itf8/decode-itf8 bb) + landmarks (decode-itf8-array bb) + crc (bb/read-bytes bb 4)] + {:length len + :ref ref-seq-id + :start start-pos + :span span + :records n-records + :counter counter + :bases n-bases + :blocks n-blocks + :landmarks landmarks + :crc crc})) + +(defn split-buffer ^ByteBuffer [^ByteBuffer bb size] + (let [^Buffer bb' (.order (.slice bb) ByteOrder/LITTLE_ENDIAN)] + (bb/skip bb size) + (.limit bb' size))) + +(def decode-block-data + (let [factory (CompressorStreamFactory.)] + (fn [^ByteBuffer bb method size raw-size] + ;; TODO: decompress data with specified method + (case (long method) + 0 (split-buffer bb size) + 4 (->> (split-buffer bb size) + rans/decode + bb/make-lsb-byte-buffer) + (let [compressed (bb/read-bytes bb size) + bais (ByteArrayInputStream. compressed) + uncompressed (byte-array raw-size) + bb' (bb/make-lsb-byte-buffer uncompressed)] + (with-open [is (.createCompressorInputStream factory bais)] + (.read is uncompressed) + bb')))))) + +(defn decode-block [bb] + (let [method (bb/read-ubyte bb) + content-type-id (bb/read-ubyte bb) + content-id (itf8/decode-itf8 bb) + size (itf8/decode-itf8 bb) + raw-size (itf8/decode-itf8 bb) + data (decode-block-data bb method size raw-size) + crc (bb/read-bytes bb 4)] + {:method method + :content-type content-type-id + :content-id content-id + :size size + :raw-size raw-size + :data data + :crc crc})) + +(defn decode-cram-header-block [bb] + (let [{bb' :data} (decode-block bb) + size (bb/read-uint bb')] + (bb/read-bytes bb' size))) + +(defn decode-tag-dictionary [^ByteBuffer bb] + (let [n (itf8/decode-itf8 bb) + bb' (split-buffer bb n) + decode-tags (fn [bb] + (loop [acc (transient [])] + (let [c1 (long (bb/read-ubyte bb))] + (if (zero? c1) + (persistent! acc) + (let [c2 (bb/read-ubyte bb) + t (bb/read-ubyte bb) + tag (keyword (str (char c1) (char c2)))] + (recur (conj! acc {:tag tag, :type (char t)})))))))] + (loop [acc (transient [])] + (if (.hasRemaining bb') + (recur (conj! acc (decode-tags bb'))) + (persistent! acc))))) + +(defn decode-preservation-map [^ByteBuffer bb] + (let [_size (itf8/decode-itf8 bb) + n (itf8/decode-itf8 bb)] + (loop [i n, acc (transient {:RN true, :AP true, :RR true})] + (if (zero? i) + (persistent! acc) + (let [k (keyword (String. ^bytes (bb/read-bytes bb 2))) + v (case k + (:RN :AP :RR) (pos? (.get bb)) + :SM (bb/read-bytes bb 5) + :TD (decode-tag-dictionary bb))] + (recur (dec i) (assoc! acc k v))))))) + +(defn decode-data-series-encodings [bb] + (let [_size (itf8/decode-itf8 bb) + n (itf8/decode-itf8 bb)] + (loop [n n, acc (transient {})] + (if (zero? n) + (persistent! acc) + (let [k (keyword (String. ^bytes (bb/read-bytes bb 2))) + v (decode-encoding bb)] + (recur (dec n) (assoc! acc k v))))))) + +(defn decode-tag-encoding-map [bb] + (let [_size (itf8/decode-itf8 bb) + n (itf8/decode-itf8 bb)] + (loop [i n, acc (transient {})] + (if (zero? i) + (persistent! acc) + (let [k (itf8/decode-itf8 bb) + c1 (char (bit-and (bit-shift-right k 16) 0xff)) + c2 (char (bit-and (bit-shift-right k 8) 0xff)) + t (char (bit-and k 0xff)) + v (decode-encoding bb) + tag (keyword (str c1 c2))] + (recur (dec i) (assoc! acc tag {:type t :encoding v}))))))) + +(defn decode-compression-header-block [bb] + (let [{bb' :data} (decode-block bb) + preservation-map (decode-preservation-map bb') + data-series-encodings (decode-data-series-encodings bb') + tag-encoding-map (decode-tag-encoding-map bb')] + {:preservation-map preservation-map + :data-series data-series-encodings + :tags tag-encoding-map})) + +(defn decode-slice-header-block [bb] + (let [{bb' :data} (decode-block bb) + ref-seq-id (itf8/decode-itf8 bb') + start (itf8/decode-itf8 bb') + span (itf8/decode-itf8 bb') + n-records (itf8/decode-itf8 bb') + counter (itf8/decode-ltf8 bb') + n-blocks (itf8/decode-itf8 bb') + content-ids (into [] (map (fn [_] (itf8/decode-itf8 bb'))) + (range n-blocks)) + embedded-reference (itf8/decode-itf8 bb') + reference-md5 (bb/read-bytes bb' 16) + tags []] + {:ref-seq-id ref-seq-id + :start start + :span span + :records n-records + :counter counter + :blocks n-blocks + :content-ids content-ids + :embedded-reference embedded-reference + :reference-md5 reference-md5 + :tags tags})) + +(defn data-series-type [ds] + (case ds + (:BF :CF :RI :RL :AP :RG :MF :NS :NP :TS :NF :TL :FN :FP :DL :RS :PD :HC :MQ) + :int + + (:FC :BS :BA :QS) + :byte + + (:RN :BB :QQ :IN :SC) + :bytes)) + +(defn build-data-series-decoder + [{:keys [codec] :as params} data-type content-id->block-data] + (case codec + :external + (let [^ByteBuffer block (get content-id->block-data (:content-id params))] + (case data-type + :byte #(.get block) + :int #(itf8/decode-itf8 block))) + + :huffman + (let [{:keys [alphabet bit-len]} params] + (assert (and (= (count alphabet) 1) + (zero? (long (first bit-len)))) + "Huffman coding for more than one word is not supported yet.") + (constantly (first alphabet))) + + :byte-array-len + (let [{:keys [len-encoding val-encoding]} params + len-decoder (build-data-series-decoder len-encoding :int content-id->block-data) + val-decoder (build-data-series-decoder val-encoding :byte content-id->block-data)] + (fn [] + (let [len (len-decoder) + bb (bb/allocate-lsb-byte-buffer len)] + (dotimes [_ len] + (.put bb (byte (val-decoder)))) + (.array bb)))) + + :byte-array-stop + (let [{:keys [stop-byte external-id]} params + ^ByteBuffer block (get content-id->block-data external-id)] + (fn [] + (.mark ^Buffer block) + (let [start (.position block) + end (do (while (not= (.get block) (byte stop-byte))) + (.position block)) + len (dec (- end start))] + (.reset ^Buffer block) + (bb/read-bytes block len)))))) + +(defn build-data-series-decoders [ds-encodings blocks] + (let [content-id->block-data (into {} (map (juxt :content-id :data)) blocks)] + (reduce-kv (fn [decoders ds params] + (let [dt (data-series-type ds) + decoder (build-data-series-decoder params dt content-id->block-data)] + (assoc decoders ds decoder))) + {} ds-encodings))) + +(defn build-positional-data-decoder + [_compression-header slice-header {:keys [RI RL AP RG]}] + (fn [record] + (assoc record + :rname (if (= (:ref-seq-id slice-header) -2) + (RI) + (:ref-seq-id slice-header)) + :len (RL) + :pos (AP) + :options [{:RG {:type "Z" :value (RG)}}]))) + +(defn build-read-name-decoder [{:keys [preservation-map]} {:keys [RN]}] + (if (:RN preservation-map) + #(assoc % :qname (RN)) + (throw (ex-info "Omitted read names are not supported yet." {})))) + +(defn build-read-feature-decoder + [{:keys [FN FC FP BA QS BS IN SC HC PD DL RS BB QQ]}] + (fn [record] + (loop [i (long (FN)) + acc (transient [])] + (if (zero? i) + (assoc record :features (persistent! acc)) + (let [fc (char (FC)) + fp (FP) + data (case fc + \B {:base (BA) :qual (QS)} + \X {:subst (BS)} + \I {:inserted (IN)} + \S {:softclip (SC)} + \H {:hardclip (HC)} + \P {:pad (PD)} + \D {:del (DL)} + \N {:skip (RS)} + \i {:base (BA)} + \b {:bases (BB)} + \q {:quals (QQ)} + \Q {:qual (QS)})] + (recur (dec i) + (conj! acc (assoc data :fc fc :fp fp)))))))) + +(defn build-cram-record-decoder + [compression-header slice-header {:keys [BF CF] :as decoders}] + (let [pd-decoder (build-positional-data-decoder compression-header + slice-header + decoders) + rn-decoder (build-read-name-decoder compression-header decoders) + rf-decoder (build-read-feature-decoder decoders)] + (fn [] + (let [bf (BF) + _cf (CF)] + (->> {:flag bf} + (pd-decoder) + (rn-decoder) + (rf-decoder)))))) + +(defn decode-slice-records [compression-header slice-header blocks] + (let [decoders (build-data-series-decoders (:data-series compression-header) blocks) + record-decoder (build-cram-record-decoder compression-header + slice-header + decoders)] + (letfn [(step [^long i] + (when (pos? i) + (cons (record-decoder) (lazy-seq (step (dec i))))))] + (step (:records slice-header))))) +