Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
athos committed Dec 20, 2023
1 parent d9f65b4 commit 19e8e2d
Showing 1 changed file with 330 additions and 0 deletions.
330 changes: 330 additions & 0 deletions src/cljam/io/cram.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
(ns cljam.io.cram
{:clj-kondo/ignore [:missing-docstring]}
(:require [cljam.io.cram.codecs.rans4x8 :as rans]
[cljam.io.cram.itf8 :as itf8]
[cljam.io.util.byte-buffer :as bb])
(:import [java.io ByteArrayInputStream IOException]
[java.nio Buffer ByteBuffer ByteOrder]
[java.util Arrays]
[org.apache.commons.compress.compressors CompressorStreamFactory]))

(def ^:const cram-magic "CRAM")

(defn decode-itf8-array [bb]
(let [n (itf8/decode-itf8 bb)]
(loop [i n, acc (transient [])]
(if (zero? i)
(persistent! acc)
(recur (dec i) (conj! acc (itf8/decode-itf8 bb)))))))

(defn decode-encoding [^ByteBuffer bb]
(let [codec-id (itf8/decode-itf8 bb)
_n (itf8/decode-itf8 bb)]
(case codec-id
0 {:codec :null}
1 (let [content-id (itf8/decode-itf8 bb)]
{:codec :external
:content-id content-id})
3 (let [alphabet (decode-itf8-array bb)
bit-len (decode-itf8-array bb)]
{:codec :huffman, :alphabet alphabet, :bit-len bit-len})
4 (let [len-encoding (decode-encoding bb)
val-encoding (decode-encoding bb)]
{:codec :byte-array-len, :len-encoding len-encoding, :val-encoding val-encoding})
5 (let [stop-byte (.get bb)
external-id (itf8/decode-itf8 bb)]
{:codec :byte-array-stop, :stop-byte stop-byte, :external-id external-id})
6 (let [offset (itf8/decode-itf8 bb)
length (itf8/decode-itf8 bb)]
{:codec :beta, :offset offset, :length length})
7 (let [offset (itf8/decode-itf8 bb)
k (itf8/decode-itf8 bb)]
{:codec :subexp, :offset offset, :k k})
9 (let [offset (itf8/decode-itf8 bb)]
{:codec :gamma, :offset offset})
(throw (ex-info (str "codec " codec-id " not supported") {})))))

(defn decode-file-definition [bb]
(when-not (Arrays/equals ^bytes (bb/read-bytes bb 4) (.getBytes cram-magic))
(throw (IOException. "Invalid CRAM file")))
(let [major (bb/read-ubyte bb)
minor (bb/read-ubyte bb)
file-id (bb/read-bytes bb 20)]
{:version {:major major :minor minor}, :id file-id}))

(defn decode-container-header [^ByteBuffer bb]
(let [len (.getInt bb)
ref-seq-id (itf8/decode-itf8 bb)
start-pos (itf8/decode-itf8 bb)
span (itf8/decode-itf8 bb)
n-records (itf8/decode-itf8 bb)
counter (itf8/decode-ltf8 bb)
n-bases (itf8/decode-ltf8 bb)
n-blocks (itf8/decode-itf8 bb)
landmarks (decode-itf8-array bb)
crc (bb/read-bytes bb 4)]
{:length len
:ref ref-seq-id
:start start-pos
:span span
:records n-records
:counter counter
:bases n-bases
:blocks n-blocks
:landmarks landmarks
:crc crc}))

(defn split-buffer ^ByteBuffer [^ByteBuffer bb size]
(let [^Buffer bb' (.order (.slice bb) ByteOrder/LITTLE_ENDIAN)]
(bb/skip bb size)
(.limit bb' size)))

(def decode-block-data
(let [factory (CompressorStreamFactory.)]
(fn [^ByteBuffer bb method size raw-size]
;; TODO: decompress data with specified method
(case (long method)
0 (split-buffer bb size)
4 (->> (split-buffer bb size)
rans/decode
bb/make-lsb-byte-buffer)
(let [compressed (bb/read-bytes bb size)
bais (ByteArrayInputStream. compressed)
uncompressed (byte-array raw-size)
bb' (bb/make-lsb-byte-buffer uncompressed)]
(with-open [is (.createCompressorInputStream factory bais)]
(.read is uncompressed)
bb'))))))

(defn decode-block [bb]
(let [method (bb/read-ubyte bb)
content-type-id (bb/read-ubyte bb)
content-id (itf8/decode-itf8 bb)
size (itf8/decode-itf8 bb)
raw-size (itf8/decode-itf8 bb)
data (decode-block-data bb method size raw-size)
crc (bb/read-bytes bb 4)]
{:method method
:content-type content-type-id
:content-id content-id
:size size
:raw-size raw-size
:data data
:crc crc}))

(defn decode-cram-header-block [bb]
(let [{bb' :data} (decode-block bb)
size (bb/read-uint bb')]
(bb/read-bytes bb' size)))

(defn decode-tag-dictionary [^ByteBuffer bb]
(let [n (itf8/decode-itf8 bb)
bb' (split-buffer bb n)
decode-tags (fn [bb]
(loop [acc (transient [])]
(let [c1 (long (bb/read-ubyte bb))]
(if (zero? c1)
(persistent! acc)
(let [c2 (bb/read-ubyte bb)
t (bb/read-ubyte bb)
tag (keyword (str (char c1) (char c2)))]
(recur (conj! acc {:tag tag, :type (char t)})))))))]
(loop [acc (transient [])]
(if (.hasRemaining bb')
(recur (conj! acc (decode-tags bb')))
(persistent! acc)))))

(defn decode-preservation-map [^ByteBuffer bb]
(let [_size (itf8/decode-itf8 bb)
n (itf8/decode-itf8 bb)]
(loop [i n, acc (transient {:RN true, :AP true, :RR true})]
(if (zero? i)
(persistent! acc)
(let [k (keyword (String. ^bytes (bb/read-bytes bb 2)))
v (case k
(:RN :AP :RR) (pos? (.get bb))
:SM (bb/read-bytes bb 5)
:TD (decode-tag-dictionary bb))]
(recur (dec i) (assoc! acc k v)))))))

(defn decode-data-series-encodings [bb]
(let [_size (itf8/decode-itf8 bb)
n (itf8/decode-itf8 bb)]
(loop [n n, acc (transient {})]
(if (zero? n)
(persistent! acc)
(let [k (keyword (String. ^bytes (bb/read-bytes bb 2)))
v (decode-encoding bb)]
(recur (dec n) (assoc! acc k v)))))))

(defn decode-tag-encoding-map [bb]
(let [_size (itf8/decode-itf8 bb)
n (itf8/decode-itf8 bb)]
(loop [i n, acc (transient {})]
(if (zero? i)
(persistent! acc)
(let [k (itf8/decode-itf8 bb)
c1 (char (bit-and (bit-shift-right k 16) 0xff))
c2 (char (bit-and (bit-shift-right k 8) 0xff))
t (char (bit-and k 0xff))
v (decode-encoding bb)
tag (keyword (str c1 c2))]
(recur (dec i) (assoc! acc tag {:type t :encoding v})))))))

(defn decode-compression-header-block [bb]
(let [{bb' :data} (decode-block bb)
preservation-map (decode-preservation-map bb')
data-series-encodings (decode-data-series-encodings bb')
tag-encoding-map (decode-tag-encoding-map bb')]
{:preservation-map preservation-map
:data-series data-series-encodings
:tags tag-encoding-map}))

(defn decode-slice-header-block [bb]
(let [{bb' :data} (decode-block bb)
ref-seq-id (itf8/decode-itf8 bb')
start (itf8/decode-itf8 bb')
span (itf8/decode-itf8 bb')
n-records (itf8/decode-itf8 bb')
counter (itf8/decode-ltf8 bb')
n-blocks (itf8/decode-itf8 bb')
content-ids (into [] (map (fn [_] (itf8/decode-itf8 bb')))
(range n-blocks))
embedded-reference (itf8/decode-itf8 bb')
reference-md5 (bb/read-bytes bb' 16)
tags []]
{:ref-seq-id ref-seq-id
:start start
:span span
:records n-records
:counter counter
:blocks n-blocks
:content-ids content-ids
:embedded-reference embedded-reference
:reference-md5 reference-md5
:tags tags}))

(defn data-series-type [ds]
(case ds
(:BF :CF :RI :RL :AP :RG :MF :NS :NP :TS :NF :TL :FN :FP :DL :RS :PD :HC :MQ)
:int

(:FC :BS :BA :QS)
:byte

(:RN :BB :QQ :IN :SC)
:bytes))

(defn build-data-series-decoder
[{:keys [codec] :as params} data-type content-id->block-data]
(case codec
:external
(let [^ByteBuffer block (get content-id->block-data (:content-id params))]
(case data-type
:byte #(.get block)
:int #(itf8/decode-itf8 block)))

:huffman
(let [{:keys [alphabet bit-len]} params]
(assert (and (= (count alphabet) 1)
(zero? (long (first bit-len))))
"Huffman coding for more than one word is not supported yet.")
(constantly (first alphabet)))

:byte-array-len
(let [{:keys [len-encoding val-encoding]} params
len-decoder (build-data-series-decoder len-encoding :int content-id->block-data)
val-decoder (build-data-series-decoder val-encoding :byte content-id->block-data)]
(fn []
(let [len (len-decoder)
bb (bb/allocate-lsb-byte-buffer len)]
(dotimes [_ len]
(.put bb (byte (val-decoder))))
(.array bb))))

:byte-array-stop
(let [{:keys [stop-byte external-id]} params
^ByteBuffer block (get content-id->block-data external-id)]
(fn []
(.mark ^Buffer block)
(let [start (.position block)
end (do (while (not= (.get block) (byte stop-byte)))
(.position block))
len (dec (- end start))]
(.reset ^Buffer block)
(bb/read-bytes block len))))))

(defn build-data-series-decoders [ds-encodings blocks]
(let [content-id->block-data (into {} (map (juxt :content-id :data)) blocks)]
(reduce-kv (fn [decoders ds params]
(let [dt (data-series-type ds)
decoder (build-data-series-decoder params dt content-id->block-data)]
(assoc decoders ds decoder)))
{} ds-encodings)))

(defn build-positional-data-decoder
[_compression-header slice-header {:keys [RI RL AP RG]}]
(fn [record]
(assoc record
:rname (if (= (:ref-seq-id slice-header) -2)
(RI)
(:ref-seq-id slice-header))
:len (RL)
:pos (AP)
:options [{:RG {:type "Z" :value (RG)}}])))

(defn build-read-name-decoder [{:keys [preservation-map]} {:keys [RN]}]
(if (:RN preservation-map)
#(assoc % :qname (RN))
(throw (ex-info "Omitted read names are not supported yet." {}))))

(defn build-read-feature-decoder
[{:keys [FN FC FP BA QS BS IN SC HC PD DL RS BB QQ]}]
(fn [record]
(loop [i (long (FN))
acc (transient [])]
(if (zero? i)
(assoc record :features (persistent! acc))
(let [fc (char (FC))
fp (FP)
data (case fc
\B {:base (BA) :qual (QS)}
\X {:subst (BS)}
\I {:inserted (IN)}
\S {:softclip (SC)}
\H {:hardclip (HC)}
\P {:pad (PD)}
\D {:del (DL)}
\N {:skip (RS)}
\i {:base (BA)}
\b {:bases (BB)}
\q {:quals (QQ)}
\Q {:qual (QS)})]
(recur (dec i)
(conj! acc (assoc data :fc fc :fp fp))))))))

(defn build-cram-record-decoder
[compression-header slice-header {:keys [BF CF] :as decoders}]
(let [pd-decoder (build-positional-data-decoder compression-header
slice-header
decoders)
rn-decoder (build-read-name-decoder compression-header decoders)
rf-decoder (build-read-feature-decoder decoders)]
(fn []
(let [bf (BF)
_cf (CF)]
(->> {:flag bf}
(pd-decoder)
(rn-decoder)
(rf-decoder))))))

(defn decode-slice-records [compression-header slice-header blocks]
(let [decoders (build-data-series-decoders (:data-series compression-header) blocks)
record-decoder (build-cram-record-decoder compression-header
slice-header
decoders)]
(letfn [(step [^long i]
(when (pos? i)
(cons (record-decoder) (lazy-seq (step (dec i))))))]
(step (:records slice-header)))))

0 comments on commit 19e8e2d

Please sign in to comment.