Using reducers on the bytes in a memory-mapped file

(Written by Lawrence Krubner; indented passages are often quotes.) You can contact Lawrence at: lawrence@krubner.com

This could be a powerful technique when speed and parallelism matter, because it splits a large file into line-aligned chunks that reducers can process in parallel:

First, we define a couple of parameters: the character set of the input, and a hint for the size of the chunks to break the file into.

(import 'java.nio.charset.Charset)
(def ^:dynamic *charset* (Charset/forName "UTF-8"))   ; encoding of the input file
(def ^:dynamic *chunk-size* (* 10 1024 1024))          ; chunk-size hint: 10 MiB
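The chunking step seeks to the raw newline byte (0x0A). That only works if *charset* never uses that byte inside some other character. UTF-8 has this property, because every byte of a multi-byte UTF-8 sequence has its high bit set. UTF-16 does not. The helper newline-safe? below is hypothetical and is not part of the original code. It is a brute-force sketch that checks every character in the Basic Multilingual Plane for a given charset.

```clojure
(import '(java.nio.charset Charset StandardCharsets))

(defn newline-safe?
  "Hypothetical check: true if, in charset, no character in the Basic
  Multilingual Plane other than \\newline encodes to a byte equal to 0x0A."
  [^Charset charset]
  (let [code-points (remove #(or (== % 10) (<= 0xD800 % 0xDFFF)) ; skip \newline and surrogates
                            (range 0x10000))]
    (every? (fn [cp]
              ;; Encode the single character and look for a stray 0x0A byte.
              (let [bs (.getBytes (String. (Character/toChars (int cp))) charset)]
                (not-any? #(== % 10) bs)))
            code-points)))
```

(newline-safe? StandardCharsets/UTF_8) returns true. (newline-safe? StandardCharsets/UTF_16BE) returns false, because U+0A00, for example, encodes to the bytes 0x0A 0x00.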

With those parameters, we break the file into chunks. We skip ahead *chunk-size* bytes at a time, then read forward to the end of the current line. This guarantees that, when we later read the file, no line is split across two chunks.

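(defn get-chunk-offsets
  "Appends to offsets the start of each chunk after pos, then the file length."
  [^java.io.RandomAccessFile f pos offsets chunk-size]
  (let [len     (.length f)
        skip-to (+ pos chunk-size)]
    (if (>= skip-to len)
      (conj offsets len)
      (do
        (.seek f skip-to)
        ;; Read ahead to the end of the line. .read returns -1 at EOF, which must
        ;; also stop the loop, or a file without a final newline never terminates.
        (loop []
          (let [b (.read f)]
            (when-not (or (== b -1) (== b (int \newline)))
              (recur))))
        (let [new-pos (.getFilePointer f)]
          (if (>= new-pos len)
            (conj offsets len)   ; reached EOF: don't add the length twice
            (recur f new-pos (conj offsets new-pos) chunk-size)))))))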
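
(defn get-chunks
  "Returns a vector of (from to) byte-offset pairs, one per chunk of file-name."
  ([file-name] (get-chunks file-name *chunk-size*))
  ([file-name chunk-size]
   (with-open [f (java.io.RandomAccessFile. file-name "r")]
     ;; vec rather than doall: r/fold only splits a vector across threads.
     (vec
       (partition 2 1 (get-chunk-offsets
                        f 0 [0] chunk-size))))))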
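You can try the boundary rule in get-chunk-offsets without a file. The hypothetical line-aligned-offsets below is a sketch, not the original code. It applies the same rule to a byte array in memory: jump chunk-size bytes forward, then scan to just past the next newline byte.

```clojure
(defn line-aligned-offsets
  "Hypothetical in-memory analogue of get-chunk-offsets. Returns chunk
  boundaries for the byte array bs, from 0 up to its length; each interior
  boundary is moved forward to just past the next newline byte (10)."
  [^bytes bs chunk-size]
  (let [len        (long (alength bs))
        chunk-size (long chunk-size)]
    (loop [pos 0, offsets [0]]
      (let [skip-to (+ pos chunk-size)]
        (if (>= skip-to len)
          (conj offsets len)
          ;; Scan forward byte by byte, as the .read loop does on the file.
          (let [new-pos (long (loop [i skip-to]
                                (cond (>= i len)          len
                                      (== (aget bs i) 10) (inc i)
                                      :else               (recur (inc i)))))]
            (if (>= new-pos len)
              (conj offsets len)
              (recur new-pos (conj offsets new-pos)))))))))
```

For example, (line-aligned-offsets (.getBytes "aaa\nbb\ncccc\nd\n" "UTF-8") 5) returns [0 7 14]. The two chunks are "aaa\nbb\n" and "cccc\nd\n", and each ends on a newline. With a chunk size of 1, every line becomes its own chunk: [0 4 7 12 14].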

With those, we can memory-map each chunk and read its lines out as a sequence.

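(require '[clojure.string :refer [split-lines]])

(defn read-chunk
  "Memory-maps bytes [from, to) of channel and returns them as a vector of lines."
  [^java.nio.channels.FileChannel channel [from to]]
  (let [chunk-mmap
        (.map
          channel
          java.nio.channels.FileChannel$MapMode/READ_ONLY
          from
          (- to from))
        ;; A fresh decoder per call: CharsetDecoder instances are not thread-safe.
        decoder (.newDecoder *charset*)]
    (doall
      (split-lines
        (str (.decode decoder chunk-mmap))))))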
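Here is the memory-mapping step on its own, as a self-contained sketch. mapped-lines is a hypothetical name, not part of the original code. It maps a byte range of a file read-only with FileChannel.map and decodes it. Unlike read-chunk, it opens and closes the file itself. It also uses Charset.decode, which substitutes replacement characters for malformed input instead of throwing.

```clojure
(import '(java.io File RandomAccessFile)
        '(java.nio.channels FileChannel$MapMode)
        '(java.nio.charset StandardCharsets))
(require '[clojure.string :refer [split-lines]])

(defn mapped-lines
  "Hypothetical helper: memory-maps bytes [from, to) of the file at path
  read-only and returns them decoded as UTF-8 and split into lines."
  [path from to]
  (with-open [raf (RandomAccessFile. (str path) "r")]
    (let [buf (.map (.getChannel raf)
                    FileChannel$MapMode/READ_ONLY
                    (long from)
                    (long (- to from)))]
      ;; Decode while the file is still open, then split on line endings.
      (split-lines (str (.decode StandardCharsets/UTF_8 buf))))))
```

Take a file containing "alpha\nbeta\ngamma\n". The byte range 0 to 11 yields ["alpha" "beta"], and 11 to 17 yields ["gamma"].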
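
These let us bring everything together, much as we did with serial-process. The helpers parse-line, data-line?, combiner and process-user-map come from that earlier serial version and are not shown in this excerpt.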

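(require '[clojure.core.reducers :as r])

(defn mmap-process
  [file-name]
  ;; Closing the RandomAccessFile also closes the channel the chunks are mapped from.
  (with-open [raf (java.io.RandomAccessFile. file-name "r")]
    (let [chan (.getChannel raf)]
      (->>
        file-name
        get-chunks                               ; must be a vector for r/fold to split it
        (r/mapcat (partial read-chunk chan))     ; may run on fork/join threads, which do
                                                 ; not inherit `binding`s of *charset*
        (r/map parse-line)
        (r/filter data-line?)
        ;; Partition size 1: one chunk per task. The default, 512, would leave
        ;; a file of fewer than 512 chunks on a single thread.
        (r/fold 1 combiner process-user-map)))))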
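mmap-process relies on r/fold, which only parallelizes when two conditions hold. The collection must be foldable (a vector or hash map), and it must have more elements than the partition size n, which defaults to 512. Anything else runs as a single serial reduce. The hypothetical helper count-combines below makes this visible. It counts how often fold merges two partial results with the combining function. That function's zero-argument arity must return an identity value, which is the role combiner plays in mmap-process.

```clojure
(require '[clojure.core.reducers :as r])

(defn count-combines
  "Hypothetical helper: sums coll with r/fold using partition size n and
  returns [sum merges], where merges counts calls to the two-argument
  combining function. 0 means fold never split the work."
  [coll n]
  (let [merges   (atom 0)
        combinef (fn
                   ([] 0)                                 ; seed for each partition
                   ([a b] (swap! merges inc) (+ a b)))    ; merge two partial sums
        sum      (r/fold n combinef + coll)]
    [sum @merges]))
```

(count-combines (vec (range 4)) 1) returns [6 3], because the vector is split down to single elements. With the default partition size of 512, or with a list instead of a vector, it returns [6 0]. With 10 MiB chunks, a 1 GiB file yields only about a hundred chunks, so the default partition size would process all of them on one thread.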
Source