diff --git a/src/cljam/io/cram.clj b/src/cljam/io/cram.clj
new file mode 100644
index 00000000..12e496bc
--- /dev/null
+++ b/src/cljam/io/cram.clj
@@ -0,0 +1,67 @@
+(ns cljam.io.cram
+  {:clj-kondo/ignore [:missing-docstring]}
+  (:require [cljam.io.cram.reader :as reader]
+            [cljam.io.protocols :as protocols]
+            [cljam.io.sequence :as cseq]
+            [cljam.io.util.byte-buffer :as bb]
+            [cljam.util :as util]
+            [clojure.java.io :as cio])
+  (:import [cljam.io.cram.reader CRAMReader]
+           [java.nio.channels FileChannel]
+           [java.nio.file OpenOption StandardOpenOption]))
+
+(defn reader
+  "Opens the CRAM file `f` and returns a `CRAMReader` for it.
+
+  The optional second argument is an option map. When it has a
+  `:reference` entry, that value is opened with `cljam.io.sequence/reader`
+  and handed to the CRAM reader as its sequence reader.
+
+  The file definition and the CRAM header are read before returning. If
+  anything fails after the file channel has been opened, the channel and
+  any reference reader already owned by the CRAM reader are closed before
+  the exception is rethrown."
+  (^CRAMReader [f] (reader f {}))
+  (^CRAMReader [f {:keys [reference]}]
+   (let [file (cio/file f)
+         url (util/as-url (.getAbsolutePath file))
+         ch (FileChannel/open (.toPath file)
+                              (into-array OpenOption [StandardOpenOption/READ]))]
+     (try
+       (let [bb (bb/allocate-lsb-byte-buffer 256)
+             seq-rdr (some-> reference cseq/reader)
+             header (volatile! nil)
+             ;; A delay, so it sees the header stored by vreset! below.
+             refs (delay
+                    (mapv (fn [{:keys [SN LN]}] {:name SN :len LN})
+                          (:SQ @header)))
+             rdr (reader/->CRAMReader url ch bb header refs seq-rdr)]
+         (try
+           (reader/read-file-definition rdr)
+           (vreset! header (reader/read-header rdr))
+           rdr
+           (catch Throwable e
+             ;; Closing the reader also closes the reference reader.
+             (.close ^CRAMReader rdr)
+             (throw e))))
+       (catch Throwable e
+         ;; No-op if the reader above already closed the channel.
+         (.close ^FileChannel ch)
+         (throw e))))))
+
+(defn read-header
+  "Returns the header of `rdr` via `protocols/read-header`."
+  [rdr]
+  (protocols/read-header rdr))
+
+(defn read-refs
+  "Returns the references of `rdr` via `protocols/read-refs`. A
+  `CRAMReader` yields `{:name ... :len ...}` maps built from the header's
+  `:SQ` entries."
+  [rdr]
+  (protocols/read-refs rdr))
+
+(defn read-alignments
+  "Returns the alignments of `rdr` via `protocols/read-alignments`."
+  [rdr]
+  (protocols/read-alignments rdr))
diff --git a/src/cljam/io/cram/decode/structure.clj b/src/cljam/io/cram/decode/structure.clj
index 66f4a47a..ce3eec47 100644
--- a/src/cljam/io/cram/decode/structure.clj
+++ b/src/cljam/io/cram/decode/structure.clj
@@ -9,7 +9,7 @@
            [java.util Arrays]
            [org.apache.commons.compress.compressors CompressorStreamFactory]))
 
-(def ^:const cram-magic "CRAM")
+(def ^:private ^:const cram-magic "CRAM")
 
 (defn- decode-itf8-array [bb]
   (let [n (itf8/decode-itf8 bb)]
diff --git a/src/cljam/io/cram/reader.clj b/src/cljam/io/cram/reader.clj
index cddfc797..54d88183 100644
--- a/src/cljam/io/cram/reader.clj
+++ b/src/cljam/io/cram/reader.clj
@@ -1,12 +1,65 @@
 (ns cljam.io.cram.reader
   {:clj-kondo/ignore [:missing-docstring]}
-  (:require [cljam.io.util.byte-buffer :as bb]
-            [cljam.io.cram.decode.data-series :as ds]
+  (:require [cljam.io.cram.decode.data-series :as ds]
             [cljam.io.cram.decode.record :as record]
-            [cljam.io.cram.decode.structure :as struct])
-  (:import [java.nio Buffer ByteBuffer ByteOrder]
+            [cljam.io.cram.decode.structure :as struct]
+            [cljam.io.protocols :as protocols])
+  (:import [java.io Closeable]
+           [java.nio Buffer ByteBuffer ByteOrder]
            [java.nio.channels FileChannel FileChannel$MapMode]))
 
+(declare read-header read-alignments)
+
+;; CRAM reader state. `header` and `refs` are dereferenced on access, and
+;; `seq-reader` may be nil. Closing closes `seq-reader` first, then `channel`.
+(deftype CRAMReader [url channel buffer header refs seq-reader]
+  Closeable
+  (close [_]
+    (when seq-reader
+      (.close ^Closeable seq-reader))
+    (.close ^FileChannel channel))
+  protocols/IReader
+  (reader-url [_] url)
+  (read [this]
+    (protocols/read-alignments this))
+  #_(read [_ option])
+  (indexed? [_] false)
+  protocols/IAlignmentReader
+  (read-header [_] @header)
+  (read-refs [_] @refs)
+  (read-alignments [this]
+    (read-alignments this))
+  #_(read-alignments [_ region])
+  #_(read-blocks [_])
+  #_(read-blocks [_ region])
+  #_(read-blocks [_ region option])
+  #_protocols/IRegionReader
+  #_(read-in-region [_ region])
+  #_(read-in-region [_ region option]))
+
+(defn- read-to-buffer
+  "Refills the reader's buffer from the channel's current position and
+  returns it flipped for reading. Reads until `limit` bytes (the buffer
+  capacity when `limit` is nil) are buffered or the end of the file is
+  reached."
+  ([rdr] (read-to-buffer rdr nil))
+  ([^CRAMReader rdr limit]
+   (let [^FileChannel ch (.-channel rdr)
+         ^Buffer bb (.-buffer rdr)]
+     (.clear bb)
+     (.limit bb (or limit (.capacity bb)))
+     (while (and (.hasRemaining bb)
+                 (< (.position ch) (.size ch)))
+       (.read ch ^ByteBuffer bb))
+     (.flip bb))))
+
+(defn read-file-definition
+  "Reads up to 26 bytes from the channel's current position and decodes the
+  buffer as the CRAM file definition."
+  [^CRAMReader rdr]
+  (read-to-buffer rdr 26)
+  (struct/decode-file-definition (.-buffer rdr)))
+
 (defn- update-next-mate [{:keys [^long flag] :as record}
                          {^long mate-flag :flag :as mate}]
   (assoc record
@@ -47,14 +100,15 @@
             (aset records i record')
             (aset records j mate'))))))
 
-(defn- read-slice-records [bb seq-reader cram-header compression-header]
+(defn- read-slice-records [^CRAMReader rdr bb compression-header]
   (let [slice-header (struct/decode-slice-header-block bb)
         blocks (into [] (map (fn [_] (struct/decode-block bb)))
                      (range (:blocks slice-header)))
+
         ds-decoders (ds/build-data-series-decoders compression-header blocks)
         tag-decoders (ds/build-tag-decoders compression-header blocks)
-        record-decoder (record/build-cram-record-decoder seq-reader
-                                                         cram-header
+        record-decoder (record/build-cram-record-decoder (.-seq-reader rdr)
+                                                         @(.-header rdr)
                                                          compression-header
                                                          slice-header
                                                          ds-decoders
@@ -67,20 +121,20 @@
     (map #(dissoc % ::record/flag ::record/len ::record/next-fragment)
          records)))
 
-(defn- read-container-records [^Buffer bb seq-reader cram-header container-header]
+(defn- read-container-records [^CRAMReader rdr ^ByteBuffer bb container-header]
   (let [container-header-end (.position bb)
         compression-header (struct/decode-compression-header-block bb)]
     (->> (:landmarks container-header)
          (mapcat
           (fn [^long landmark]
-            (.position bb (+ container-header-end landmark))
-            (read-slice-records bb seq-reader cram-header compression-header))))))
+            (.position ^Buffer bb (+ container-header-end landmark))
+            (read-slice-records rdr bb compression-header))))))
 
-(defn- read-container-with [^FileChannel ch ^ByteBuffer bb f]
-  (let [pos (.position ch)
-        _ (do (.clear ^Buffer bb)
-              (.read ch bb)
-              (.flip ^Buffer bb))
+(defn- read-container-with [^CRAMReader rdr f]
+  (let [^FileChannel ch (.-channel rdr)
+        pos (.position ch)
+        _ (read-to-buffer rdr)
+        ^ByteBuffer bb (.-buffer rdr)
         container-header (struct/decode-container-header bb)
         header-size (.position bb)
         container-size (long (:length container-header))
@@ -91,13 +145,22 @@
     (.position ch (+ pos header-size container-size))
     ret))
 
-(defn read-alignments [^FileChannel ch seq-reader]
-  (let [bb (bb/allocate-lsb-byte-buffer 256)
-        cram-header (read-container-with ch bb #(struct/decode-cram-header-block %2))]
+(defn read-header
+  "Reads the container at the channel's current position and decodes its
+  CRAM header block."
+  [^CRAMReader rdr]
+  (read-container-with rdr #(struct/decode-cram-header-block %2)))
+
+(defn read-alignments
+  "Returns the records of every container from the channel's current
+  position to the end of the file. The first container is read eagerly;
+  later ones are read as the sequence is consumed."
+  [^CRAMReader rdr]
+  (let [^FileChannel ch (.-channel rdr)]
     (letfn [(read1 [container-header bb]
-              (read-container-records bb seq-reader cram-header container-header))
+              (read-container-records rdr bb container-header))
             (step []
               (when (< (.position ch) (.size ch))
-                (concat (read-container-with ch bb read1)
+                (concat (read-container-with rdr read1)
                         (lazy-seq (step)))))]
       (step))))