From d736e51f7441a4c8d9fd20cff2b76c1153b1fdc6 Mon Sep 17 00:00:00 2001 From: Shogo Ohta Date: Mon, 13 Nov 2023 07:05:18 +0900 Subject: [PATCH] Rewrite TwoBitWriter with FileChannel + ByteBuffer --- src/cljam/io/twobit/writer.clj | 183 ++++++++++++++++++++------------- 1 file changed, 109 insertions(+), 74 deletions(-) diff --git a/src/cljam/io/twobit/writer.clj b/src/cljam/io/twobit/writer.clj index 122899cc..7213ceab 100644 --- a/src/cljam/io/twobit/writer.clj +++ b/src/cljam/io/twobit/writer.clj @@ -1,20 +1,23 @@ (ns cljam.io.twobit.writer - (:require [clojure.java.io :as cio] - [cljam.io.protocols :as protocols] - [cljam.io.util.lsb :as lsb] - [cljam.util :as util]) - (:import [java.io Closeable OutputStream DataOutputStream BufferedOutputStream FileOutputStream] - [java.nio ByteBuffer])) + (:require [cljam.io.protocols :as protocols] + [cljam.io.util.byte-buffer :as bb] + [cljam.util :as util] + [clojure.java.io :as cio]) + (:import [java.io Closeable] + [java.nio Buffer ByteBuffer] + [java.nio.channels FileChannel] + [java.nio.file OpenOption StandardOpenOption])) -(declare write-sequences) +(declare flush-buffer! write-sequences) -(deftype TwoBitWriter [url writer file-output-stream index] +(deftype TwoBitWriter [url channel buffer index] Closeable (close [this] - (.close ^Closeable (.writer this))) + (flush-buffer! this) + (.close ^Closeable (.-channel this))) protocols/IWriter (writer-url [this] - (.url this)) + (.-url this)) protocols/ISequenceWriter (write-sequences [this seqs] (write-sequences this seqs))) @@ -22,19 +25,35 @@ (defn writer "Returns a 2bit writer of f." [f {:keys [index]}] - (let [abs-f (.getAbsolutePath (cio/file f)) - fos (FileOutputStream. abs-f) - bos (BufferedOutputStream. fos) - dos (DataOutputStream. bos)] - (TwoBitWriter. (util/as-url abs-f) dos fos index))) + (let [file (cio/file f) + ch (FileChannel/open (.toPath file) + (into-array OpenOption [StandardOpenOption/WRITE + StandardOpenOption/CREATE + StandardOpenOption/TRUNCATE_EXISTING])) + bb (bb/allocate-lsb-byte-buffer 8192)] + (TwoBitWriter. (util/as-url (.getAbsolutePath file)) ch bb index))) + +(defn- flush-buffer! [^TwoBitWriter w] + (let [^FileChannel ch (.-channel w) + ^Buffer bb (.-buffer w)] + (.flip bb) + (while (.hasRemaining bb) + (.write ch ^ByteBuffer bb)) + (.clear bb))) + +(defn- ensure-buffer-room! [^TwoBitWriter w ^long n] + (when (< (.remaining ^ByteBuffer (.-buffer w)) n) + (flush-buffer! w))) (defn- write-file-header! "Writes a 2bit file header. Supports little-endian only." - [w nseq] - (lsb/write-int w 0x1A412743) - (lsb/write-int w 0) - (lsb/write-int w nseq) - (lsb/write-int w 0)) + [^TwoBitWriter w nseq] + (ensure-buffer-room! w 16) + (doto ^ByteBuffer (.-buffer w) + (.putInt 0x1A412743) + (.putInt 0) + (.putInt nseq) + (.putInt 0))) (defn- mask-regions "Returns a sequence of [start length] of masked regions." @@ -77,20 +96,24 @@ (recur r nil nil (inc i)))))))) (defn- write-index! - [w idx] - (loop [offset (+ (* 4 4) - (long (reduce + (map #(+ 1 (count (:name %)) 4) idx)))) - idx idx] - (when-let [{:keys [len] name' :name} (first idx)] - (lsb/write-ubyte w (count name')) - (lsb/write-string w name') - (lsb/write-int w offset) - (recur (+ offset - (if-let [{:keys [ambs masks]} (first idx)] - (+ 4 4 (* 2 4 (count ambs)) 4 (* 2 4 (count masks)) 4) - 0) ; dummy - (quot (dec (+ (long len) 4)) 4)) - (next idx))))) + [^TwoBitWriter w idx] + (let [^ByteBuffer bb (.-buffer w)] + (loop [offset (+ (* 4 4) + (long (reduce + (map #(+ 1 (count (:name %)) 4) idx)))) + idx idx] + (when-let [{:keys [len ambs masks] name' :name} (first idx)] + (let [name-size (count name')] + (ensure-buffer-room! w (+ name-size 5)) + (doto bb + (bb/write-ubyte (count name')) + (bb/write-string name') + (.putInt offset)) + (recur (+ offset + (if (and ambs masks) + (+ 4 4 (* 2 4 (count ambs)) 4 (* 2 4 (count masks)) 4) + 0) ; dummy + (quot (dec (+ (long len) 4)) 4)) + (next idx))))))) (def ^:private char->twobit @@ -104,49 +127,62 @@ (defn write-twobit! "Encodes a sequence into twobit format." - [^OutputStream o ^String s] + [^TwoBitWriter w ^String s] (let [len (.length s) - bb (ByteBuffer/wrap (.getBytes s)) - table ^bytes char->twobit] - (dotimes [_ (quot len 4)] - (->> (bit-or - (bit-shift-left (aget table (.get bb)) 6) - (bit-shift-left (aget table (.get bb)) 4) - (bit-shift-left (aget table (.get bb)) 2) - (aget table (.get bb))) - unchecked-int - (.write o))) - (when (pos? (rem len 4)) - (loop [b 0 i (rem len 4) j 1] - (if (pos? i) - (recur - (bit-or b (bit-shift-left (aget table (.get bb)) (* 2 (- 4 j)))) - (dec i) - (inc j)) - (.write o (unchecked-int b))))))) + in (ByteBuffer/wrap (.getBytes s)) + out ^ByteBuffer (.-buffer w) + table ^bytes char->twobit + encode-four-bases #(->> (bit-or + (bit-shift-left (aget table (.get in)) 6) + (bit-shift-left (aget table (.get in)) 4) + (bit-shift-left (aget table (.get in)) 2) + (aget table (.get in))) + unchecked-byte + (.put out))] + (dotimes [_ (quot len 1024)] + (ensure-buffer-room! w 256) + (dotimes [_ 256] + (encode-four-bases))) + (let [remaining (rem len 1024)] + (when (pos? remaining) + (ensure-buffer-room! w (quot (+ remaining 3) 4)) + (dotimes [_ (quot remaining 4)] + (encode-four-bases)) + (when (pos? (rem remaining 4)) + (loop [b 0 i (rem remaining 4) j 1] + (if (pos? i) + (recur + (bit-or b (bit-shift-left (aget table (.get in)) (* 2 (- 4 j)))) + (dec i) + (inc j)) + (.put out (unchecked-byte b))))))))) (defn- write-sequence! "Writes a single sequence entry to writer." - [w sequence' idx] + [^TwoBitWriter w sequence' idx] (let [name' (or (:name sequence') (:rname sequence')) seq-data (or (:seq sequence') (:sequence sequence')) - {:keys [len ambs masks]} (first (filter #(= (:name %) name') idx))] - (lsb/write-int w len) - (lsb/write-int w (count ambs)) + {:keys [len ambs masks]} (first (filter #(= (:name %) name') idx)) + ^ByteBuffer bb (.-buffer w) + write-int (fn [n] + (ensure-buffer-room! w 4) + (.putInt bb n))] + (write-int len) + (write-int (count ambs)) (doseq [[s _] ambs] - (lsb/write-int w s)) + (write-int s)) (doseq [[_ l] ambs] - (lsb/write-int w l)) - (lsb/write-int w (count masks)) + (write-int l)) + (write-int (count masks)) (doseq [[s _] masks] - (lsb/write-int w s)) + (write-int s)) (doseq [[_ l] masks] - (lsb/write-int w l)) - (lsb/write-int w 0) + (write-int l)) + (write-int 0) (write-twobit! w seq-data))) (defn- write-sequences-without-index - [^TwoBitWriter wtr xs] + [wtr xs] (let [idx (map (fn [{:keys [rname] seq' :seq sequence' :sequence @@ -157,16 +193,16 @@ :masks (mask-regions seq-data) :ambs (amb-regions seq-data)})) xs)] - (write-file-header! (.writer wtr) (count xs)) - (write-index! (.writer wtr) idx) + (write-file-header! wtr (count xs)) + (write-index! wtr idx) (doseq [sequence' xs] - (write-sequence! (.writer wtr) sequence' idx)))) + (write-sequence! wtr sequence' idx)))) (defn- write-sequences-with-index [^TwoBitWriter wtr idx xs] (let [idx-atom (atom idx)] - (write-file-header! (.writer wtr) (count @idx-atom)) - (write-index! (.writer wtr) @idx-atom) + (write-file-header! wtr (count @idx-atom)) + (write-index! wtr @idx-atom) (doseq [sequence' xs] (let [name' (or (:name sequence') (:rname sequence')) seq-data (or (:seq sequence') (:sequence sequence')) @@ -174,12 +210,11 @@ ambs (amb-regions seq-data) i (first (keep-indexed #(when (= (:name %2) name') %1) @idx-atom))] (swap! idx-atom update i assoc :masks masks :ambs ambs)) - (write-sequence! (.writer wtr) sequence' @idx-atom)) + (write-sequence! wtr sequence' @idx-atom)) ;; finalize - (.flush ^DataOutputStream (.writer wtr)) - (let [ch (.getChannel ^FileOutputStream (.file-output-stream wtr))] - (.position ch 16) - (write-index! ch @idx-atom)))) + (flush-buffer! wtr) + (.position ^FileChannel (.-channel wtr) 16) + (write-index! wtr @idx-atom))) (defn write-sequences "Writes all sequences to wtr. Input sequences must be a sequence of maps."