A streaming library with a superpower: FS2 and functional programming
As a full-stack Scala developer, I've worked with numerous streaming libraries including Akka Streams, Monix Observable, and FS2. Among these, FS2 stands out as the most principled, performant, and ergonomic. Its purely functional design and seamless integration with the cats-effect IO monad have made it an indispensable tool in my architecture toolbox.
In this deep dive, we'll explore the power of FS2 and how it allows us to construct sophisticated streaming pipelines in an elegant, functional style. I'll share code snippets and real-world use cases to demonstrate why FS2 is a "streaming library with a superpower."
The Basics: Stream and Pure Functions
At the heart of FS2 is the Stream[F, O] data type. This represents a stream of O output values, where producing each value may require effects captured by the effect type F. Effects can be any external interaction like reading a file, querying a database, or making an HTTP request.
What's special about Stream is that it's defined entirely in terms of pure functions. Let's look at a simple example:
import cats.effect.IO
import fs2.Stream
val s: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
Here we construct a Stream of integers, then convert it to a Stream[IO, Int] using the covary method. Despite the use of IO, the stream itself is a pure value. It's merely a description of a computation that, when interpreted, will emit the values 1, 2, and 3. No effects are actually performed until we run the stream:
s.compile.toList.unsafeRunSync()
// res: List[Int] = List(1, 2, 3)
This separation of computation description from execution is a hallmark of functional programming, and it enables us to build complex pipelines in a modular, testable, and declarative way.
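To make the split between description and execution concrete, here is a minimal sketch. It assumes FS2 3.x with cats-effect 3 (hence the extra runtime import), and the mutable counter `runs` is a hypothetical probe used only to observe when the effect fires.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.Stream

// Hypothetical probe: a mutable counter that records how often the effect runs.
var runs = 0

// Building the stream only describes the effect; nothing is executed here.
val counted: Stream[IO, Int] = Stream.eval(IO { runs += 1; runs })

assert(runs == 0) // still zero: the stream is just a value

// Each run interprets the same description again, so the effect fires once per run.
val first  = counted.compile.toList.unsafeRunSync()
val second = counted.compile.toList.unsafeRunSync()
```

Because `counted` is a plain value, it can be passed around, combined with other streams, or run several times, and the effect happens only at the `unsafeRunSync()` calls.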
Transforming Streams: It's Just Map and FlatMap
Once we have a Stream, we can transform it using an arsenal of combinators. The most fundamental are map and flatMap:
def intToString(i: Int): String = i.toString
val s1: Stream[IO, String] = s.map(intToString)
// Stream("1", "2", "3")
def intToStream(i: Int): Stream[IO, Int] = Stream.emits(List(i, i + 1))
val s2: Stream[IO, Int] = s.flatMap(intToStream)
// Stream(1, 2, 2, 3, 3, 4)
map applies a pure function to each element, transforming a Stream[IO, Int] to a Stream[IO, String]. flatMap maps each element to a new stream and concatenates the results, allowing us to produce variable numbers of outputs per input.
With just these two combinators plus filter, fold, zip, and others, we can construct elaborate pipelines. For example, here's a streaming CSV parser:
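import java.nio.file.Path
import cats.effect.Sync
import fs2.{io, text, Stream}
// readAll is called as in the original; its exact signature varies between FS2 releases.
def rows[F[_]](path: Path)(implicit F: Sync[F]): Stream[F, List[String]] =
  io.file.readAll[F](path, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .map(_.split(",").toList)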
We read the file as a stream of bytes, decode the bytes into strings using UTF-8, split the text into lines, and finally parse each line into a list of strings by splitting on commas. The through combinator applies a Pipe (a function from one stream to another) to the stream, so s.through(f) is simply f(s); it lets us chain transformations in a readable left-to-right fashion.
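The same line-splitting logic can be tried on in-memory data, with no file involved. The sketch below assumes FS2 3.x; `splitFields` is a hypothetical helper, not part of FS2, and the input is made up for illustration.

```scala
import fs2.{Pipe, Pure, Stream, text}

// A Pipe[F, I, O] is just a function Stream[F, I] => Stream[F, O].
// splitFields is a hypothetical helper: drop blank lines, then split on commas.
val splitFields: Pipe[Pure, String, List[String]] =
  _.filter(_.nonEmpty).map(_.split(",").toList)

// Two chunks of text whose boundary falls in the middle of the second row.
val raw: Stream[Pure, String] = Stream("a,b\nc,", "d")

// Chaining with through reads left to right...
val viaThrough: List[List[String]] =
  raw.through(text.lines).through(splitFields).toList

// ...and is equivalent to applying the pipes as ordinary functions.
val viaApply: List[List[String]] =
  splitFields(text.lines[Pure](raw)).toList
```

Both values come out as List(List("a", "b"), List("c", "d")): text.lines reassembles the row that was split across chunks before splitFields sees it.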
Effectful Streams: Talking to the External World
Of course, most streaming problems involve more than just transforming in-memory data. We often need to interact with external systems like databases, web services, or message queues. FS2 makes these interactions both simple and safe.
Consider a program that loads data from a database, enriches it with information from a web API, and publishes the results to a message queue. An FS2 implementation might look like:
def userIds(path: Path): Stream[IO, Long] = ...
def userFromDb(id: Long): IO[User] = ...
def enrichUser(user: User): IO[UserWithStats] =
  IO(???) // Call web API
def publish(user: UserWithStats): IO[Unit] =
  IO(???) // Publish to message queue
val program: Stream[IO, Unit] =
  userIds(Paths.get("ids.csv"))
    .evalMap(userFromDb)
    .evalMap(enrichUser)
    .evalMap(publish)
The evalMap combinator allows us to perform an effectful computation for each element of the stream. In this case, we load each user from the database, enrich their data by calling a web API, and publish the enriched user to a message queue. At each step, we're mapping elements of the stream to IO actions.
Note that these IO actions aren't actually executed until we run the stream. We can build up the entire pipeline as a pure value, then compile and run it when we're ready:
val task: IO[Unit] = program.compile.drain
task.unsafeRunSync()
The IO data type and FS2's Stream are a match made in functional programming heaven. IO gives us a principled, purely functional way to interact with the external world, while Stream allows us to compose these interactions into reusable, modular components.
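For readers who want to run the evalMap pipeline end to end, here is a self-contained sketch with in-memory stand-ins. It assumes FS2 3.x with cats-effect 3. The `User` and `UserWithStats` shapes, the stubbed `userFromDb` and `enrichUser`, and the `Ref` that plays the role of the message queue are all hypothetical.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.Stream

// Hypothetical data shapes; the article does not define them.
final case class User(id: Long, name: String)
final case class UserWithStats(user: User, logins: Int)

// Stubs standing in for the database lookup and the web API call.
def userFromDb(id: Long): IO[User] = IO.pure(User(id, s"user-$id"))
def enrichUser(user: User): IO[UserWithStats] =
  IO.pure(UserWithStats(user, (user.id * 10).toInt))

// The "queue" is a Ref that collects whatever gets published.
def program(published: Ref[IO, Vector[UserWithStats]]): Stream[IO, Unit] =
  Stream(1L, 2L, 3L)
    .covary[IO]
    .evalMap(userFromDb)
    .evalMap(enrichUser)
    .evalMap(u => published.update(_ :+ u))

val result: Vector[UserWithStats] =
  (for {
    ref <- Ref.of[IO, Vector[UserWithStats]](Vector.empty)
    _   <- program(ref).compile.drain // effects run here, one element at a time
    out <- ref.get
  } yield out).unsafeRunSync()
```

Swapping the stubs for real database, HTTP, and queue clients leaves the shape of `program` unchanged, which is the point of keeping each step as an `IO`-returning function.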
Buffers and Concurrency: Optimizing for Performance
FS2's combinators are useful for more than just IO. They can also help us tune our streaming pipelines for performance. Two key tools are the buffer combinator and the concurrency combinators such as parJoin.
Buffer: Decoupling Production from Consumption
In FS2, the buffer combinator controls how many elements are requested from upstream at a time, which loosens the coupling between how values are produced and how they are consumed. Consider a stream that reads bytes from a file and parses them into JSON values:
def parseJson[F[_]](bytes: Stream[F, Byte]): Stream[F, Json] = ...
val pipeline: Stream[IO, Json] =
  io.file.readAll[IO](Paths.get("data.json"), 4096)
    .through(parseJson)
FS2 streams are chunked, so readAll already reads the file in chunks of up to 4096 bytes rather than one byte at a time. What remains strictly sequential is the hand-off: each chunk is read only when the parser asks for more input.
We can change how many elements are pulled at a time with buffer:
val pipeline2: Stream[IO, Json] =
  io.file.readAll[IO](Paths.get("data.json"), 4096)
    .buffer(1024)
    .through(parseJson)
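Now the stream pulls bytes from the reader 1024 at a time and hands them to parseJson in chunks of that size. Note that buffer only changes how elements are batched; it does not by itself run the file reader concurrently with JSON parsing. When that overlap is the goal, the prefetch and prefetchN combinators run the upstream in the background and keep a bounded number of chunks ready, which can improve overall throughput.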
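The batching behaviour of buffer is easy to observe on a small in-memory stream. This is a sketch assuming FS2 3.x with cats-effect 3; the range of ten integers is a made-up stand-in for the file's bytes, and prefetchN is shown next to it as the combinator that adds background fetching.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.Stream

// buffer(4) regroups the ten elements into chunks of at most four elements.
val chunkSizes: List[Int] =
  Stream.range(0, 10).buffer(4).chunks.map(_.size).toList

// prefetchN(4) lets the upstream run up to four chunks ahead of the consumer.
// The elements and their order are unchanged; only the scheduling differs.
val prefetched: List[Int] =
  Stream.range(0, 10).covary[IO].prefetchN(4).compile.toList.unsafeRunSync()
```

Here `chunkSizes` is List(4, 4, 2), while `prefetched` contains the same ten integers in their original order. Which of the two helps a real pipeline depends on where the time is spent, so it is worth measuring before and after.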
Concurrent: Parallelizing Independent Steps
In addition to buffering, FS2 supports concurrency via the parJoin and parJoinUnbounded combinators. These allow us to run multiple streams concurrently and combine their outputs.
Let's extend our JSON parsing example to handle multiple files:
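def parseFile(path: Path): Stream[IO, Json] =
  io.file.readAll[IO](path, 4096).through(parseJson)
// covary[IO] fixes the outer stream's effect type so parJoin can run the inner streams in IO
def parseFiles(paths: List[Path]): Stream[IO, Json] =
  Stream.emits(paths).covary[IO].map(parseFile).parJoin(10)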
parJoin runs up to 10 instances of parseFile concurrently, emitting JSON values downstream as they're produced. The output order across files is therefore not deterministic. This can dramatically speed up our pipeline if the files are independent and the work is either I/O-bound or has spare CPU cores available.
FS2's concurrency primitives are built on the same functional foundation as the rest of the library. They're purely functional descriptions of concurrent computations, and they compose seamlessly with other combinators like map, flatMap, and buffer.
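A small in-memory sketch shows the shape of a parJoin pipeline without touching the file system. It assumes FS2 3.x with cats-effect 3, and `fakeFile` is a hypothetical stand-in for parseFile that emits two strings per "file".

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.Stream

// Hypothetical stand-in for parseFile: each "file" is a tiny in-memory stream.
def fakeFile(id: Int): Stream[IO, String] =
  Stream(s"file$id-a", s"file$id-b").covary[IO]

val merged: List[String] =
  Stream
    .emits(List(1, 2, 3))
    .covary[IO]
    .map(fakeFile)
    .parJoin(2) // at most two inner streams are open at any moment
    .compile
    .toList
    .unsafeRunSync()

// Interleaving between files may differ from run to run, so compare a sorted copy.
val sortedOutput: List[String] = merged.sorted
```

All six elements arrive downstream, but code that consumes `merged` should not rely on which file's output comes first.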
Real-World FS2: A High-Performance Redis Client
To see FS2 in action, let's analyze the fs2-redis library. This is a Redis client built on FS2, used in production at companies like Disney Streaming Services.
At a high level, fs2-redis exposes two key data types:
trait RedisClient[F[_], K, V] {
  def send(command: Command[K, V]): F[Resp[K, V]]
}
trait Streaming[F[_], K, V] extends RedisClient[Stream[F, *], K, V] {
  def stream(command: Command[K, V]): Stream[F, Resp[K, V]]
}
RedisClient allows us to send a single command to Redis and receive a response wrapped in an effect F. Streaming extends this to support streaming responses, returning a Stream[F, Resp[K, V]].
With these abstractions, we can build complex Redis interactions:
def streamingInfo(client: Streaming[IO, String, String]): Stream[IO, String] =
  client.stream(Command("INFO", Nil)).flatMap {
    case InfoResp(info) => Stream.emit(info)
    case _ => Stream.empty
  }
Here we use the INFO command to retrieve Redis server information as a stream of strings. The flatMap call allows us to process the different response types and emit strings downstream.
To see how fs2-redis achieves high performance, let's look at its use of pipelining:
def pipeline(client: RedisClient[IO, String, String]): IO[Unit] = {
  val commands = List("PING", "PING", "PING")
  Stream.emits(commands)
    .map(Command(_, Nil))
    .through(client.pipeline)
    .compile
    .drain
}
The pipeline combinator, which is not part of the RedisClient trait shown above, buffers commands and sends them to Redis in a single batch. This reduces network round trips and can dramatically improve throughput.
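The batching idea behind pipelining can be sketched with plain FS2 combinators. This is not the library's implementation: `sendBatch` and `pipelined` are hypothetical names, the "server" simply answers PONG to every command, and the code assumes FS2 3.x with cats-effect 3.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.{Pipe, Stream}

// Hypothetical stand-in for one network round trip: counts the trip and
// answers every command in the batch with "PONG".
def sendBatch(roundTrips: Ref[IO, Int])(batch: List[String]): IO[List[String]] =
  roundTrips.update(_ + 1).as(batch.map(_ => "PONG"))

// Group commands into batches of batchSize, send each batch once,
// then flatten the replies back into a stream.
def pipelined(batchSize: Int, roundTrips: Ref[IO, Int]): Pipe[IO, String, String] =
  _.chunkN(batchSize)
    .evalMap(chunk => sendBatch(roundTrips)(chunk.toList))
    .flatMap(replies => Stream.emits(replies))

val (replies, trips) =
  (for {
    counter <- Ref.of[IO, Int](0)
    out <- Stream("PING", "PING", "PING", "PING", "PING")
             .covary[IO]
             .through(pipelined(2, counter))
             .compile
             .toList
    n <- counter.get
  } yield (out, n)).unsafeRunSync()
```

Five commands with a batch size of two need three round trips instead of five. A real client would also have to match replies to commands and handle partial failures, which this sketch leaves out.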
Under the hood, fs2-redis is built on the cats-effect and scodec libraries. cats-effect provides the IO monad for purely functional effects, while scodec is used for binary serialization of Redis commands and responses.
By leveraging FS2 and these other functional Scala libraries, fs2-redis achieves a clean, modular design without sacrificing performance. It's a testament to the power of functional programming for building high-performance, real-world systems.
The FS2 Philosophy: Effects as Values
At its core, FS2 is about treating effects as values. Whether we're reading from a file, querying a database, or publishing to a message queue, FS2 allows us to represent these actions as pure data in our programs.
This has a profound impact on our ability to write modular, testable, and maintainable code. We can build complex pipelines by composing small, reusable units, confident that no effect runs until we explicitly compile and execute the stream, so there are no hidden side effects to surprise us.
When we do need to execute effects, we can do so in a controlled, principled way using cats-effect data types like IO. These data types provide a clear separation between the description of an effect and its execution, allowing us to reason about our code in a pure, functional way.
Wrapping Up
FS2 is a powerful tool for building streaming data pipelines in Scala. By treating streams as first-class values and leveraging pure functions for transformation and combination, FS2 allows us to write expressive, modular, and efficient code.
Through examples like CSV parsing, database integration, and a real-world Redis client, we've seen how FS2's combinators and abstractions enable us to solve complex problems with elegance and grace.
At the same time, FS2 doesn't sacrifice performance. Features like concurrent stream processing and pipelined requests allow us to build high-throughput, low-latency systems without abandoning functional programming principles.
As a full-stack Scala developer, I reach for FS2 whenever I need to build a streaming data pipeline. Its pure, declarative API and seamless integration with cats-effect make it a joy to use, while its performance and flexibility make it suitable for even the most demanding production workloads.
If you're a Scala developer looking to up your streaming game, give FS2 a try. I think you'll find that it really is a "streaming library with a superpower."