Ode to a Streaming ByteString
Or: Lazy I/O without Shooting Yourself in the Foot
2021-07-24
One of my favorite founding principles of Unix (the cynics among us might refer to it as a myth rather than a principle, given that Unix is no stranger to massive, kitchen-sink programs; indeed, I’m writing this post in such a program) is the universality of streams of bytes. Unix shells give us a monadic pipeline operator `|` that, given some computation `a | b`, executes `a` so that its output forms the input to `b`. This principle affords the standard Unix commands succinct composition, allowing the executing shell, rather than the programs themselves, to control the flow of data through a composed sequence of shell commands. Indeed, the principle of byte stream manipulation serves as the foundation for entire programming languages like `sed` and `awk`. Tools like `xargs`, `tee`, and `parallel` provide higher-order combinators to construct pipelines from further pipelines in ways beyond the capabilities of the pipeline operator itself.
Let’s consider some properties of programs that work well in pipelined computations such as these, using `grep` as the archetypal such program. `grep` is, of course, a tool of near-unparalleled importance, being enshrined in the POSIX standard, and it’s been optimized probably as thoroughly as any other program. Our programs are probably not `grep`, but they may need to emulate how `grep` manages its input streams:

- It hides the details of resource manipulation. The operating system, its system libraries, the shell in current use, and the implementation of its `|` operator hide the details of how input is batched and fed between files. Given that these details are abstracted from us, we should assume that input is buffered and chunked as necessary, and that these buffers and chunks are handled with as little overhead as possible. In particular, byte buffers read from an input stream should only be copied and retained if the program explicitly requests it: old input should never hang around in memory, given that the size of that input is theoretically unbounded.
- It consumes its input lazily. If I tell `grep` to search a ten-gigabyte file for one and only one occurrence of a string, it should consume only as much input as is required to fulfill my request. If more bytes are read from a stream than needed, it should be indistinguishable, performance-wise, from reading as little input as is possible, and should produce no change in observable program behavior.
- It cleans up its resources. Should `grep` request resources from the OS, or spawn some other helper process, these resources and subprocesses should be reclaimed quickly and reliably, even in the face of upstream or downstream pipeline failures, or even just a user impatiently hammering `Ctrl+C` and sending dozens of `SIGKILL` signals.
C programs excel at fulfilling these criteria. Ha! Just kidding. C programs can excel at streaming data manipulation, because C programming entrusts you with its details, and you might be someone who excels at streaming data manipulation. Though the POSIX standard provides certain guarantees about the manner in which resources are reclaimed upon program exit, you are still responsible for managing resources during program execution. (The standard specifies that certain resources—file descriptors, directory streams, stream converters from `iconv()`, and message catalogs from `catopen()`—are relinquished upon program exit. Child processes spawned with the `exec` family of functions are usually not terminated, unless specifically requested by the parent process or an associated process group. Shared memory may or may not be relinquished, depending on whether other processes are referring to it. This digression is getting long enough as is; read the standard if you’re really curious.) There aren’t too many layers of abstraction between you, `stdin`, and `stdout`. Sure, if your underlying `FILE*` struct can do some caching for you, it probably does, but you can always read directly with `read(2)` or with some lower-level efficiency-focused framework like `kqueue`, `epoll`, `libdispatch`, or `io_uring`.
With higher-level, garbage-collected languages, things become a good bit trickier. It is the responsibility of a higher-level language to insulate us from the concerns of C: as with shell scripts, we should have, in the common case, no particular knowledge of how files are read, how memory is allocated, or how input is buffered. For most programs, especially the quick-and-dirty ones at which garbage-collected languages excel, such details won’t matter. But when you find yourself outside the realm of “most programs”, the situation can prove vexing: the nature of a high-level language to isolate you from decisions made about resource management becomes a problem when the correctness of your software depends on the details of said resource management. We find ourselves confronted with a particularly tricky dialectic: high-level languages aid in the development of programs by abstracting away details, but hinder said development when program correctness depends directly on properties of these abstracted details, such as when writing code that handles error conditions robustly or remains reliable in the presence of resource pressure. (The difficulties encountered when addressing these concerns in portable shell scripts are just one of the many reasons we don’t write everything in shell.)
Haskell’s declarative nature, non-strict evaluation, and correct-by-construction philosophy make it, at least in theory, an attractive solution for writing code that works well in streaming pipelines. In practice, however, we run into three interrelated but distinct problems, ones that display a nice symmetry with the properties of well-managed input streams outlined above:
- the streaming problem: how do we write Haskell code that processes potentially-infinite data streams in a finite amount of memory?
- the lazy I/O problem: given Haskell’s non-strict semantics, how do we consume input from the outside world lazily and efficiently?
- the resource manipulation problem: given that we have no direct control over the lifetime of data, how do we ensure that our code manages and relinquishes expensive resources correctly?
I’m going to discuss how to address these problems, using the `streaming` ecosystem to address the first point, the `streaming-bytestring` library the second, and the `resourcet` library the third. Given Haskell’s more-than-one-way-to-do-it philosophy, there exist many alternative ecosystems—libraries like `conduit`, `pipes-bytestring`, `streamly`, and `io-streams` all provide tools to address these problems—but the `streaming` ecosystem is the one with which I am most familiar, and one with a generally friendly interface expressed with function composition. Both `streaming` and `streaming-bytestring` are cleverly designed and carefully thought through; I hope that this post serves as an exploration of their associated design decisions and resulting idioms. But to appreciate the depth of the problem, we have to start by discussing how Haskell represents byte buffers and streams of data, and how infelicities in their standard formulation provide `streaming-bytestring` its raison d’être.
A Byte Menagerie
Feel free to skip to the next section if you’re already familiar with Haskell string types and the perils of lazy I/O.
Haskell is notorious for its plethora of string types: `String` (a lazy list of four-byte `Char` values); strict and lazy `Text` values; and strict, lazy, and short `ByteString` values. (The `bytestring` package also provides `Data.ByteString.Char8` and `Data.ByteString.Lazy.Char8`, but these use the same types in memory, providing APIs dealing in `Char` and `String` values rather than `Word8` and `[Word8]`, the former of which can be more natural as long as you know that your input is Latin-1 compatible.) For the purposes of this article, we’re not going to worry about `String` or `Text`, because both operate on values known to be valid Unicode data. Our streaming pipelines operate on bytes, not Unicode characters—if, say, we want to be able to operate on binary data in such a pipeline, attempting to represent that data as Unicode text is inherently wrong. As such, we’ll focus only on Haskell’s byte buffer type, `ByteString`. Strict `ByteString`s are pretty easy to understand:
```haskell
module Data.ByteString where

data ByteString -- strict
  = BS
      {-# UNPACK #-} !(ForeignPtr Word8) -- payload
      {-# UNPACK #-} !Int                -- length
```
`ByteString` represents a byte buffer and its associated length; in this it is similar to Go’s `[]byte` or Rust’s `&[u8]` (though it has one fewer datum to track, as the Rust and Go types offer mutable access to the associated byte buffer and thus must keep track of its total capacity). `ByteString` values can contain `NUL` bytes, or indeed any other `Word8` value. If a given `ByteString` represents human-readable text, it must be converted to `Text` values explicitly with `Data.Text.Encoding` (for UTF-8/16/32) or `String` values with the `encoding` library; conversion failures should be handled at those callsites, though the correct behavior is often to throw an exception. This representation also allows for quick serialization and deserialization to C `char*` values, by extracting that `ForeignPtr Word8` and treating it as the memory address that it is. Similarly, we can choose to create that `ForeignPtr Word8` by copying from another source (like another `ByteString` or from a socket), or without copying by using unsafe `ByteString` operations (which can break referential transparency, but are sometimes necessary in extremely low-level code).
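As a quick illustration of that explicit-conversion step, here is a minimal sketch using `decodeUtf8'` from the `text` package, which surfaces decoding failures as an `Either` rather than throwing from pure code (`decodeOrReport` is a hypothetical name of my own):

```haskell
import qualified Data.ByteString as BS
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8')
import Data.Text.Encoding.Error (UnicodeException)

-- Decode a strict ByteString as UTF-8, surfacing failure as a value.
decodeOrReport :: BS.ByteString -> Either UnicodeException T.Text
decodeOrReport = decodeUtf8'

main :: IO ()
main = do
  print (decodeOrReport (BS.pack [104, 105]))   -- Right "hi"
  case decodeOrReport (BS.pack [0xFF, 0xFE]) of -- 0xFF is never valid UTF-8
    Left _  -> putStrLn "decoding failed, as expected"
    Right t -> print t
```

Handling the `Left` case at the callsite, as above, is exactly the kind of explicitness this conversion boundary is meant to force.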
A memory and length pair, supporting O(1) length and indexing and O(n) concatenation, is pretty straightforward. More interesting is this type, the lazy `ByteString`. For the sake of clarity of examples, we’ll use a different type name than does `bytestring`, which uses the `ByteString` name for both lazy and strict variants, disambiguating via the module name (`Data.ByteString` for strict, and `Data.ByteString.Lazy` for lazy).
```haskell
module Data.ByteString.Lazy where

import Data.ByteString qualified as Strict

data ByteString
  = Empty
  | Chunk {-# UNPACK #-} !Strict.ByteString ByteString
  --                     ^ head chunk       ^ lazy tail
```
Rather than being a strictly evaluated pair of buffer and length, this is a lazy list, similar to what we would have were we to represent it with `[ByteString]`: `Empty` is like `[]` and `Chunk` is like the `:` operator. The only operative difference is that the first strict `ByteString` parameter is evaluated strictly, which slightly reduces its overhead in memory. The second parameter, a lazy `ByteString` representing the rest of the string, is not strict, since the whole point of this enterprise is to let the Haskell runtime manage the laziness associated with our lazy list of byte buffers. This sounds like a great deal: GHC’s evaluator is a finely-tuned instrument, and it’s unlikely that we can improve thereupon.
This lets us imagine how `Data.ByteString.Lazy.readFile` would work: given some chunk size `N`, the runtime would read `N` bytes from a source, placing them in the strict `!Strict.ByteString` field. The second field would then be a thunk that, if evaluated, would read another byte buffer of maximum length `N`. If that second field is never evaluated—in other words, if our calling code operates only on the first `N` bytes—the rest of the lazy `ByteString` will never be materialized, and the overhead of reading further bytes is never incurred. This, indeed, is more-or-less how that function is implemented.
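We can see this chunked laziness in action without touching the filesystem at all. Here is a small sketch: an infinite lazy `ByteString` built from repeated strict chunks, of which only the demanded prefix is ever materialized.

```haskell
import qualified Data.ByteString as Strict
import qualified Data.ByteString.Lazy as Lazy
import qualified Data.ByteString.Lazy.Char8 as LC

-- An infinite lazy ByteString: a Chunk of "abc" whose lazy tail is
-- again "abc", forever. Only the chunks we force are ever allocated.
abcs :: Lazy.ByteString
abcs = Lazy.fromChunks (repeat (Strict.pack [97, 98, 99]))

main :: IO ()
main = LC.putStrLn (Lazy.take 7 abcs) -- prints "abcabca"
```

Were `take` to force the whole spine before returning, this program would never terminate; laziness is what makes the infinite definition usable.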
Yet lazy `ByteString` values don’t work reliably when reading files lazily from disk: they violate the principle of deterministic resource cleanup. Once a `Chunk` is read, reading beyond its embedded `ByteString` requires evaluating its second field, which entails further system calls to perform the disk I/O needed to build another `Chunk` (or an `Empty` if we’re at the end of the file). Should those system calls reference a socket or file descriptor that has since been closed, we’ll encounter a runtime crash. Even though `openFile` takes place in the `IO` monad, the resulting `ByteString` has `IO` hidden nefariously within, even when passed to contexts where it should be pure. I’ve seen this referred to as “pseudo-pure”: it may look like a pure computation, but it can incur I/O happening somewhere else, which, even if it works, is not the Haskell way. I/O is too important to be left implicit!
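The failure mode is easy to reproduce. Here is a minimal sketch (the file name is arbitrary): we obtain a lazy `ByteString` via `hGetContents`, close the handle before forcing anything, and only then demand the stream’s contents.

```haskell
import Control.Exception (IOException, try)
import qualified Data.ByteString.Lazy as Lazy
import System.IO

main :: IO ()
main = do
  writeFile "demo-lazy.txt" "hello, lazy i/o"
  h <- openFile "demo-lazy.txt" ReadMode
  contents <- Lazy.hGetContents h -- no bytes have actually been read yet
  hClose h                        -- ...and now none ever can be
  result <- try (print (Lazy.length contents))
  case (result :: Either IOException ()) of
    Left _   -> putStrLn "chunk demanded after handle was closed"
    Right () -> putStrLn "unexpectedly succeeded"
```

The `openFile` succeeded, the `hGetContents` succeeded, and yet the pure-looking `Lazy.length` blows up, because the I/O it needs was deferred past the handle’s lifetime.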
This has all been an exceptionally long-winded way to explain why no basic string type fulfills all our criteria in the presence of lazy I/O:

- `String` and `Text` operations fail in the face of binary or non-Unicode data;
- a strict `ByteString` is read all-at-once into memory, which violates both our need for laziness and constant resource consumption;
- a lazy `ByteString` violates the principle of deterministic resource cleanup, unless you use Haskell’s experimental support for linear types, which can prevent lazy bytestrings from outliving their associated file handle (though linear types are a bleeding-edge feature that have yet to see widespread adoption).
Now, this may not always matter. Your program may deal only in small files, in which case you don’t need lazy I/O at all: you can just read file contents in as a strict `ByteString` and you’ll be okay. Computers have a lot of memory nowadays. But Haskell is a lazy language, and it excels at problems that can be phrased lazily. Should our business logic be a matter of lazy stream processing, we need some sort of abstraction that can lazily stream byte buffers from a source of data with constant memory consumption, minimal copying, and safe, deterministic resource cleanup. The trifecta of `streaming`, `streaming-bytestring`, and `resourcet` takes care of this: let’s take a look and see how it works.
A Look at the API
The Important Types
The `Stream` type `Stream (Of a) m r` represents a stream capable of yielding zero or more `a` values, performing effects in `m`, and returning a final `r` result type. We can see these capabilities in the definition of `Stream` itself: such a computation can yield a computed `Step`, embed a monadic `Effect`, or simply `Return` a pure value.
```haskell
module Streaming where

data Stream f m r
  = Step !(f (Stream f m r))  -- yield an item and the rest of a stream, shaped by the functor f
  | Effect (m (Stream f m r)) -- perform some monadic effect resulting in a further stream
  | Return r                  -- do nothing and return a final value
```
The most common functor used as the value of `f` is the `Of` functor, which is identical to the tuple type `(,)`, but strict in its left argument, again to avoid the overhead of a lazy value when we know that the value in question has already been evaluated.
```haskell
module Data.Functor.Of where

data Of a b = !a :< b
```
The usual way to run a `Stream` is to call `Streaming.toList`, which returns an `Of`-pair containing a list of yielded `a` values and a final `r`. Note that `Of` is partially applied within the `Stream`, with its parameter `a` being the type of elements a stream yields.

```haskell
toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
```
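To build intuition for how the three constructors cooperate, here is a self-contained miniature of `Stream` and `Of` with a hand-rolled `toList`. The real library’s versions are far more general and better optimized, and `andThen` is a hypothetical helper of my own, standing in for the library’s monadic sequencing.

```haskell
{-# LANGUAGE DeriveFunctor #-}

-- A miniature Of: a strict pair, Functor in its second component.
data Of a b = !a :< b deriving Functor

-- The Stream type as defined above.
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- Yield a single element.
yield :: a -> Stream (Of a) m ()
yield a = Step (a :< Return ())

-- Sequence two streams, keeping the second's return value.
andThen :: (Functor f, Monad m) => Stream f m r -> Stream f m s -> Stream f m s
andThen (Return _) k = k
andThen (Step f)   k = Step (fmap (`andThen` k) f)
andThen (Effect m) k = Effect (fmap (`andThen` k) m)

-- Run a stream, collecting yielded elements and the final return value.
toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
toList (Return r)       = return ([] :< r)
toList (Effect m)       = m >>= toList
toList (Step (a :< as)) = do
  xs :< r <- toList as
  return ((a : xs) :< r)

main :: IO ()
main = do
  let s = yield (1 :: Int) `andThen` yield 2 `andThen` yield 3
  xs :< () <- toList s
  print xs -- prints [1,2,3]
```

Note how `toList` is driven entirely by pattern matching on the three constructors: `Return` ends the walk, `Effect` runs an action to reveal more stream, and `Step` peels off one yielded element.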
There’s nothing here specific to bytestrings or I/O, which means that `Stream` is suitable for building streaming abstractions in any monad, not just `IO`. In contrast, `ByteStream` is concerned specifically with holding chunked `ByteString` values, and is defined independently of `Stream`:
```haskell
module Streaming.ByteString where

data ByteStream m r
  = Empty r
  | Chunk {-# UNPACK #-} !ByteString (ByteStream m r)
  | Go (m (ByteStream m r))
```
You’ll notice that `ByteStream` is similar to `Lazy.ByteString`, with a few differences. Firstly, it has access to two type variables, `m` and `r`: `m` represents a monadic context, in which chunked reads can perform side effects via the `Go` constructor. This `Go` constructor is also new; it makes explicit that reading further `ByteString` chunks can cause side effects, unlike `Lazy.ByteString`, which hid the fact that file I/O may occur when reading long strings from disk. By using `MonadResource` to clean up file handles, we can indicate, with the type system, that reading `ByteString` values out of a `ByteStream` entails disk I/O, and thread that `MonadResource` constraint anywhere that file handles or ephemeral data must be cleaned up. Lastly, the `Empty` constructor takes an `r` argument, representing the final value, if any, of a given `ByteStream` computation.
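To make the role of `Go` concrete, here is a self-contained miniature of the type with a hand-rolled strict eliminator (`runToStrict` is a hypothetical name of my own; the real library’s `toStrict` also threads the `r` return value through):

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC

-- The three constructors from above, minus the UNPACK pragma.
data ByteStream m r
  = Empty r
  | Chunk BS.ByteString (ByteStream m r)
  | Go (m (ByteStream m r))

-- Run a ByteStream, performing its embedded effects and concatenating
-- every chunk encountered along the way.
runToStrict :: Monad m => ByteStream m r -> m BS.ByteString
runToStrict = go []
  where
    go acc (Empty _)   = return (BS.concat (reverse acc))
    go acc (Chunk c k) = go (c : acc) k
    go acc (Go m)      = m >>= go acc

main :: IO ()
main = do
  -- The second chunk here is only produced by running an embedded action.
  let stream = Chunk (BC.pack "hello, ")
                     (Go (pure (Chunk (BC.pack "world") (Empty ()))))
  out <- runToStrict stream
  BC.putStrLn out -- prints "hello, world"
```

The crucial difference from `Lazy.ByteString` is visible in `runToStrict`’s type: producing the bytes requires running `m`, so the effects can no longer hide.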
The difference between `ByteStream` and the lazy `ByteString` might not seem seismic, but it represents something very profound about Haskell: often, we gain expressive power by treating computations as data. The fact that `ByteStream` has a `Go` constructor, which allows embedding arbitrary `m`-actions as long as they return a further `ByteStream`, gives `ByteStream` the ability to represent any kind of computation. In the case of `ByteStream` values yielded from the system, that `m` can be `IO`, or a monad transformer implementing `MonadIO`, or an effect stack implementing `Lift IO`. But if we were dealing with a `ByteStream` defined ahead of time, that `m` can be `Identity`. (An interesting consequence of this is the `Show` instance for `ByteStream`, which requires the `m` parameter to be `Identity` and the `r` return type to be `()`, because the `Show` typeclass does not have access to the monadic context required to print out, say, a `ByteStream` that needs `IO` to perform its effects. To print such a `ByteStream`, you’d pass it to the `stdout` eliminator, of type `MonadIO m => ByteStream m r -> m r`, which evaluates that stream for its effects and prints to the console any chunks encountered while doing so.)
Furthermore, `ByteStream` isn’t just a computation, it’s a monad transformer, parameterized in terms of some parent monad `m`. This means we can use it in unexpected ways, such as the base monad in our quick-and-dirty web server: in that case, having a `ByteStream IO` monad at the center of our effect stack gave us the capability to send streams of bytes down a network connection, using the `sendM` function. We farmed out the handling of `Request` values to the `Reader` effect, and a `State` effect took care of handling `Response` values, but for an HTTP handler to do anything useful, it must have access to a sink of bytes. Using `Lift (ByteStream IO)` allowed us access to such capabilities, without divulging the way in which these bytes are ultimately transmitted to the user (all we know is that it happens in `IO`, as indeed all network activity must).
Creating a ByteStream
Examining the ways to construct `ByteStream` values makes the relationship between `ByteString`, `Stream`, and `ByteStream` clearer:
```haskell
-- An empty ByteStream is generalizable to any m, but carries no value
-- in its return type.
empty :: ByteStream m ()

-- The current program's standard input is itself a stream of bytes,
-- using 'MonadIO' to actually read from the input's file descriptor.
stdin :: MonadIO m => ByteStream m ()

-- If we already have a strict ByteString, we can package it up with
-- 'Chunk' (and an 'Empty' tail).
fromStrict :: Strict.ByteString -> ByteStream m ()

-- Converting from a lazy ByteString to a ByteStream is a matter of
-- folding over the former, replacing its 'Empty' and 'Chunk'
-- constructors with those provided by ByteStream.
fromLazy :: Lazy.ByteString -> ByteStream m ()

-- In general, we can see ByteStream as a more efficient
-- representation of a Stream of unchunked Word8 values, or a stream
-- of chunked, strict ByteString values. Note that these preserve the
-- return value of the input Stream.
pack       :: Stream (Of Word8) m r             -> ByteStream m r
fromStream :: Stream (Of Strict.ByteString) m r -> ByteStream m r
```
More interesting is the fundamental file-reading method, `readFile`. It is interesting in that it is the first time we see `MonadResource`, a typeclass provided by the `resourcet` package.

```haskell
readFile :: MonadResource m => FilePath -> ByteStream m ()
```
The `MonadResource` typeclass represents monads that are capable of managing the lifetime of critical resources, even in the presence of Haskell’s support for asynchronous exceptions. In a multithreaded Haskell program, any thread can send an exception to any other thread at any time, thanks to the fearsome `Control.Exception.throwTo`. `MonadResource`, and its corresponding `ResourceT` monad transformer, allow us a degree of durability in the face of this fact. Though `readFile` entails accessing a file on disk, we can be confident that even the most unluckily-timed of exceptions will not result in the associated file handle being orphaned or left unclosed. Additionally, the fact that this `MonadResource m` constraint is propagated, via the `m` parameter, to the resulting `ByteStream` makes it explicit that accessing the entirety of the associated `ByteStream` requires access to an associated resource, the lifetime of which is managed by the invoking `MonadResource` computation.
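`ResourceT` generalizes the familiar `bracket` pattern from `Control.Exception`: acquire, use, release, with the release guaranteed to run exactly once even if an exception arrives mid-use. A minimal sketch of that underlying pattern (the file name is arbitrary):

```haskell
import Control.Exception (bracket)
import System.IO

main :: IO ()
main = do
  writeFile "demo-bracket.txt" "streaming"
  -- bracket guarantees hClose runs whether hGetLine succeeds or throws;
  -- ResourceT extends this guarantee across an entire monadic region,
  -- rather than a single lexical scope.
  firstLine <- bracket (openFile "demo-bracket.txt" ReadMode) hClose hGetLine
  putStrLn firstLine -- prints "streaming"
```

The reason `ResourceT` exists at all is that `bracket`’s lexical scoping is too rigid for streaming: the stream produced inside the bracket must not escape it, which is exactly the lazy I/O bug all over again.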
Running, Extracting, and Converting
Since `ByteString` is the lingua franca byte-buffer type, it follows that we need to be able to run a `ByteStream` and assemble its constituent chunks into a `ByteString`, whether lazy or strict. However, because `ByteStream` computations have a return value, we must take it into account as well. We do so by using the `Of` type, again taking advantage of `Of a b`’s slight efficiency compared to `(a, b)`.
```haskell
toLazy   :: ByteStream m r -> m (Of Lazy.ByteString r)
toStrict :: ByteStream m r -> m (Of Strict.ByteString r)
```
That `toStrict` function looks awfully tempting, and indeed it is, given how ubiquitous the strict `ByteString` is in practice. We must be careful, however, not to lose constant-memory streaming here: if we create some infinite stream, and then immediately blow our memory by reading it aggressively into a buffer, then we haven’t accomplished anything. A safer approach is converting it back into a `Stream`:

```haskell
unpack   :: ByteStream m r -> Stream (Of Word8) m r
toChunks :: ByteStream m r -> Stream (Of Strict.ByteString) m r
```
Note that `unpack` and `toChunks` return their result in a pure context, not a monadic one, since converting between stream types doesn’t entail actually running the streams.

The interface boundaries between bytestrings and streams aren’t the only kind of eliminators that `ByteStream` supports. Indeed, familiar functions like `head`, `last`, and `length` are eliminators, consuming their input and running it for as long as is needed to return a result:
```haskell
head   :: Monad m => ByteStream m r -> m (Of (Maybe Word8) r)
last   :: Monad m => ByteStream m r -> m (Of (Maybe Word8) r)
length :: Monad m => ByteStream m r -> m (Of Int r)
```
Most of these eliminators have variant forms, suffixed with an underscore, that discard the `ByteStream`’s return value, returning only the left-hand side of the `Of`-pair; given how often one deals with `ByteStream m ()`, these functions can save many a pattern-match. Note that the only difference between these functions and their equivalents over strict or lazy bytestrings is the `m` effects parameter: because getting the constituent chunks of a `ByteStream m ()` may involve monadic effects in `m`, these functions must have a monadic return type.
```haskell
head_   :: Monad m => ByteStream m r -> m (Maybe Word8)
last_   :: Monad m => ByteStream m r -> m (Maybe Word8)
length_ :: Monad m => ByteStream m r -> m Int
```
Transformations
As with the above eliminators, there also exist familiar combinators for building new `ByteStream` values out of existing ones:
```haskell
cons   :: Monad m => Word8 -> ByteStream m r -> ByteStream m r
map    :: Monad m => (Word8 -> Word8) -> ByteStream m r -> ByteStream m r
append :: Monad m => ByteStream m r -> ByteStream m s -> ByteStream m s
take   :: Monad m => Int64 -> ByteStream m r -> ByteStream m r
drop   :: Monad m => Int64 -> ByteStream m r -> ByteStream m r
```
Again, it helps to start thinking in computations rather than in data. `take n` takes in a `ByteStream` computation and returns the result of limiting that computation’s underlying byte stream to `n` bytes. `map` promotes a function on bytes (`Word8 -> Word8`) to a function operating over `ByteStream` computations. In promoting `ByteString` values to `ByteStream` computations, we’ve lost none of the vocabulary from `ByteString` with which we’re familiar, but that vocabulary becomes in turn more lyrical and powerful. Note that unlike the eliminators, these functions do not run in `m`: they merely describe modifiers to existing `ByteStream` computations, deferring the actual execution of the stream to an eliminator.
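The vocabulary is deliberately the one you already know from plain bytestrings. Here is the lazy-`ByteString` analogue of composing `take` and `map`, which `ByteStream` reproduces with an `m` threaded through:

```haskell
import qualified Data.ByteString.Lazy.Char8 as LC

main :: IO ()
main = do
  let s = LC.pack "streaming bytes"
  -- take limits the stream; map transforms each byte. Neither forces
  -- more input than the final consumer (putStrLn) demands.
  LC.putStrLn (LC.map succ (LC.take 6 s)) -- prints "tusfbn"
```

With `ByteStream`, the same composition would additionally defer any embedded effects until an eliminator runs the stream.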
Practical (Sort Of) Examples
I enjoy the interplay between the types provided by `streaming` and `streaming-bytestring`, and in an effort to share why this is so, I’ve expressed the classic Knuth-McIlroy word-frequency classifier: given some input file, we are to process it word-by-word, keeping track of how many times a given word is encountered, and print the twenty-five most commonly-occurring words. This isn’t a perfect problem to express with streaming combinators, since to classify the occurrences of words we must retain them in memory (and thereby break streaming), but it’s a classic problem and the resulting code is quite handsome. Let’s build it, like so. First, we’ll need some imports.
```haskell
{-# LANGUAGE ImportQualifiedPost #-}

module Main where

import Control.Category ((>>>))
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString qualified as ByteString
import Data.ByteString qualified as Strict
import Data.ByteString.Lazy qualified as Lazy
import Data.HashMap.Strict qualified as Map
import Data.List qualified as List
import Data.Ord qualified as Ord
import Data.Word8 qualified as Word8 -- from the 'word8' package
import Streaming
import Streaming.ByteString qualified as ByteStream
import Streaming.Prelude qualified as Streaming
```
Given a `FilePath` as input, we need to return an association list, mapping words to their frequency.

```haskell
classify :: MonadResource m => FilePath -> m [(Strict.ByteString, Int)]
```
In classical Haskell fashion, we’ll express this with function composition. As is my preference when explicating compositions, I’m going to use the `>>>` operator, provided by `Control.Category`, to express said composition in a left-to-right fashion. Furthermore, I’m going to place, in the comments, the type of the computation at the given point. If omitted, it is the same as the previous stage. (We could do this in code with `id` and the `TypeApplications` extension, which has the advantage of checking our assumptions at each stage, but it gets a little noisy.)
We start off by calling `ByteStream.readFile`, yielding a stream with resource-based access, but no associated return type. We then normalize the input by converting everything to lowercase.

```haskell
classify =                            -- FilePath
  ByteStream.readFile                 -- ByteStream m ()
    >>> ByteStream.map Word8.toLower
```
At this point, we want to split this `ByteStream` up into individual words, so that we can eventually shove these words piece-by-piece into the actual classification stage. `ByteStream.splitWith` takes care of this for us, yielding a stream where the functorial type is itself `ByteStream m` (rather than the `Of` functor that we usually use). That `m` appears twice in the resulting type, since it’s propagated both to the `Stream` and to the `ByteStream` yielded from each per-element chunk of the stream.

```haskell
    >>> ByteStream.splitWith Word8.isSpace -- Stream (ByteStream m) m ()
```
Subsequently, we want to ensure that there aren’t any stray empty strings. The `denull` function operates on `Stream`s of `ByteStream`s, removing any empty chunks that appear.

```haskell
    >>> ByteStream.denull
```
We now have a `Stream` containing `ByteStream` values, each of which we know to be all lowercase characters. We now need to build something capable of representing a histogram of their frequency of occurrence. At first glance, such a data type could be `Map (ByteStream m ()) Int`, but we run into a problem: `ByteStream` quantities have no `Ord` instance, being computations rather than bare values. Nor do we have a `Hashable` instance, so we cannot evade this with `HashMap`. What we must do instead is take a step that converts our functorial shape from `ByteStream m` to `Of Strict.ByteString`, element by element. `Streaming.mapped` is a combinator capable of changing the functorial shape in such a way, and to it we pass a familiar function: `toStrict`, the function that turns a `ByteStream` into a `Strict.ByteString`. (Though calling this function on a massive `ByteStream` could break streaming, we know the individual words are going to be fairly short, so we needn’t worry.) Subsequently, we can filter the stream to remove punctuation and other such non-alphabetic characters.

```haskell
    >>> Streaming.mapped ByteStream.toStrict -- Stream (Of Strict.ByteString) m ()
    >>> Streaming.filter (ByteString.all Word8.isAlpha)
```
Now we must build a `HashMap`-based histogram. Though we could pull in the `foldl-statistics` package rather than do this ourselves, it’s not painful to express with the `streaming` library’s `fold_` function, which takes a step function, a starting value, and an extraction function. Our starting value will be the empty `HashMap`, our extraction function will be its `toList` destructor yielding `(ByteString, Int)` pairs, and our step function will either insert `1` or add `1` to the value already contained therein.

```haskell
    >>> let step m str = Map.insertWith (+) str 1 m
         in Streaming.fold_ step Map.empty Map.toList -- m [(ByteString, Int)]
```
And now we can use the list primitives `sortOn` and `take` to extract the twenty-five most common occurrences. (A fun exercise for the interested reader is to use a less naïve method to extract the most common occurrences, avoiding the overhead of `Map.toList`.) The `snd >>> Ord.Down` invocation ensures that we compare each `(ByteString, Int)` pair based on its second field, in descending order.

```haskell
    >>> fmap (List.sortOn (snd >>> Ord.Down) >>> List.take 25)
```
Put all together, we have a pipeline that is aesthetically appealing, and around half as fast as the optimized-over-decades tools that make up McIlroy’s pipeline (which is about what we expect, given that Haskell programs tend to run around half as fast as handwritten C).
```haskell
classify :: MonadResource m => FilePath -> m [(ByteString, Int)]
classify =
  ByteStream.readFile
    >>> ByteStream.map Word8.toLower
    >>> ByteStream.splitWith Word8.isSpace
    >>> ByteStream.denull
    >>> Streaming.mapped ByteStream.toStrict
    >>> Streaming.filter (ByteString.all Word8.isAlpha)
    >>> (let step m str = Map.insertWith (+) str 1 m
          in Streaming.fold_ step Map.empty Map.toList)
    >>> fmap (List.sortOn (snd >>> Ord.Down) >>> List.take 25)
```
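As a sanity check on the pipeline’s logic, here is the same classifier sketched over a plain in-memory strict `ByteString`, with no streaming involved (`classifyStrict` is a hypothetical name of my own, and `BC.words` stands in for the `splitWith`/`denull` pair):

```haskell
import Control.Category ((>>>))
import qualified Data.ByteString.Char8 as BC
import Data.Char (isAlpha, toLower)
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import Data.Ord (Down (Down))

classifyStrict :: BC.ByteString -> [(BC.ByteString, Int)]
classifyStrict =
  BC.map toLower                   -- normalize case
    >>> BC.words                   -- split on whitespace, dropping empties
    >>> filter (BC.all isAlpha)    -- keep purely alphabetic words
    >>> foldr (\w m -> Map.insertWith (+) w 1 m) Map.empty
    >>> Map.toList
    >>> List.sortOn (snd >>> Down) -- most frequent first
    >>> List.take 25

main :: IO ()
main = print (take 2 (classifyStrict (BC.pack "the cat and the hat and the bat")))
-- prints [("the",3),("and",2)]
```

The streaming version has exactly this shape, but replaces the in-memory list stages with `Stream` and `ByteStream` combinators so that only the histogram, never the input, resides in memory.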
Explicit Copying
Here’s another example, extracted from some code I once worked on to interact with Git repositories. This code, given some `Blob` type carrying with it a pointer to some Git data on disk, summarizes its contents, retaining both the first line of the blob for display as well as the number of bytes found in the blob in total.

```haskell
newtype Result = Result (Of (Maybe Lazy.ByteString) Int)

summarize ::
  (MonadMask m, MonadUnliftIO m, Git.MonadGit r m, Git.Libgit2.HasLgRepo m) =>
  Git.Blob r m ->
  m Result
```
The first few lines are uninteresting, as they are concerned solely with coercing the `Lazy.ByteString` contents of this `Git.Blob` into a `ByteStream`. The only new function here is `mwrap`, an alias for the `Go` constructor, which promotes an effect returning a `ByteStream` into a `ByteStream` itself.

```haskell
summarize =
  Git.blobToLazyByteString        -- m Lazy.ByteString
    >>> fmap ByteStream.fromLazy  -- m (ByteStream m ())
    >>> ByteStream.mwrap          -- ByteStream m ()
```
Now we must think a bit harder. The `ByteStream` contents of this blob have two purposes: we must count the number of bytes therein, and we must retain their first line for display purposes. As such, we have two eliminating functions we want to use: `ByteStream.head_` and `ByteStream.length_`. Because `ByteStream` hews closely to a consumer-producer model, it won’t suffice simply to invoke these functions: we must copy the `ByteStream` input.
The `copy` operation on strict `ByteString` values has a pretty straightforward type signature:

```haskell
copy :: ByteString -> ByteString
```
Not rocket science: it takes in a byte buffer, copies it with `memcpy` or some equivalent formulation, and returns the copy. In contrast, the type signature of `streaming-bytestring`’s copy operation is subtle:

```haskell
copy :: ByteStream m a -> ByteStream (ByteStream m) a
```
Copying a ByteStream
is not a process of memcpy
. Indeed, we specifically don’t want to copy the byte buffers within the stream, because that can be expensive, and we’re not operating destructively upon them, since Haskell is a pure language. Instead, our copy
will copy the constructors, rather than the contents, of ByteStream
, wrapping them around the m
parameter in ByteStream m
and ultimately yielding a nested ByteStream (ByteStream m)
. This nested datum represents our responsibility to handle those effects, whatever we do to them. In handling those effects, we will eliminate that nested ByteStream
layer, returning it to an m
, and ultimately acting on a stream of data in two ways without breaking streaming or incurring otiose buffer copies.
>>> ByteStream.copy -- ByteStream (ByteStream m) ()
We now have two eliminators to call. Let’s start by specializing the type of length_
so that it behaves as though we had passed in a ByteStream (ByteStream m)
, using the TypeApplications
extension to GHC:
length_ @(ByteStream m) :: Monad m => ByteStream (ByteStream m) r -> ByteStream m Int
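As a base-only aside, TypeApplications is ordinary GHC machinery, not anything specific to streaming code: the @ syntax pins a polymorphic type variable of any function, exactly as we have done for length_ here.

```haskell
{-# LANGUAGE TypeApplications #-}

-- fmap        :: Functor f => (a -> b) -> f a -> f b
-- fmap @Maybe ::              (a -> b) -> Maybe a -> Maybe b
main :: IO ()
main = do
  print (fmap @Maybe (+ 1) (Just (41 :: Int)))
  print (read @Int "42")
```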
Computing the length
of a nested ByteStream
returning an r
value returns another ByteStream
, but paired up with more information. The resulting ByteStream
now keeps track of the length of all the data that runs through it; when it finishes and returns an r
value, that value is now associated with an Int
representing that length. That computing the length of that nested ByteStream entailed its elimination is apparent from the resulting ByteStream being flat, its monadic parameter just m.
>>> ByteStream.length_ -- ByteStream m Int
Now that we’re set to compute the length of the input stream as a whole, we need to extract its first line. As with our previous examples, we’ll call lines
to turn the input ByteStream
into a Stream
, then Streaming.mapped
to yield a stream of Lazy.ByteString
values (since the whole file contents could, conceivably, be on one line, we must exercise caution). We’ll then call head
to get the first line of the blob and its total length.
>>> ByteStream.lines -- Stream (ByteStream m) m Int
>>> Streaming.mapped ByteStream.toLazy -- Stream (Of Lazy.ByteString) m Int
>>> Streaming.head -- m (Of (Maybe Lazy.ByteString) Int)
>>> fmap Result -- m Result
All together, this pipeline is remarkably concise, and it retains glorious type safety. We can be confident that, should we be provided some massive Git blob, our program will not chew through more memory than it should.
summarize ::
  (MonadMask m, MonadUnliftIO m, Git.MonadGit r m, Git.Libgit2.HasLgRepo m) =>
  Git.Blob r m ->
  m Result
summarize =
  Git.blobToLazyByteString                 -- m Lazy.ByteString
    >>> fmap ByteStream.fromLazy           -- m (ByteStream m ())
    >>> ByteStream.mwrap                   -- ByteStream m ()
    >>> ByteStream.copy                    -- ByteStream (ByteStream m) ()
    >>> ByteStream.length_                 -- ByteStream m Int
    >>> ByteStream.lines                   -- Stream (ByteStream m) m Int
    >>> Streaming.mapped ByteStream.toLazy -- Stream (Of Lazy.ByteString) m Int
    >>> Streaming.head                     -- m (Of (Maybe Lazy.ByteString) Int)
    >>> fmap Result                        -- m Result
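For comparison, here is what the same computation looks like over a plain Lazy.ByteString, with no effect interleaving. The name summarizeLazy is hypothetical, and unlike the pipeline above this version makes no guarantees about when the underlying blob I/O happens or how much of it is retained:

```haskell
import qualified Data.ByteString.Lazy.Char8 as L

-- first line (if any) paired with the total byte count
summarizeLazy :: L.ByteString -> (Maybe L.ByteString, Int)
summarizeLazy bs =
  ( case L.lines bs of
      []      -> Nothing
      (l : _) -> Just l
  , fromIntegral (L.length bs)
  )

main :: IO ()
main = print (summarizeLazy (L.pack "first line\nsecond line\n"))
```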
Let’s Get Categorical
You may notice that these examples don’t deal exclusively with the tools provided by streaming-bytestring
: idiomatic use of streaming-bytestring
is inextricably connected with streaming
itself. I pointed out earlier that ByteStream
is like a version of Lazy.ByteString
extended to encapsulate effectful computations, and that doing so gives us ByteStream
instances for Functor
, Monad
, and MonadTrans
. But the similarities don’t end there: because streaming
depends on the mmorph
package, these types also implement the MFunctor
and MMonad
classes, which represent functors and monads in the category of monads. (If your attention just wandered, I know, I know, but stay with me. I promise that this is pretty cool.)
Let’s start by taking a look at the copy
operations for Stream
and ByteStream
.
Streaming.copy  :: Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
ByteStream.copy :: ByteStream m r -> ByteStream (ByteStream m) r
These functions both take in an action based in some monad m
and return a new action in which that m
is wrapped in a further type, Stream (Of a) m
for the former and ByteStream m
for the latter. For both of these, we can think of copying as a nesting operation. And as it happens, MFunctor
and MMonad
provide us vocabulary to handle monads within monads. We’ll cover MFunctor
here, as it’s pretty simple.
class MFunctor t where
  hoist :: Monad old => (forall a . old a -> new a) -> t old b -> t new b
hoist
promotes a natural transformation from old
to new
to a function operating on MFunctor
values, one that changes the type of the nested monad parameter m
. We can see this in action if we speak in terms of Stream
and ByteStream
:
hoist @(Stream (Of a)) ::
  Monad old =>
  (forall a . old a -> new a) ->
  Stream (Of a) old r ->
  Stream (Of a) new r

hoist @ByteStream ::
  Monad old =>
  (forall a . old a -> new a) ->
  ByteStream old r ->
  ByteStream new r
Thanks to hoist
, we have a consistent vocabulary for speaking about nested Stream
and ByteStream
values, no matter how deeply nested they may be. It also gives us certain invariants for how we can expect our library to behave: for example, hoisting the effects
eliminator into a nested ByteStream
peels away the nesting, discarding all chunks and merely running the effects. From this we can deduce that copy >>> hoist effects
is equivalent to id
, as is just plain copy >>> effects
itself. Whether we want to flatten a nested monad externally or internally with hoist
, we have options: use whichever is more aesthetically pleasing at the time.
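To make hoist concrete without pulling in streaming, here is the same idea over a minimal MaybeT transformer, using only base. The class is a simplified restatement of mmorph's MFunctor (the real class constrains the source monad), and generalize is the standard natural transformation from pure computations into a larger monad, specialized here to IO:

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Identity (Identity (..))

-- a simplified restatement of mmorph's MFunctor
class MFunctor t where
  hoist :: (forall a. old a -> new a) -> t old b -> t new b

-- a minimal transformer to hoist over
newtype MaybeT m a = MaybeT { runMaybeT :: m (Maybe a) }

instance MFunctor MaybeT where
  -- apply the natural transformation to the wrapped action
  hoist nat (MaybeT m) = MaybeT (nat m)

-- promote a pure computation into IO
generalize :: Identity a -> IO a
generalize = pure . runIdentity

main :: IO ()
main = do
  -- a pure MaybeT Identity action, hoisted into MaybeT IO
  r <- runMaybeT (hoist generalize (MaybeT (Identity (Just (3 :: Int)))))
  print r
```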
End of File
If you kept with me during all of this, I appreciate you. This post got away from me, length-wise. But now it is over.
I thank Rob Rix and Joe Kachmar for their comments and suggestions on drafts of this article.