A streaming library with a superpower: FS2 and functional programming

As a full-stack Scala developer, I've worked with numerous streaming libraries including Akka Streams, Monix Observable, and FS2. Among these, FS2 stands out as the most principled, performant, and ergonomic. Its purely functional design and seamless integration with the cats-effect IO monad have made it an indispensable tool in my architecture toolbox.

In this deep dive, we'll explore the power of FS2 and how it allows us to construct sophisticated streaming pipelines in an elegant, functional style. I'll share code snippets and real-world use cases to demonstrate why FS2 is a "streaming library with a superpower."

The Basics: Stream and Pure Functions

At the heart of FS2 is the Stream[F, O] data type. This represents a stream of O output values, where producing each value may require effects captured by the effect type F. Effects can be any external interaction like reading a file, querying a database, or making an HTTP request.

What's special about Stream is that it's defined entirely in terms of pure functions. Let's look at a simple example:

import cats.effect.IO
import fs2.Stream

val s: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]

Here we construct a Stream of integers, then convert it to a Stream[IO, Int] using the covary method. Despite the use of IO, the stream itself is a pure value. It's merely a description of a computation that, when interpreted, will emit the values 1, 2, and 3. No effects are actually performed until we run the stream:

s.compile.toList.unsafeRunSync()
// res: List[Int] = List(1, 2, 3)

This separation of computation description from execution is a hallmark of functional programming, and it enables us to build complex pipelines in a modular, testable, and declarative way.
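To make the separation concrete, here is a minimal sketch, assuming FS2 3.x on cats-effect 3 (where unsafeRunSync needs an implicit IORuntime). The object name and the counter are illustrative only; the counter exists purely to observe when the effect runs.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime for unsafeRunSync
import fs2.Stream

object DescriptionVsExecution {
  // Mutable counter used only to observe when the effect actually runs.
  var evaluations = 0

  // Building the stream only describes the effect; the counter is not touched here.
  val counted: Stream[IO, Int] =
    Stream.eval(IO { evaluations += 1; evaluations }).repeat.take(3)

  def main(args: Array[String]): Unit = {
    println(evaluations)                            // 0: nothing has run yet
    println(counted.compile.toList.unsafeRunSync()) // List(1, 2, 3)
    println(counted.compile.toList.unsafeRunSync()) // List(4, 5, 6): the description is re-run
  }
}
```

Because counted is only a description, running it twice performs the effect twice, which is why the second run sees a different counter state.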

Transforming Streams: It‘s Just Map and FlatMap

Once we have a Stream, we can transform it using an arsenal of combinators. The most fundamental are map and flatMap:

def intToString(i: Int): String = i.toString

val s1: Stream[IO, String] = s.map(intToString)  
// Stream("1", "2", "3")

def intToStream(i: Int): Stream[IO, Int] = Stream.emits(List(i, i + 1))  

val s2: Stream[IO, Int] = s.flatMap(intToStream)
// Stream(1, 2, 2, 3, 3, 4)

map applies a pure function to each element, transforming a Stream[IO, Int] to a Stream[IO, String]. flatMap maps each element to a new stream and concatenates the results, allowing us to produce variable numbers of outputs per input.

With just these two combinators plus filter, fold, zip, and others, we can construct elaborate pipelines. For example, here's a streaming CSV parser:

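import cats.effect.Sync
import fs2.{io, text, Stream}
import java.nio.file.Path

// This file API predates FS2 3.x, where the equivalents are Files[F].readAll(path) and text.utf8.decode.
def rows[F[_]](path: Path)(implicit F: Sync[F]): Stream[F, List[String]] =
  io.file.readAll[F](path, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .map(_.split(",", -1).toList) // limit -1 keeps trailing empty fields; quoted commas are not handled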

We read the file as a stream of bytes, decode the bytes into a stream of strings using UTF-8, split the text into lines, and finally parse each line into a list of strings by splitting on commas. The through combinator simply applies a Pipe, which is a function from Stream to Stream, to the current stream: s.through(f) is equivalent to f(s). It lets us chain transformations in a readable left-to-right fashion.
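A small sketch of what through does, assuming FS2 3.x. The pipe evensDoubled and the object name are made up for illustration; the point is only that a Pipe is an ordinary function on streams.

```scala
import fs2.{Pipe, Pure, Stream}

object ThroughDemo {
  // Pipe[F, I, O] is a type alias for Stream[F, I] => Stream[F, O].
  val evensDoubled: Pipe[Pure, Int, Int] =
    in => in.filter(_ % 2 == 0).map(_ * 2)

  // Applying the pipe with through and calling it directly give the same stream.
  val viaThrough: List[Int] = Stream(1, 2, 3, 4).through(evensDoubled).toList
  val viaApply: List[Int]   = evensDoubled(Stream(1, 2, 3, 4)).toList
  // both are List(4, 8)
}
```

Packaging a transformation as a Pipe keeps it reusable across pipelines, much like text.lines in the CSV example.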

Effectful Streams: Talking to the External World

Of course, most streaming problems involve more than just transforming in-memory data. We often need to interact with external systems like databases, web services, or message queues. FS2 makes these interactions both simple and safe.

Consider a program that loads data from a database, enriches it with information from a web API, and publishes the results to a message queue. An FS2 implementation might look like:

def userIds(path: Path): Stream[IO, Long] = ...

def userFromDb(id: Long): IO[User] = ...

def enrichUser(user: User): IO[UserWithStats] =
  IO(???) // Call web API  

def publish(user: UserWithStats): IO[Unit] =
  IO(???) // Publish to message queue

val program: Stream[IO, Unit] =
  userIds(Paths.get("ids.csv"))
    .evalMap(userFromDb)
    .evalMap(enrichUser)
    .evalMap(publish)

The evalMap combinator allows us to perform an effectful computation for each element of the stream. In this case, we load each user from the database, enrich their data by calling a web API, and publish the enriched user to a message queue. At each step, we're mapping elements of the stream to IO actions.

Note that these IO actions aren't actually executed until we run the stream. We can build up the entire pipeline as a pure value, then compile and run it when we're ready:

val task: IO[Unit] = program.compile.drain
task.unsafeRunSync()  
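For experimenting without a real database or queue, the same shape can be sketched with in-memory stand-ins. Everything below is hypothetical (the case classes, the fake lookups, and the Ref acting as the "queue"), and it assumes FS2 3.x on cats-effect 3.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object EnrichPipelineSketch {
  final case class User(id: Long, name: String)
  final case class UserWithStats(user: User, logins: Int)

  // Hypothetical stand-ins for the database and the web API.
  def userFromDb(id: Long): IO[User] = IO.pure(User(id, s"user-$id"))
  def enrichUser(user: User): IO[UserWithStats] =
    IO.pure(UserWithStats(user, (user.id * 10).toInt))

  // Runs the pipeline and returns everything that was "published", in order.
  def run(ids: List[Long]): IO[List[UserWithStats]] =
    Ref.of[IO, List[UserWithStats]](Nil).flatMap { queue =>
      // Publishing appends to the in-memory list instead of a real queue.
      def publish(u: UserWithStats): IO[Unit] = queue.update(_ :+ u)

      Stream.emits(ids).covary[IO]
        .evalMap(userFromDb)
        .evalMap(enrichUser)
        .evalMap(publish)
        .compile.drain >> queue.get
    }
}
```

Calling EnrichPipelineSketch.run(List(1L, 2L)).unsafeRunSync() returns the enriched users in input order, since evalMap evaluates one element at a time.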

The IO data type and FS2's Stream are a match made in functional programming heaven. IO gives us a principled, purely functional way to interact with the external world, while Stream allows us to compose these interactions into reusable, modular components.

Buffers and Concurrency: Optimizing for Performance

FS2's combinators are useful for more than just IO. They can also help us optimize our streaming pipelines for performance. Two key tools are the buffer combinator and the concurrency combinators.

Buffer: Decoupling Production from Consumption

In FS2, buffering and prefetching let us tune how values flow from producer to consumer. Consider a stream that reads bytes from a file and parses them into JSON values:

def parseJson[F[_]](bytes: Stream[F, Byte]): Stream[F, Json] = ...

val pipeline: Stream[IO, Json] =
  io.file.readAll[IO](Paths.get("data.json"), 4096)
    .through(parseJson) 

The readAll call already emits bytes in chunks of up to 4096, and FS2 processes streams chunk by chunk, so this pipeline does not pay a per-byte read cost. What we can tune is how elements are grouped as they move downstream.

The buffer combinator does exactly that:

val pipeline2: Stream[IO, Json] =
  io.file.readAll[IO](Paths.get("data.json"), 4096)
    .buffer(1024)
    .through(parseJson)

Now FS2 requests bytes from upstream in groups of 1024 before passing them to parseJson. Note that buffer only regroups elements; it does not run the file reader in parallel with JSON parsing. When the reader should run ahead of the parser, prefetch or prefetchN evaluates the upstream concurrently (this requires a Concurrent instance for the effect type), which can improve throughput when both stages do real work.
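The difference between regrouping and reading ahead can be sketched on an in-memory source, assuming FS2 3.x on cats-effect 3. The object and value names are illustrative; buffer and prefetchN are the FS2 combinators being compared.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object BufferVsPrefetch {
  val source: Stream[IO, Int] = Stream.range(0, 10).covary[IO]

  // buffer(4) pulls elements from upstream in groups of up to 4.
  // It changes chunking only; producer and consumer still take turns.
  val bufferedChunkSizes: List[Int] =
    source.buffer(4).chunks.map(_.size).compile.toList.unsafeRunSync()

  // prefetchN(4) evaluates the upstream concurrently, staying up to
  // 4 chunks ahead of the downstream consumer. Element order is preserved.
  val prefetched: List[Int] =
    source.prefetchN(4).map(_ * 2).compile.toList.unsafeRunSync()
}
```

Both variants emit the same elements in the same order; only the scheduling differs, so the choice is a tuning decision rather than a change in meaning.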

Concurrent: Parallelizing Independent Steps

In addition to buffering, FS2 supports concurrency via the parJoin and parJoinUnbounded combinators. These allow us to run multiple streams concurrently and combine their outputs.

Let's extend our JSON parsing example to handle multiple files:

def parseFile(path: Path): Stream[IO, Json] =
  io.file.readAll[IO](path, 4096).through(parseJson)

def parseFiles(paths: List[Path]): Stream[IO, Json] =
  Stream.emits(paths).map(parseFile).parJoin(10)

parJoin runs up to 10 of the parseFile streams concurrently, emitting JSON values downstream as they're produced, so values from different files may interleave in a nondeterministic order. This can dramatically speed up our pipeline if the files are independent and we have spare CPU cores or I/O capacity.
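A self-contained sketch of the same pattern, using in-memory streams in place of files and assuming FS2 3.x on cats-effect 3. The records function is a hypothetical stand-in for parseFile.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object ParJoinSketch {
  // Hypothetical stand-in for parseFile: each "file" yields three labelled records.
  def records(file: String): Stream[IO, String] =
    Stream.emits(List(1, 2, 3)).map(i => s"$file#$i").covary[IO]

  val files: List[String] = List("a.json", "b.json", "c.json")

  // At most two inner streams run at a time. Records from different
  // files may interleave, so only the set of results is predictable.
  val merged: List[String] =
    Stream.emits(files).covary[IO]
      .map(records)
      .parJoin(2)
      .compile.toList.unsafeRunSync()
}
```

If downstream logic needs per-file ordering, tag each record with its source as done here, or process the files sequentially with flatMap instead.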

FS2's concurrency primitives are built on the same functional foundation as the rest of the library. They're purely functional descriptions of concurrent computations, and they compose seamlessly with other combinators like map, flatMap, and buffer.

Real-World FS2: A High-Performance Redis Client

To see FS2 in action, let's analyze the fs2-redis library. This is a Redis client built on FS2, used in production at companies like Disney Streaming Services.

At a high level, fs2-redis exposes two key data types:

trait RedisClient[F[_], K, V] {
  def send(command: Command[K, V]): F[Resp[K, V]]
}

trait Streaming[F[_], K, V] extends RedisClient[Stream[F, *], K, V] {
  def stream(command: Command[K, V]): Stream[F, Resp[K, V]]  
}

RedisClient allows us to send a single command to Redis and receive a response wrapped in an effect F. Streaming extends this to support streaming responses, returning a Stream[F, Resp[K, V]].

With these abstractions, we can build complex Redis interactions:

def streamingInfo(client: Streaming[IO, String, String]): Stream[IO, String] =
  client.stream(Command("INFO", Nil)).flatMap {
    case InfoResp(info) => Stream.emit(info)
    case _ => Stream.empty
  }

Here we use the INFO command to retrieve Redis server information as a stream of strings. The flatMap call allows us to process the different response types and emit strings downstream.

To see how fs2-redis achieves high performance, let's look at its use of pipelining:

def pipeline(client: RedisClient[IO, String, String]): IO[Unit] = {
  val commands = List("PING", "PING", "PING")
  Stream.emits(commands)
    .map(Command(_, Nil))
    .through(client.pipeline)
    .compile
    .drain
}

The pipeline combinator buffers commands and sends them to Redis in a single batch. This reduces network roundtrips and can dramatically improve throughput.

Under the hood, fs2-redis is built on the cats-effect and scodec libraries. cats-effect provides the IO monad for purely functional effects, while scodec is used for binary serialization of Redis commands and responses.

By leveraging FS2 and these other functional Scala libraries, fs2-redis achieves a clean, modular design without sacrificing performance. It's a testament to the power of functional programming for building high-performance, real-world systems.

The FS2 Philosophy: Effects as Values

At its core, FS2 is about treating effects as values. Whether we're reading from a file, querying a database, or publishing to a message queue, FS2 allows us to represent these actions as pure data in our programs.

This has a profound impact on our ability to write modular, testable, and maintainable code. We can build complex pipelines by composing small, reusable units, confident that no effect runs until we explicitly compile and run the stream.

When we do need to execute effects, we can do so in a controlled, principled way using cats-effect data types like IO. These data types provide a clear separation between the description of an effect and its execution, allowing us to reason about our code in a pure, functional way.

Wrapping Up

FS2 is a powerful tool for building streaming data pipelines in Scala. By treating streams as first-class values and leveraging pure functions for transformation and combination, FS2 allows us to write expressive, modular, and efficient code.

Through examples like CSV parsing, database integration, and a real-world Redis client, we've seen how FS2's combinators and abstractions enable us to solve complex problems with elegance and grace.

At the same time, FS2 doesn't sacrifice performance. Features like concurrent stream processing and pipelined requests allow us to build high-throughput, low-latency systems without abandoning functional programming principles.

As a full-stack Scala developer, I reach for FS2 whenever I need to build a streaming data pipeline. Its pure, declarative API and seamless integration with cats-effect make it a joy to use, while its performance and flexibility make it suitable for even the most demanding production workloads.

If you're a Scala developer looking to up your streaming game, give FS2 a try. I think you'll find that it really is a "streaming library with a superpower."
