Ode to a Streaming ByteString

Or: Lazy I/O without Shooting Yourself in the Foot


One of my favorite founding principles of Unix is the universality of streams of bytes. (The cynics among us might refer to it as a myth rather than a principle, given that Unix is no stranger to massive, kitchen-sink programs. Indeed, I’m writing this post in such a program.) Unix shells give us a monadic pipeline operator, |, that, given some computation a | b, executes a so that its output forms the input to b. This principle affords the standard Unix commands succinct composition, allowing the executing shell, rather than the programs themselves, to control the flow of data through a composed sequence of shell commands. Indeed, the principle of byte stream manipulation serves as the foundation for entire programming languages like sed and awk. Tools like xargs, tee, and parallel provide higher-order combinators to construct pipelines from further pipelines in ways beyond the capabilities of the pipeline operator itself.

Let’s consider some properties of programs that work well in pipelined computations such as these, using grep as the archetypal such program. grep is, of course, a tool of near-unparalleled importance, being enshrined in the POSIX standard, and it’s been optimized probably as thoroughly as any other program. Our programs are probably not grep, but they may need to emulate how grep manages its input streams:

  1. It hides the details of resource manipulation. The operating system, its system libraries, the shell in current use, and the implementation of its | operator hide the details of how input is batched and fed between files. Given that these details are abstracted from us, we should assume that input is buffered and chunked as necessary, and that these buffers and chunks are handled with as little overhead as possible. In particular, byte buffers read from an input stream should only be copied and retained if the program explicitly requests it: old input should never hang around in memory, given that the size of that input is theoretically unbounded.
  2. It consumes its input lazily. If I tell grep to search a ten-gigabyte file for one and only one occurrence of a string, it should consume only as much input as is required to fulfill my request. If more bytes are read from a stream than needed, it should be indistinguishable, performance-wise, from reading as little input as is possible, and should produce no change in observable program behavior.
  3. It cleans up its resources. Should grep request resources from the OS, or spawn some other helper process, these resources and subprocesses should be reclaimed quickly and reliably, even in the face of upstream or downstream pipeline failures, or even just a user impatiently hammering Ctrl+C and sending dozens of SIGINT signals.

C programs excel at fulfilling these criteria. Ha! Just kidding. C programs can excel at streaming data manipulation, because C programming entrusts you with its details, and you might be someone who excels at streaming data manipulation. Though the POSIX standard provides certain guarantees about the manner in which resources are reclaimed upon program exit, you are still responsible for managing resources during program execution. There aren’t too many layers of abstraction between you, stdin, and stdout. Sure, if your underlying FILE* struct can do some caching for you, it probably does, but you can always read directly with read(2) or with some lower-level efficiency-focused framework like kqueue, epoll, libdispatch, or io_uring.

(The POSIX standard specifies that certain resources (file descriptors, directory streams, stream converters from iconv(), and message catalogs from catopen) are relinquished upon program exit. Child processes spawned with fork and the exec family of functions are usually not terminated, unless specifically requested by the parent process or an associated process group. Shared memory may or may not be relinquished, depending on whether other processes are referring to it. This digression is getting long enough as is; read the standard if you’re really curious.)

With higher-level, garbage-collected languages, things become a good bit trickier. It is the responsibility of a higher-level language to insulate us from the concerns of C: as with shell scripts, we should have, in the common case, no particular knowledge of how files are read, how memory is allocated, or how input is buffered. For most programs, especially the quick-and-dirty ones at which garbage-collected languages excel, such details won’t matter. But when you find yourself outside the realm of “most programs”, the situation can prove vexing: the nature of a high-level language to isolate you from decisions made about resource management becomes a problem when the correctness of your software depends on the details of said resource management. We find ourselves confronted with a particularly tricky dialectic: high-level languages aid in the development of programs by abstracting away details, but hinder said development when program correctness depends directly on properties of these abstracted details, such as when writing code that handles error conditions robustly or remains reliable in the presence of resource pressure. (The difficulties encountered when addressing these concerns in portable shell scripts are just one of the many reasons we don’t write everything in shell.)

Haskell’s declarative nature, non-strict evaluation, and correct-by-construction philosophy make it, at least in theory, an attractive solution for writing code that works well in streaming pipelines. In practice, however, we run into three interrelated but distinct problems, ones that display a nice symmetry with the properties of well-managed input streams outlined above:

  1. the streaming problem: how do we write Haskell code that processes potentially-infinite data streams in a finite amount of memory?
  2. the lazy I/O problem: given Haskell’s non-strict semantics, how do we consume input from the outside world lazily and efficiently?
  3. the resource manipulation problem: given that we have no direct control over the lifetime of data, how do we ensure that our code manages and relinquishes expensive resources correctly?

I’m going to discuss how to address these problems, using the streaming ecosystem to address the first point, the streaming-bytestring library the second, and the resourcet library the third. Given Haskell’s more-than-one-way-to-do-it philosophy, there exist many alternative ecosystems—libraries like conduit, pipes-bytestring, streamly, and io-streams all provide tools to address these problems—but the streaming ecosystem is the one with which I am most familiar, and one with a generally friendly interface expressed with function composition. Both streaming and streaming-bytestring are cleverly designed and carefully thought through; I hope that this post serves as an exploration of their associated design decisions and resulting idioms. But to appreciate the depth of the problem, we have to start by discussing how Haskell represents byte buffers and streams of data, and how infelicities in their standard formulation provide streaming-bytestring its raison d’être.

A Byte Menagerie

Feel free to skip to the next section if you’re already familiar with Haskell string types and the perils of lazy I/O.

Haskell is notorious for its plethora of string types: String (a lazy list of four-byte Char values); strict and lazy Text values; and strict, lazy, and short ByteString values. (The bytestring package also provides Data.ByteString.Char8 and Data.ByteString.Lazy.Char8, but these use the same in-memory types as their Word8-based counterparts, and provide APIs dealing in Char and String values rather than Word8 and [Word8], the former of which can be more natural as long as you know that your input is Latin-1 compatible.)

For the purposes of this article, we’re not going to worry about String or Text, because both operate on values known to be valid Unicode data. Our streaming pipelines operate on bytes, not Unicode characters: if, say, we want to be able to operate on binary data in such a pipeline, attempting to represent that data as Unicode text is inherently wrong. As such, we’ll focus only on Haskell’s byte buffer type, ByteString. Strict ByteStrings are pretty easy to understand:

module Data.ByteString where

data ByteString -- strict
  = BS
      {-# UNPACK #-} !(ForeignPtr Word8) -- payload
      {-# UNPACK #-} !Int -- length

ByteString represents a byte buffer and its associated length; in this it is similar to Go’s []byte or Rust’s &[u8] (though it has one fewer datum to track than Go’s slice, which offers mutable access to the associated byte buffer and thus must keep track of its total capacity). ByteString values can contain NUL bytes, or indeed any other Word8 value. If a given ByteString represents human-readable text, it must be converted to Text values explicitly with Data.Text.Encoding (for UTF-8/16/32) or String values with the encoding library; conversion failures should be handled at those callsites, though the correct behavior is often to throw an exception. This representation also allows for quick serialization and deserialization to C char* values, by extracting that ForeignPtr Word8 and treating it as the memory address that it is. Similarly, we can choose to create that ForeignPtr Word8 by copying from another source (like another ByteString or from a socket), or without copying by using unsafe ByteString operations (which can break referential transparency, but are sometimes necessary in extremely low-level code).
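As a small sketch of those properties, the following program builds a buffer containing a NUL byte, queries it, and decodes bytes to Text explicitly. The names sample, sampleLength, firstByte, and decoded are illustrative only; the library calls come from the bytestring and text packages.

```haskell
module Main where

import qualified Data.ByteString as Strict
import qualified Data.Text as Text
import Data.Text.Encoding (decodeUtf8')
import Data.Word (Word8)

-- A buffer holding a NUL byte followed by the ASCII bytes for "hi".
sample :: Strict.ByteString
sample = Strict.pack [0, 104, 105]

-- O(1): the length is stored next to the payload pointer.
sampleLength :: Int
sampleLength = Strict.length sample

-- O(1) indexing; NUL is just another Word8.
firstByte :: Word8
firstByte = Strict.index sample 0

-- Decoding is explicit and can fail, so the failure is surfaced as a Left.
decoded :: Strict.ByteString -> Either String Text.Text
decoded bytes = either (Left . show) Right (decodeUtf8' bytes)

main :: IO ()
main = do
  print sampleLength
  print firstByte
  print (decoded (Strict.pack [104, 105])) -- valid UTF-8
  print (decoded (Strict.pack [0xFF])) -- 0xFF never occurs in valid UTF-8
```

Whether a decoding failure should become an exception or a recoverable value is a decision for the call site; the Either here just makes the choice visible.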

A memory and length pair, supporting O(1) length and indexing and O(n) concatenation, is pretty straightforward. More interesting is this type, the lazy ByteString. For the sake of clarity of examples, we’ll use a different type name than does bytestring, which uses the ByteString name for both lazy and strict variants, disambiguating via the module name (Data.ByteString for strict, and Data.ByteString.Lazy for lazy).

module Data.ByteString.Lazy where

import Data.ByteString qualified as Strict

data ByteString
  = Empty
  | Chunk {-# UNPACK #-} !Strict.ByteString ByteString
                         -- ^ head chunk    -- ^ lazy tail

Rather than being a strictly evaluated pair of buffer and length, this is a lazy list, similar to what we would have were we to represent it with [ByteString]: Empty is like [] and Chunk is like the : operator. The only operative difference is that the first strict ByteString parameter is evaluated strictly, which slightly reduces its overhead in memory. The second parameter, a lazy ByteString representing the rest of the string, is not strict, since the whole point of this enterprise is to let the Haskell runtime manage the laziness associated with our lazy list of byte buffers. This sounds like a great deal: GHC’s evaluator is a finely-tuned instrument, and it’s unlikely that we can improve thereupon.

This lets us imagine how ByteString.Lazy.readFile would work: given some chunk size N, the runtime would read N bytes from a source, placing them in the !ByteString parameter. The second parameter would then be a thunk that, if evaluated, would read another byte buffer of maximum length N. If our calling code operates only on the first N bytes, that second parameter is never evaluated, and the overhead of reading further bytes is never incurred. This, indeed, is more-or-less how that function is implemented.
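To make the chunk-and-thunk structure concrete, here is a minimal sketch using only the bytestring package. The value explosiveTail is a contrived illustration: its tail is a thunk that would crash if forced, standing in for a deferred read.

```haskell
module Main where

import qualified Data.ByteString.Char8 as Char8
import qualified Data.ByteString.Lazy as Lazy

-- Two strict chunks linked into one lazy ByteString.
chunked :: Lazy.ByteString
chunked = Lazy.fromChunks [Char8.pack "hello, ", Char8.pack "world"]

-- The first chunk is real; everything after it is a thunk that errors
-- if anything ever evaluates it.
explosiveTail :: Lazy.ByteString
explosiveTail = Lazy.fromChunks (Char8.pack "hello, " : error "tail was forced")

-- Five bytes fit inside the seven-byte head chunk, so the tail is untouched.
firstFive :: Lazy.ByteString
firstFive = Lazy.take 5 explosiveTail

main :: IO ()
main = do
  print (map Char8.length (Lazy.toChunks chunked))
  print (Lazy.toStrict firstFive)
```

Asking for more bytes than the head chunk holds would force the tail, which is exactly the point at which a lazily read file would go back to the disk.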

Yet lazy ByteString values don’t work reliably when reading files lazily from disk. They violate the principle of deterministic resource cleanup: once a Chunk is read, reading beyond its embedded ByteString requires evaluating its second parameter, which entails further system calls to perform further needed disk I/O to build another Chunk (or an Empty if we’re at the end of the file). Should those system calls reference a socket or file descriptor that has since been closed, we’ll encounter a runtime crash. Even though readFile takes place in the IO monad, the resulting ByteString has IO hidden nefariously within, even when passed to contexts where it should be pure. I’ve seen this referred to as “pseudo-pure”: it may look like a pure computation, but it can incur I/O happening somewhere else, which, even if it works, is not the Haskell way. I/O is too important to be left implicit!
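The failure mode can be reproduced in a few lines. This is a sketch under assumptions: readAfterClose and the scratch file name lazy-io-demo.txt are hypothetical, and the exact error text depends on the versions of base and bytestring in use.

```haskell
module Main where

import Control.Exception (IOException, evaluate, try)
import qualified Data.ByteString.Lazy as Lazy
import Data.Int (Int64)
import System.IO (IOMode (ReadMode), withFile)

-- Grab a file's contents lazily, then let withFile close the handle before
-- anything has been read. Forcing the result afterwards has to perform the
-- deferred read against a handle that is already closed.
readAfterClose :: FilePath -> IO (Either IOException Int64)
readAfterClose path = do
  contents <- withFile path ReadMode Lazy.hGetContents
  try (evaluate (Lazy.length contents))

main :: IO ()
main = do
  let path = "lazy-io-demo.txt" -- hypothetical scratch file
  writeFile path "some bytes that are never read"
  outcome <- readAfterClose path
  case outcome of
    Left err -> putStrLn ("deferred read failed: " ++ show err)
    Right n -> putStrLn ("read " ++ show n ++ " bytes")
```

Nothing in the type of contents hints that evaluating Lazy.length can raise an I/O error, which is the “pseudo-pure” problem in miniature.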

This has all been an exceptionally long-winded way to explain why no basic string type fulfills all our criteria in the presence of lazy I/O:

  • String and Text operations fail in the face of binary or non-Unicode data;
  • A strict ByteString is read all-at-once into memory, which violates both our need for laziness and constant resource consumption;
  • Lazy.ByteString violates the principle of deterministic resource cleanup, unless you use Haskell’s experimental support for linear types, which prevent lazy bytestrings from outliving their associated file handle (though linear types are a bleeding-edge feature that has yet to see widespread adoption).

Now, this may not always matter. Your program may deal only in small files, in which case you don’t need lazy I/O at all: you can just read file contents in as a strict ByteString and you’ll be okay. Computers have a lot of memory nowadays. But Haskell is a lazy language, and it excels at problems that can be phrased lazily. Should our business logic be a matter of lazy stream processing, we need some sort of abstraction that can lazily stream byte buffers from a source of data with constant memory consumption, minimal copying, and safe, deterministic resource cleanup. The trifecta of streaming, streaming-bytestring, and resourcet takes care of this: let’s take a look and see how it works.

A Look at the API

The Important Types

The Stream type Stream (Of a) m r represents a stream capable of yielding zero or more a values, performing effects in m, and returning a final r result type. We can see these capabilities in the definition of Stream itself: such a computation can yield a computed Step, embed a monadic Effect, or simply Return a pure value.

module Streaming where

data Stream f m r
  = Step !(f (Stream f m r)) -- yield an item, and the rest of a stream, defined by the functor f
  | Effect (m (Stream f m r)) -- perform some monadic effect resulting in a further stream
  | Return r -- do nothing and return a final value

The most common functor used as the value of f is the Of functor, which is identical to the tuple type (,), but strict in its left argument, again to avoid the overhead of a lazy value when we know that the value in question has already been evaluated.

module Data.Functor.Of where
  data Of a b = !a :> b

The usual way to run a Stream is to call Streaming.toList, which returns an Of-pair containing a list of yielded a values and a final r. Note that Of is partially applied within the Stream, with its parameter a being the type of elements a stream yields.

toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
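A brief sketch of running a Stream, assuming the streaming package is installed. The names evens and collected are illustrative; each, filter, take, print, and toList are from Streaming.Prelude.

```haskell
module Main where

import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- A stream that works in any monad: it performs no effects of its own.
evens :: Monad m => Stream (Of Int) m ()
evens = S.filter even (S.each [1 .. 10])

-- Running it in Identity collects the yielded items next to the final ().
collected :: Of [Int] ()
collected = runIdentity (S.toList evens)

main :: IO ()
main = do
  case collected of
    xs :> () -> print xs
  -- The same stream can be run in IO, printing items as they are yielded.
  S.print (S.take 2 evens)
```

Because evens is polymorphic in m, the choice between a pure run and an effectful one is made by whichever eliminator consumes it.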

There’s nothing here specific to bytestrings or I/O, which means that Stream is suitable for building streaming abstractions in any monad, not just IO. In contrast, ByteStream is concerned with holding chunked ByteString values, not with Stream:

module Streaming.ByteString where

data ByteStream m r
  = Empty r
  | Chunk {-# UNPACK #-} !ByteString (ByteStream m r)
  | Go (m (ByteStream m r))

You’ll notice that ByteStream is similar to Lazy.ByteString, with a few differences. Firstly, it has access to two type variables m and r: m represents a monadic context, with which chunked reads can perform side effects, using the Go constructor. This Go constructor is also new; it makes it explicit that reading further ByteString chunks can cause side effects, unlike Lazy.ByteString, which hid the fact that file I/O may occur when reading long strings from disk. By using MonadResource to clean up file handles, we can indicate, with the type system, that reading ByteString values out of a ByteStream entails disk I/O, and thread that MonadResource constraint anywhere that file handles or ephemeral data must be cleaned up. Lastly, the Empty constructor takes an r argument, representing the final value, if any, of a given ByteStream computation.

The difference between ByteStream and the lazy ByteString might not seem seismic, but it represents something very profound about Haskell: often, we gain expressive power by treating computations as data. The fact that ByteStream has a Go constructor, which allows embedding arbitrary m-actions as long as they return a further ByteStream, gives ByteStream the ability to represent any kind of computation. In the case of ByteStream values yielded from the system, that m can be IO, or a monad transformer implementing MonadIO, or an effect stack implementing Lift IO. But if we were dealing with a ByteStream defined ahead of time, that m can be Identity.

(An interesting consequence of this is the Show instance for ByteStream, which requires the m parameter to be Identity and the r return type to be (), because the Show typeclass does not have access to the monadic context required to print out, say, a ByteStream that needs IO to perform its effects. To print such a ByteStream, you’d pass it to the stdout eliminator, of type MonadIO m => ByteStream m r -> m r, which evaluates that stream for its effects and prints to the console any chunks encountered while doing so.)
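The contrast between an Identity-based stream and an effectful one can be sketched as follows, assuming streaming-bytestring 0.2 or later (where the type is named ByteStream). The names pureStream and noisy are illustrative.

```haskell
module Main where

import qualified Data.ByteString.Char8 as Char8
import Data.Functor.Identity (Identity, runIdentity)
import qualified Streaming.ByteString as Q

-- Defined ahead of time: no effects are needed to produce its bytes.
pureStream :: Q.ByteStream Identity ()
pureStream = Q.fromStrict (Char8.pack "defined ahead of time\n")

-- mwrap embeds an action that must run before any bytes become available.
noisy :: Q.ByteStream IO ()
noisy =
  Q.mwrap $ do
    putStrLn "running the embedded effect"
    pure (Q.fromStrict (Char8.pack "bytes produced in IO\n"))

main :: IO ()
main = do
  -- An Identity stream can be collapsed without touching IO at all.
  print (runIdentity (Q.toStrict_ pureStream))
  -- An IO stream has to be eliminated in IO; stdout writes out its chunks.
  Q.stdout noisy
```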

Furthermore, ByteStream isn’t just a computation, it’s a monad transformer, parameterized in terms of some parent monad m. This means we can use it in unexpected ways, such as the base monad in our quick-and-dirty web server: in that case, having a ByteStream IO monad at the center of our effect stack gave us the capability to send streams of bytes down a network connection, using the sendM function. We farmed out the handling of Request values to the Reader effect, and a State effect took care of handling Response values, but for an HTTP handler to do anything useful, it must have access to a sink of bytes. Using Lift (ByteStream IO) allowed us access to such capabilities, without divulging the way in which these bytes are ultimately transmitted to the user (all we know is that it happens in IO, as indeed all network activity must).

Creating a ByteStream

Examining the ways to construct ByteStream values makes the relationship between ByteString, Stream, and ByteStream clearer:

-- An empty ByteStream is generalizable to any m, but carries no value
-- in its return type.
empty :: ByteStream m ()

-- The current program's standard input is itself a stream of bytes,
-- using 'MonadIO' to actually read from the input's file descriptor.
stdin :: MonadIO m => ByteStream m ()

-- If we already have a strict ByteString, we can package it up with
-- 'Chunk' (and an 'Empty' tail).
fromStrict :: Strict.ByteString -> ByteStream m ()

-- Converting from a lazy ByteString to a ByteStream is a matter of
-- folding over the former, replacing the 'Empty' and 'Chunk'
-- constructors with those provided by ByteStream.
fromLazy :: Lazy.ByteString -> ByteStream m ()

-- In general, we can see ByteStream as a more efficient
-- representation of a Stream of unchunked Word8 values, or a stream
-- of chunked, strict ByteString values. Note that these preserve the
-- return value of the input Stream.
pack :: Stream (Of Word8) m r -> ByteStream m r
fromChunks :: Stream (Of Strict.ByteString) m r -> ByteStream m r

More interesting is the fundamental file-reading method, readFile. It is interesting in that it is the first time we see MonadResource, a typeclass provided by the resourcet package.

readFile :: MonadResource m => FilePath -> ByteStream m ()

The MonadResource typeclass represents monads that are capable of managing the lifetime of critical resources, even in the presence of Haskell’s support for asynchronous exceptions. In a multithreaded Haskell program, any thread can send an exception to any other thread at any time, thanks to the fearsome Control.Exception.throwTo. MonadResource, and its corresponding ResourceT monad transformer, allow us a degree of durability in the face of this fact. Though readFile entails accessing a file on disk, we can be confident that even the most unluckily-timed of exceptions will not result in the associated file handle being orphaned or left unclosed. Additionally, the fact that this MonadResource m constraint is propagated, via the m parameter, to the resulting ByteStream makes it explicit that accessing the entirety of the associated ByteStream requires access to an associated resource, the lifetime of which is managed by the invoking MonadResource computation.
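In practice, the MonadResource constraint is usually discharged with runResourceT. The following is a minimal sketch, assuming the resourcet and streaming-bytestring packages; firstBytes and the scratch file demo-input.txt are hypothetical names chosen for illustration.

```haskell
module Main where

import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString as Strict
import qualified Streaming.ByteString as Q

-- Read at most n bytes of a file. The handle opened by Q.readFile is
-- registered with ResourceT, so it is released by the time runResourceT
-- returns, whether the stream finished normally or an exception interrupted it.
firstBytes :: Int -> FilePath -> IO Strict.ByteString
firstBytes n path =
  runResourceT (Q.toStrict_ (Q.take (fromIntegral n) (Q.readFile path)))

main :: IO ()
main = do
  let path = "demo-input.txt" -- hypothetical scratch file
  writeFile path (unlines (map show [1 .. 1000 :: Int]))
  prefix <- firstBytes 20 path
  Strict.putStr prefix
  putStrLn ""
```

Calling toStrict_ is acceptable here only because take bounds the stream to n bytes first.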

Running, Extracting, and Converting

Since ByteString is the lingua franca byte-buffer type, it follows that we need to be able to run a ByteStream and assemble its constituent chunks into a ByteString, whether lazy or strict. However, because ByteStream computations have a return value, we must take it into account as well. We do so by using the Of type, again taking advantage of Of a b’s slight efficiency compared to (a, b).

toLazy :: ByteStream m r -> m (Of Lazy.ByteString r)
toStrict :: ByteStream m r -> m (Of Strict.ByteString r)

That toStrict function looks awfully tempting, and indeed it is, given how ubiquitous strict ByteString is in practice. We must be careful, however, not to lose constant-memory streaming here; if we create some infinite stream, and then immediately blow our memory by reading it aggressively into a buffer, then we haven’t accomplished anything. A safer approach is converting it back into a Stream:

unpack :: ByteStream m r -> Stream (Of Word8) m r
toChunks :: ByteStream m r -> Stream (Of Strict.ByteString) m r

Note that unpack and toChunks return their result in a pure context, not a monadic one, since converting between stream types doesn’t entail actually running the streams.

The interface boundaries between bytestrings and streams aren’t the only kind of eliminators that ByteStream supports. Indeed, familiar functions like head, last, and length are eliminators, consuming their input and running it for as long as is needed to return a result:

head :: Monad m => ByteStream m r -> m (Of (Maybe Word8) r)
last :: Monad m => ByteStream m r -> m (Of (Maybe Word8) r)
length :: Monad m => ByteStream m r -> m (Of Int r)

Most of these eliminators have variant forms, suffixed with an underscore, that discard the ByteStream’s return value, returning only the left-hand side of the Of-pair; given how often one deals with ByteStream m (), these functions can save many a pattern-match. Note that the only difference between these functions and their equivalents over strict or lazy bytestrings is the m effects parameter: because getting the constituent chunks of a ByteStream m () may involve monadic effects in m, these functions must have a monadic return type.

head_ :: Monad m => ByteStream m r -> m (Maybe Word8)
last_ :: Monad m => ByteStream m r -> m (Maybe Word8)
length_ :: Monad m => ByteStream m r -> m Int
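A quick sketch of a few eliminators at work, run in Identity so that the results are plain values. The names greeting, byteCount, and firstByte are illustrative.

```haskell
module Main where

import qualified Data.ByteString.Char8 as Char8
import Data.Functor.Identity (runIdentity)
import Data.Word (Word8)
import Streaming (Of (..))
import qualified Streaming.ByteString as Q

-- A stream usable in any monad; it performs no effects of its own.
greeting :: Monad m => Q.ByteStream m ()
greeting = Q.fromStrict (Char8.pack "hello")

-- The underscore variant returns only the count.
byteCount :: Int
byteCount = runIdentity (Q.length_ greeting)

-- The plain variant pairs its answer with the stream's own return value.
firstByte :: Of (Maybe Word8) ()
firstByte = runIdentity (Q.head greeting)

main :: IO ()
main = do
  print byteCount
  case firstByte of
    byte :> () -> print byte
  -- The same eliminator run in IO instead of Identity.
  n :> () <- Q.length greeting
  print n
```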


As with the above eliminators, there also exist familiar combinators for building new ByteStream values out of existing ones:

cons :: Monad m => Word8 -> ByteStream m r -> ByteStream m r
map :: Monad m => (Word8 -> Word8) -> ByteStream m r -> ByteStream m r
append :: Monad m => ByteStream m r -> ByteStream m s -> ByteStream m s
take :: Monad m => Int64 -> ByteStream m r -> ByteStream m ()
drop :: Monad m => Int64 -> ByteStream m r -> ByteStream m r

Again, it helps to start thinking in computations rather than in data. take n takes in a ByteStream computation and returns the result of limiting that computation’s underlying byte stream to n bytes. map promotes a function on bytes (Word8 -> Word8) to a function operating over ByteStream computations. In promoting ByteString values to ByteStream computations, we’ve lost none of the vocabulary from ByteString with which we’re familiar, but that vocabulary becomes in turn more lyrical and powerful. Note that unlike the eliminators, these functions do not run in m: they merely describe modifiers to existing ByteStream computations, deferring the actual execution of the stream to an eliminator.
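Here is a small sketch of composing these combinators and only then running the result. The helper toUpperAscii and the names shout and shouted are hypothetical, introduced purely for illustration.

```haskell
module Main where

import qualified Data.ByteString as Strict
import qualified Data.ByteString.Char8 as Char8
import Data.Functor.Identity (Identity, runIdentity)
import Data.Word (Word8)
import qualified Streaming.ByteString as Q

-- Hypothetical helper: uppercase ASCII letters, leave other bytes alone.
toUpperAscii :: Word8 -> Word8
toUpperAscii w
  | w >= 97 && w <= 122 = w - 32
  | otherwise = w

-- Describe a transformation: prepend '>', uppercase, keep five bytes.
-- Nothing is executed here; this only builds a new computation.
shout :: Q.ByteStream Identity () -> Q.ByteStream Identity ()
shout = Q.take 5 . Q.map toUpperAscii . Q.cons 62

-- Only the eliminator (toStrict_) actually runs the stream.
shouted :: Strict.ByteString
shouted = runIdentity (Q.toStrict_ (shout (Q.fromStrict (Char8.pack "hello world"))))

main :: IO ()
main = Char8.putStrLn shouted
```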

Practical (Sort Of) Examples

I enjoy the interplay between the types provided by streaming and streaming-bytestring, and in an effort to share why this is so, I’ve expressed the classic Knuth-McIlroy word-frequency classifier: given some input file, we are to process it word-by-word, keeping track of how many times a given word is encountered, and print the twenty-five most commonly-occurring words. This isn’t a perfect problem to express with streaming combinators, since to classify the occurrences of words we must retain them in memory (and thereby break streaming), but it’s a classic problem and the resulting code is quite handsome. Let’s build it, like so. First, we’ll need some imports.

{-# LANGUAGE ImportQualifiedPost #-}

module Main where

import Control.Category ((>>>))
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString qualified as Strict
import Data.ByteString.Lazy qualified as Lazy
import Data.HashMap.Strict qualified as Map
import Data.List qualified as List
import Data.Monoid
import Data.Ord (Down (..)) -- for sorting in descending order
import Data.Word8 qualified as Word8 -- from 'word8' package
import Streaming
import Streaming.ByteString qualified as ByteStream -- from 'streaming-bytestring'
import Streaming.Prelude qualified as Streaming

Given a FilePath as input, we need to return an association list, mapping words to their frequency.

classify :: MonadResource m => FilePath -> m [(Strict.ByteString, Int)]

In classical Haskell fashion, we’ll express this with function composition. As is my preference when explicating compositions, I’m going to use the >>> operator, provided by Control.Category, to express said composition in a left-to-right fashion. Furthermore, I’m going to place, in the comments, the type of the computation at the given point. If omitted, it is the same as the previous stage. (We could do this in code with id and the TypeApplications extension, which has the advantage of checking our assumptions at each stage, but it gets a little noisy.)

We start off by calling ByteStream.readFile, yielding a stream with resource-based access, but no associated return type. We then normalize the input by converting everything to lowercase.

classify                          -- FilePath
  = ByteStream.readFile           -- ByteStream m ()
  >>> ByteStream.map Word8.toLower

At this point, we want to split this ByteStream up into individual words, so that we can eventually shove these words piece-by-piece into the actual classification stage. ByteStream.splitWith takes care of this for us, yielding a stream where the functorial type is itself ByteStream m (rather than the Of functor that we usually use). That m appears twice in the resulting type, since it’s propagated both to the Stream and the ByteStream yielded from each per-element chunk of the stream.

>>> ByteStream.splitWith Word8.isSpace -- Stream (ByteStream m) m ()

Subsequently, we want to ensure that there aren’t any stray empty strings. The denull function operates on Streams of ByteStreams, removing any empty chunks that appear.

>>> ByteStream.denull

We now have a Stream containing ByteStream values, each of which we know to be all lowercase characters. We now need to build something capable of representing a histogram of their frequency of occurrence. At first glance, such a data type could be Map (ByteStream m ()) Int, but we run into a problem: ByteStream quantities have no Ord instance, being computations rather than bare values. Nor do we have a Hashable instance, so we cannot evade this with HashMap. What we must do instead is take a step that converts our functorial shape from ByteStream m to Of Strict.ByteString, element-by-element. Streaming.mapped is a combinator capable of changing the functorial shape in such a way, and to it we pass a familiar function: toStrict, the function that turns a ByteStream into a Strict.ByteString. (Though calling this function on a massive ByteStream could break streaming, we know the individual words are going to be fairly short, so we needn’t worry.) Subsequently, we can filter the stream to drop any word that contains punctuation or other non-alphabetic characters.

>>> Streaming.mapped ByteStream.toStrict -- Stream (Of Strict.ByteString) m ()
>>> Streaming.filter (Strict.all Word8.isAlpha)

Now we must build a HashMap-based histogram. Though we could pull in the foldl-statistics package rather than do this ourselves, it’s not painful to express with the streaming library’s fold_ function, which takes a step function, a starting value, and an extraction function. Our starting value will be the empty HashMap, our extraction function will be its toList destructor yielding (ByteString, Int) pairs, and our step function will either insert 1 or add 1 to the value already contained therein.

>>> let step m str = Map.insertWith (+) str 1 m
     in Streaming.fold_ step Map.empty Map.toList -- m [(ByteString, Int)]

And now we can use the list primitives sortOn and take to extract the twenty-five most common occurrences. (The snd >>> Down invocation ensures that we compare each (ByteString, Int) pair based on its second field, in descending order.) A fun exercise for the interested reader is to use a less naïve method to extract the most common occurrences, avoiding the overhead of Map.toList.

>>> fmap (List.sortOn (snd >>> Down) >>> List.take 25)

Put all together, we have a pipeline that is aesthetically appealing, and that runs around half as fast as the optimized-over-decades tools that make up McIlroy’s pipeline (which is what we expect, given that Haskell programs tend to run around half as fast as handwritten C).

classify :: MonadResource m => FilePath -> m [(Strict.ByteString, Int)]
classify =
  ByteStream.readFile
     >>> ByteStream.map Word8.toLower
     >>> ByteStream.splitWith Word8.isSpace
     >>> ByteStream.denull
     >>> Streaming.mapped ByteStream.toStrict
     >>> Streaming.filter (Strict.all Word8.isAlpha)
     >>> let step m str = Map.insertWith (+) str 1 m
          in Streaming.fold_ step Map.empty Map.toList
     >>> fmap (List.sortOn (snd >>> Down) >>> List.take 25)
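To experiment with the shape of this pipeline without a file on disk, here is a pared-down sketch that runs over an in-memory stream. It is not the classifier itself: it swaps HashMap for Data.Map from containers, splits only on ASCII spaces, and skips the lowercasing, filtering, and sorting stages. The names wordCounts and sampleCounts are illustrative.

```haskell
module Main where

import Control.Category ((>>>))
import qualified Data.ByteString as Strict
import qualified Data.ByteString.Char8 as Char8
import Data.Functor.Identity (runIdentity)
import qualified Data.Map.Strict as Map
import qualified Streaming.ByteString as Q
import qualified Streaming.Prelude as S

-- Split a byte stream into words, then fold the words into a histogram.
wordCounts :: Monad m => Q.ByteStream m r -> m [(Strict.ByteString, Int)]
wordCounts =
  Q.splitWith (== 32) -- 32 is the ASCII space
    >>> Q.denull
    >>> S.mapped Q.toStrict -- each word is short, so buffering it is fine
    >>> S.fold_ step Map.empty Map.toList
  where
    step counts word = Map.insertWith (+) word 1 counts

-- Running in Identity, since the input is already in memory.
sampleCounts :: [(Strict.ByteString, Int)]
sampleCounts = runIdentity (wordCounts (Q.fromStrict (Char8.pack "a b a c b a")))

main :: IO ()
main = mapM_ print sampleCounts
```

Because wordCounts is polymorphic in m, the same function would accept the stream produced by readFile inside runResourceT.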

Explicit Copying

Here’s another example, extracted from some code I once worked on to interact with Git repositories. This code, given some Blob type carrying with it a pointer to some Git data on disk, summarizes its contents, retaining both the first line of the blob for display as well as the number of bytes found in the blob in total.

newtype Result = Result (Of (Maybe Lazy.ByteString) Int)

summarize ::
  (MonadMask m, MonadUnliftIO m, Git.MonadGit r m, Git.Libgit2.HasLgRepo m) =>
  Git.Blob r m ->
  m Result

The first few lines are uninteresting, as they are concerned solely with coercing the Lazy.ByteString contents of this Git.Blob into a ByteStream. The only new function here is mwrap, an alias for the Go constructor, which promotes an effect returning a ByteStream into a ByteStream itself.

summarize =
  Git.blobToLazyByteString     -- m Lazy.ByteString
  >>> fmap ByteStream.fromLazy -- m (ByteStream m ())
  >>> ByteStream.mwrap         -- ByteStream m ()

Now we must think a bit harder. The ByteStream contents of this blob have two purposes: we must count the number of bytes therein, and we must retain the first line for display purposes. As such, we have two eliminating functions we want to use: ByteStream.head_ and ByteStream.length. Because ByteStream hews closely to a consumer-producer model, it won’t suffice simply to invoke these functions: we must copy the ByteStream input.

The copy operation on strict ByteString values has a pretty straightforward type signature:

copy :: ByteString -> ByteString

Not rocket science: it takes in a byte buffer, copies it with memcpy or some equivalent formulation, and returns the copy. In contrast, the type signature of streaming-bytestring’s copy operation is subtle:

copy :: ByteStream m a -> ByteStream (ByteStream m) a

Copying a ByteStream is not a process of memcpy. Indeed, we specifically don’t want to copy the byte buffers within the stream, because that can be expensive, and we’re not operating destructively upon them, since Haskell is a pure language. Instead, our copy will copy the constructors, rather than the contents, of ByteStream, wrapping them around the m parameter in ByteStream m and ultimately yielding a nested ByteStream (ByteStream m). This nested datum represents our responsibility to handle those effects, whatever we do to them. In handling those effects, we will eliminate that nested ByteStream layer, returning it to an m, and ultimately acting on a stream of data in two ways without breaking streaming or incurring otiose buffer copies.

>>> ByteStream.copy -- ByteStream (ByteStream m) ()
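To make the idea of copying the constructors rather than the contents more concrete, here is a minimal sketch built on a toy stream type. Everything in it (Chunks, copyC, lengthC, headC, drain, summarizeToy, example) is a hypothetical stand-in written for illustration, not streaming-bytestring’s actual source: chunks are plain Strings rather than strict ByteStrings, and headC returns the first chunk rather than the first line. It assumes nothing beyond base.

```haskell
import Data.Functor.Identity (Identity (..))

-- Toy stand-in for ByteStream (an assumption for illustration only).
data Chunks m r
  = Done r                     -- end of stream, carrying a result
  | Chunk String (Chunks m r)  -- one chunk of data, then the rest
  | Effect (m (Chunks m r))    -- an effect that produces the rest

instance Functor m => Functor (Chunks m) where
  fmap f (Done r) = Done (f r)
  fmap f (Chunk s rest) = Chunk s (fmap f rest)
  fmap f (Effect m) = Effect (fmap (fmap f) m)

instance Functor m => Applicative (Chunks m) where
  pure = Done
  ff <*> fx = ff >>= \f -> fmap f fx

instance Functor m => Monad (Chunks m) where
  Done r >>= k = k r
  Chunk s rest >>= k = Chunk s (rest >>= k)
  Effect m >>= k = Effect (fmap (>>= k) m)

-- Duplicate each constructor: every chunk appears once in the outer
-- stream and once in the inner one. The chunk itself is shared, not copied.
copyC :: Functor m => Chunks m r -> Chunks (Chunks m) r
copyC (Done r) = Done r
copyC (Chunk s rest) = Chunk s (Effect (Chunk s (Done (copyC rest))))
copyC (Effect m) = Effect (Effect (fmap (Done . copyC) m))

-- Eliminate a stream by summing its chunk lengths, discarding its result.
lengthC :: Monad m => Chunks m r -> m Int
lengthC = go 0
  where
    go n (Done _) = pure n
    go n (Chunk s rest) = go (n + length s) rest
    go n (Effect m) = m >>= go n

-- Run a stream for its effects and result, ignoring the chunks.
drain :: Monad m => Chunks m r -> m r
drain (Done r) = pure r
drain (Chunk _ rest) = drain rest
drain (Effect m) = m >>= drain

-- Eliminate a stream by keeping its first chunk and its final result.
headC :: Monad m => Chunks m r -> m (Maybe String, r)
headC (Done r) = pure (Nothing, r)
headC (Effect m) = m >>= headC
headC (Chunk s rest) = do
  r <- drain rest
  pure (Just s, r)

-- lengthC consumes the outer layer, whose monad is Chunks m; headC then
-- consumes the inner layer, leaving a plain action in m.
summarizeToy :: Monad m => Chunks m r -> m (Maybe String, Int)
summarizeToy = headC . lengthC . copyC

example :: Chunks Identity ()
example = Chunk "first\n" (Chunk "second\n" (Done ()))
```

Loaded into GHCi, runIdentity (summarizeToy example) evaluates to (Just "first\n",13): one pass over the data, two answers, and no duplicated chunk contents.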

We now have two eliminators to call. Let’s start by specializing the type of length_ so that it behaves as though we had passed in a ByteStream (ByteStream m), using the TypeApplications extension to GHC:

length_ @(ByteStream m)
  :: Monad m => ByteStream (ByteStream m) r -> ByteStream m Int

Computing the length of a nested ByteStream returning an r value yields another ByteStream, one that carries more information. The resulting ByteStream keeps track of the length of all the data that runs through it; when it finishes, it returns an Int representing that length in place of the original r value. The fact that computing the length of that nested ByteStream entailed its elimination is made apparent by the fact that the resulting ByteStream is flat, its monadic parameter just m.

>>> ByteStream.length_ -- ByteStream m Int

Now that we’re set to compute the length of the input stream as a whole, we need to extract its first line. As with our previous examples, we’ll call lines to turn the input ByteStream into a Stream, then Streaming.mapped to yield a stream of Lazy.ByteString values (since the whole file contents could, conceivably, be on one line, we must exercise caution). We’ll then call head to get the first line of the blob and its total length.

>>> ByteStream.lines -- Stream (ByteStream m) m Int
>>> Streaming.mapped ByteStream.toLazy -- Stream (Of Lazy.ByteString) m Int
>>> Streaming.head -- m (Of (Maybe Lazy.ByteString) Int)
>>> fmap Result -- m Result

All together, this pipeline is remarkably concise, and it retains glorious type safety. We can be confident that, should we be provided some massive Git blob, our program will not chew through more memory than it should.

summarize ::
  (MonadMask m, MonadUnliftIO m, Git.MonadGit r m, Git.Libgit2.HasLgRepo m) =>
  Git.Blob r m ->
  m Result
summarize =
  Git.blobToLazyByteString -- m Lazy.ByteString
  >>> fmap ByteStream.fromLazy -- m (ByteStream m ())
  >>> ByteStream.mwrap -- ByteStream m ()
  >>> ByteStream.copy -- ByteStream (ByteStream m) ()
  >>> ByteStream.length_ -- ByteStream m Int
  >>> ByteStream.lines -- Stream (ByteStream m) m Int
  >>> Streaming.mapped ByteStream.toLazy -- Stream (Of Lazy.ByteString) m Int
  >>> Streaming.head -- m (Of (Maybe Lazy.ByteString) Int)
  >>> fmap Result -- m Result

Let’s Get Categorical

You may notice that these examples don’t deal exclusively with the tools provided by streaming-bytestring: idiomatic use of streaming-bytestring is inextricably connected with streaming itself. I pointed out earlier that ByteStream is like a version of Lazy.ByteString extended to encapsulate effectful computations, and that doing so gives us ByteStream instances for Functor, Monad, and MonadTrans. But the similarities don’t end there: because streaming depends on the mmorph package, these types also implement the MFunctor and MMonad classes, which represent functors and monads in the category of monads. (If your attention just wandered, I know, I know, but stay with me. I promise that this is pretty cool.)

Let’s start by taking a look at the copy operations for Stream and ByteStream.

Streaming.copy :: Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
ByteStream.copy :: ByteStream m r -> ByteStream (ByteStream m) r

These functions both take in an action based in some monad m and return a new action in which that m is wrapped in a further type, Stream (Of a) m for the former and ByteStream m for the latter. For both of these, we can think of copying as a nesting operation. And as it happens, MFunctor and MMonad provide us vocabulary to handle monads within monads. We’ll cover MFunctor here, as it’s pretty simple.

class MFunctor t where
  hoist :: Monad old => (forall a . old a -> new a) -> t old b -> t new b

hoist promotes a natural transformation from old to new into a function operating on MFunctor values, one that changes the nested monad parameter from old to new. We can see this in action if we speak in terms of Stream and ByteStream:

hoist @(Stream (Of a)) ::
  Monad old =>
  (forall x . old x -> new x) ->
  Stream (Of a) old r ->
  Stream (Of a) new r

hoist @ByteStream ::
  Monad old =>
  (forall x . old x -> new x) ->
  ByteStream old r ->
  ByteStream new r
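As a rough illustration of what hoisting does, here is a small sketch that swaps the monad underneath a toy stream. The class MFunctorToy, the Chunks type, and the helpers generalizeToy, collect, pureStream, and inMaybe are all hypothetical names invented for this example, and it assumes only base. MFunctorToy imitates the shape of mmorph’s MFunctor, and generalizeToy is modelled on mmorph’s generalize, but neither is the library’s own code.

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Identity (Identity (..))

-- Toy stand-in for ByteStream (an assumption for illustration only).
data Chunks m r
  = Done r
  | Chunk String (Chunks m r)
  | Effect (m (Chunks m r))

-- Local imitation of the MFunctor class, so the sketch needs only base.
class MFunctorToy t where
  hoistToy :: Monad old => (forall a. old a -> new a) -> t old b -> t new b

instance MFunctorToy Chunks where
  hoistToy nat = go
    where
      -- Chunks pass through untouched; only the effect layers are translated.
      go (Done r) = Done r
      go (Chunk s rest) = Chunk s (go rest)
      go (Effect m) = Effect (nat (fmap go m))

-- A natural transformation from Identity into any monad.
generalizeToy :: Monad m => Identity a -> m a
generalizeToy = pure . runIdentity

-- Run a stream, gathering its chunks into a list.
collect :: Monad m => Chunks m r -> m [String]
collect (Done _) = pure []
collect (Chunk s rest) = fmap (s :) (collect rest)
collect (Effect m) = m >>= collect

-- A stream whose only "effect" lives in Identity.
pureStream :: Chunks Identity ()
pureStream = Chunk "ab" (Effect (Identity (Chunk "cde" (Done ()))))

-- The same chunks, now threaded through Maybe instead of Identity.
inMaybe :: Chunks Maybe ()
inMaybe = hoistToy generalizeToy pureStream
```

Here collect inMaybe evaluates to Just ["ab","cde"]: the data is unchanged, and only the monad in which the stream’s effects run has been replaced.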

Thanks to hoist, we have a consistent vocabulary for speaking about nested Stream and ByteStream values, no matter how deeply nested they may be. It also gives us certain invariants for how we can expect our library to behave: for example, hoisting the effects eliminator into a nested ByteStream peels away the nesting, discarding all chunks and merely running the effects. From this we can deduce that copy >>> hoist effects is equivalent to id, as is just plain copy >>> effects itself. Whether we want to flatten a nested monad externally or internally with hoist, we have options: use whichever is more aesthetically pleasing at the time.

end of file

If you kept with me during all of this, I appreciate you. This post got away from me, length-wise. But now it is over.

I thank Rob Rix and Joe Kachmar for their comments and suggestions on drafts of this article.