Skip to content

Commit

Permalink
Rewrite TwoBitWriter with FileChannel + ByteBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
athos committed Nov 15, 2023
1 parent bab0ae8 commit d736e51
Showing 1 changed file with 109 additions and 74 deletions.
183 changes: 109 additions & 74 deletions src/cljam/io/twobit/writer.clj
Original file line number Diff line number Diff line change
@@ -1,40 +1,59 @@
(ns cljam.io.twobit.writer
(:require [clojure.java.io :as cio]
[cljam.io.protocols :as protocols]
[cljam.io.util.lsb :as lsb]
[cljam.util :as util])
(:import [java.io Closeable OutputStream DataOutputStream BufferedOutputStream FileOutputStream]
[java.nio ByteBuffer]))
(:require [cljam.io.protocols :as protocols]
[cljam.io.util.byte-buffer :as bb]
[cljam.util :as util]
[clojure.java.io :as cio])
(:import [java.io Closeable]
[java.nio Buffer ByteBuffer]
[java.nio.channels FileChannel]
[java.nio.file OpenOption StandardOpenOption]))

(declare write-sequences)
(declare flush-buffer! write-sequences)

(deftype TwoBitWriter [url writer file-output-stream index]
(deftype TwoBitWriter [url channel buffer index]
Closeable
(close [this]
(.close ^Closeable (.writer this)))
(flush-buffer! this)
(.close ^Closeable (.-channel this)))
protocols/IWriter
(writer-url [this]
(.url this))
(.-url this))
protocols/ISequenceWriter
(write-sequences [this seqs]
(write-sequences this seqs)))

(defn writer
"Returns a 2bit writer of f."
[f {:keys [index]}]
(let [abs-f (.getAbsolutePath (cio/file f))
fos (FileOutputStream. abs-f)
bos (BufferedOutputStream. fos)
dos (DataOutputStream. bos)]
(TwoBitWriter. (util/as-url abs-f) dos fos index)))
(let [file (cio/file f)
ch (FileChannel/open (.toPath file)
(into-array OpenOption [StandardOpenOption/WRITE
StandardOpenOption/CREATE
StandardOpenOption/TRUNCATE_EXISTING]))
bb (bb/allocate-lsb-byte-buffer 8192)]
(TwoBitWriter. (util/as-url (.getAbsolutePath file)) ch bb index)))

(defn- flush-buffer! [^TwoBitWriter w]
(let [^FileChannel ch (.-channel w)
^Buffer bb (.-buffer w)]
(.flip bb)
(while (.hasRemaining bb)
(.write ch ^ByteBuffer bb))
(.clear bb)))

(defn- ensure-buffer-room! [^TwoBitWriter w ^long n]
(when (< (.remaining ^ByteBuffer (.-buffer w)) n)
(flush-buffer! w)))

(defn- write-file-header!
"Writes a 2bit file header. Supports little-endian only."
[w nseq]
(lsb/write-int w 0x1A412743)
(lsb/write-int w 0)
(lsb/write-int w nseq)
(lsb/write-int w 0))
[^TwoBitWriter w nseq]
(ensure-buffer-room! w 16)
(doto ^ByteBuffer (.-buffer w)
(.putInt 0x1A412743)
(.putInt 0)
(.putInt nseq)
(.putInt 0)))

(defn- mask-regions
"Returns a sequence of [start length] of masked regions."
Expand Down Expand Up @@ -77,20 +96,24 @@
(recur r nil nil (inc i))))))))

(defn- write-index!
[w idx]
(loop [offset (+ (* 4 4)
(long (reduce + (map #(+ 1 (count (:name %)) 4) idx))))
idx idx]
(when-let [{:keys [len] name' :name} (first idx)]
(lsb/write-ubyte w (count name'))
(lsb/write-string w name')
(lsb/write-int w offset)
(recur (+ offset
(if-let [{:keys [ambs masks]} (first idx)]
(+ 4 4 (* 2 4 (count ambs)) 4 (* 2 4 (count masks)) 4)
0) ; dummy
(quot (dec (+ (long len) 4)) 4))
(next idx)))))
[^TwoBitWriter w idx]
(let [^ByteBuffer bb (.-buffer w)]
(loop [offset (+ (* 4 4)
(long (reduce + (map #(+ 1 (count (:name %)) 4) idx))))
idx idx]
(when-let [{:keys [len ambs masks] name' :name} (first idx)]
(let [name-size (count name')]
(ensure-buffer-room! w (+ name-size 5))
(doto bb
(bb/write-ubyte (count name'))
(bb/write-string name')
(.putInt offset))
(recur (+ offset
(if (and ambs masks)
(+ 4 4 (* 2 4 (count ambs)) 4 (* 2 4 (count masks)) 4)
0) ; dummy
(quot (dec (+ (long len) 4)) 4))
(next idx)))))))

(def ^:private
char->twobit
Expand All @@ -104,49 +127,62 @@

(defn write-twobit!
"Encodes a sequence into twobit format."
[^OutputStream o ^String s]
[^TwoBitWriter w ^String s]
(let [len (.length s)
bb (ByteBuffer/wrap (.getBytes s))
table ^bytes char->twobit]
(dotimes [_ (quot len 4)]
(->> (bit-or
(bit-shift-left (aget table (.get bb)) 6)
(bit-shift-left (aget table (.get bb)) 4)
(bit-shift-left (aget table (.get bb)) 2)
(aget table (.get bb)))
unchecked-int
(.write o)))
(when (pos? (rem len 4))
(loop [b 0 i (rem len 4) j 1]
(if (pos? i)
(recur
(bit-or b (bit-shift-left (aget table (.get bb)) (* 2 (- 4 j))))
(dec i)
(inc j))
(.write o (unchecked-int b)))))))
in (ByteBuffer/wrap (.getBytes s))
out ^ByteBuffer (.-buffer w)
table ^bytes char->twobit
encode-four-bases #(->> (bit-or
(bit-shift-left (aget table (.get in)) 6)
(bit-shift-left (aget table (.get in)) 4)
(bit-shift-left (aget table (.get in)) 2)
(aget table (.get in)))
unchecked-byte
(.put out))]
(dotimes [_ (quot len 1024)]
(ensure-buffer-room! w 256)
(dotimes [_ 256]
(encode-four-bases)))
(let [remaining (rem len 1024)]
(when (pos? remaining)
(ensure-buffer-room! w (quot (+ remaining 3) 4))
(dotimes [_ (quot remaining 4)]
(encode-four-bases))
(when (pos? (rem remaining 4))
(loop [b 0 i (rem remaining 4) j 1]
(if (pos? i)
(recur
(bit-or b (bit-shift-left (aget table (.get in)) (* 2 (- 4 j))))
(dec i)
(inc j))
(.put out (unchecked-byte b)))))))))

(defn- write-sequence!
"Writes a single sequence entry to writer."
[w sequence' idx]
[^TwoBitWriter w sequence' idx]
(let [name' (or (:name sequence') (:rname sequence'))
seq-data (or (:seq sequence') (:sequence sequence'))
{:keys [len ambs masks]} (first (filter #(= (:name %) name') idx))]
(lsb/write-int w len)
(lsb/write-int w (count ambs))
{:keys [len ambs masks]} (first (filter #(= (:name %) name') idx))
^ByteBuffer bb (.-buffer w)
write-int (fn [n]
(ensure-buffer-room! w 4)
(.putInt bb n))]
(write-int len)
(write-int (count ambs))
(doseq [[s _] ambs]
(lsb/write-int w s))
(write-int s))
(doseq [[_ l] ambs]
(lsb/write-int w l))
(lsb/write-int w (count masks))
(write-int l))
(write-int (count masks))
(doseq [[s _] masks]
(lsb/write-int w s))
(write-int s))
(doseq [[_ l] masks]
(lsb/write-int w l))
(lsb/write-int w 0)
(write-int l))
(write-int 0)
(write-twobit! w seq-data)))

(defn- write-sequences-without-index
[^TwoBitWriter wtr xs]
[wtr xs]
(let [idx (map (fn [{:keys [rname]
seq' :seq
sequence' :sequence
Expand All @@ -157,29 +193,28 @@
:masks (mask-regions seq-data)
:ambs (amb-regions seq-data)}))
xs)]
(write-file-header! (.writer wtr) (count xs))
(write-index! (.writer wtr) idx)
(write-file-header! wtr (count xs))
(write-index! wtr idx)
(doseq [sequence' xs]
(write-sequence! (.writer wtr) sequence' idx))))
(write-sequence! wtr sequence' idx))))

(defn- write-sequences-with-index
[^TwoBitWriter wtr idx xs]
(let [idx-atom (atom idx)]
(write-file-header! (.writer wtr) (count @idx-atom))
(write-index! (.writer wtr) @idx-atom)
(write-file-header! wtr (count @idx-atom))
(write-index! wtr @idx-atom)
(doseq [sequence' xs]
(let [name' (or (:name sequence') (:rname sequence'))
seq-data (or (:seq sequence') (:sequence sequence'))
masks (mask-regions seq-data)
ambs (amb-regions seq-data)
i (first (keep-indexed #(when (= (:name %2) name') %1) @idx-atom))]
(swap! idx-atom update i assoc :masks masks :ambs ambs))
(write-sequence! (.writer wtr) sequence' @idx-atom))
(write-sequence! wtr sequence' @idx-atom))
;; finalize
(.flush ^DataOutputStream (.writer wtr))
(let [ch (.getChannel ^FileOutputStream (.file-output-stream wtr))]
(.position ch 16)
(write-index! ch @idx-atom))))
(flush-buffer! wtr)
(.position ^FileChannel (.-channel wtr) 16)
(write-index! wtr @idx-atom)))

(defn write-sequences
"Writes all sequences to wtr. Input sequences must be a sequence of maps."
Expand Down

0 comments on commit d736e51

Please sign in to comment.