A Complete Guide to Node.js Streams

Hemanta Sundaray

Streams are a way to transfer and process large amounts of data in small chunks.

Consider reading a 6GB video file. Without streams, you would need to load the entire 6GB into memory before doing anything with it. What if your machine only has 4GB of RAM? Your program would run out of memory before it could do anything useful with the file.

With streams, on the other hand, you can start processing the first chunk of data almost immediately, while the rest of the file is still being read. Each chunk gets processed and can be released from memory, so you might only ever have a few kilobytes in memory at any given moment.

This approach gives us two major benefits:

  • Memory efficiency (spatial): Because you process small chunks, you don't need huge amounts of RAM. You can process files larger than your available memory.
  • Time efficiency (temporal): You can start processing data as soon as you receive the first chunk. You don't have to wait for the entire payload to arrive. This significantly reduces latency.
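
To make the difference concrete, here is a minimal sketch (assuming a hypothetical large file named video.mp4) contrasting the two approaches: the buffered version has to hold the entire file in memory at once, while the streaming version only ever holds one chunk at a time.

import { readFile } from "node:fs/promises";
import { createReadStream } from "node:fs";

// Buffered approach: the whole file is loaded into memory at once.
// For a file larger than the available RAM, this simply fails.
const entireFile = await readFile("video.mp4");
console.log(`Loaded ${entireFile.length} bytes`);

// Streaming approach: only one small chunk is in memory at a time.
let totalBytes = 0;
createReadStream("video.mp4")
  .on("data", function (chunk: Buffer) {
    totalBytes += chunk.length; // process the chunk, then let it be garbage collected
  })
  .on("end", function () {
    console.log(`Processed ${totalBytes} bytes`);
  });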

In Node.js, there are four types of streams:

  • Readable: A source you consume data from, such as reading a file or receiving an HTTP response.
  • Writable: A destination you send data to, such as writing to a file or sending an HTTP request.
  • Duplex: A stream that is both readable and writable, such as a TCP socket where you can send and receive data simultaneously.
  • Transform: A special type of duplex stream that can modify or transform data (compressing or encrypting, for example) as it passes through.

Let's explore each of these in detail, starting with Readable streams.

Readable Streams

A readable stream represents a source of data. We can read from a readable stream in two modes: paused mode and flowing mode.

Reading from a stream in paused mode

Paused mode is the default mode. In this mode, we pull data from the source. This is why paused mode is also known as the pull mode. Here is an example where we read a file using streams:

import { createReadStream } from "node:fs";

// Get the readable stream
const readable = createReadStream("example.txt");

// Set the encoding to receive strings instead of Buffer objects
readable.setEncoding("utf-8");

// Attach a listener to the readable event
readable.on("readable", function () {
  let chunk: string | null;
  // Read data in a loop until the internal buffer is empty (returns null)
  while ((chunk = readable.read()) !== null) {
    console.log(chunk);
  }
});

// Handle the end of the stream
readable.on("end", function () {
  console.log("Finished reading the file");
});

// Handle potential errors
readable.on("error", function (err) {
  console.error("Error reading file:", err);
});

In this code snippet:

  • We first create a readable stream for the example.txt file using the createReadStream method. At this point, Node.js hasn't read anything yet; it has just set up the stream.
  • We call setEncoding("utf-8") to receive strings instead of raw Buffer objects. Without this, we would need to manually convert each chunk using chunk.toString("utf-8").
  • We attach a listener to the readable event, which fires whenever new data is available in the stream's internal buffer.
  • Inside the listener, we use a while loop to repeatedly call read() until it returns null. This pulls data from the internal buffer chunk by chunk. When read() returns null, it means the buffer is temporarily empty, not that the file is finished. When more data arrives, the readable event fires again.
  • We listen for the end event, which fires once when the entire stream has been consumed and there is no more data coming.
  • We listen for the error event to handle any problems that might occur, such as the file not existing or permission issues.

Reading from a stream in flowing mode

While paused mode requires you to "pull" data, flowing mode "pushes" data to you as soon as it is available. This is why flowing mode is often referred to as push mode.

To switch a stream into flowing mode, you simply attach a listener to the data event. Here is how the previous example looks in flowing mode:

import { createReadStream } from "node:fs";

// Get the readable stream
const readable = createReadStream("example.txt");

// Set the encoding to receive strings instead of Buffer objects
readable.setEncoding("utf-8");

// Attach a listener to the data event (switches to flowing mode)
readable.on("data", function (chunk: string) {
  console.log(chunk);
});

// Handle the end of the stream
readable.on("end", function () {
  console.log("Finished reading the file");
});

// Handle potential errors
readable.on("error", function (err) {
  console.error("Error reading file:", err);
});

In this code snippet:

  • Instead of the readable event, we listen for the data event. This single change switches the stream from paused mode to flowing mode.
  • We no longer need a while loop or the read() method. The stream automatically pushes each chunk directly to our data event handler.
  • The chunk is passed as an argument to our listener function, so we simply use it rather than pulling it ourselves.
  • The end and error events work exactly the same as in paused mode.

Notice how the code is simpler. But this simplicity comes with a trade-off. In flowing mode, data keeps arriving whether you're ready for it or not. With paused mode, you have fine-grained control: you decide when to call read(), how many bytes to request, and when to pause processing. In flowing mode, you're just reacting to data as it arrives.

That said, you can regain some control by calling pause() to temporarily stop the flow and resume() to start it again. But if you need precise control over when and how much data to read, paused mode is the better choice.
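
Here is a sketch of that pause()/resume() pattern (using the same example.txt file), throttling a flowing stream so that each chunk gets roughly a second of processing time before the next one arrives:

import { createReadStream } from "node:fs";

const readable = createReadStream("example.txt");
readable.setEncoding("utf-8");

readable.on("data", function (chunk: string) {
  console.log(`Got ${chunk.length} characters`);
  // Stop the flow while we do some slow work with this chunk
  readable.pause();
  setTimeout(function () {
    // Done with the slow work; let data flow again
    readable.resume();
  }, 1000);
});

readable.on("end", function () {
  console.log("Finished reading the file");
});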

Reading streams using async iterators

Readable streams in Node.js implement the async iterator protocol. This means you can use the for await...of syntax to consume data from a stream, which often results in cleaner and more readable code.

Here is the same file reading example using async iterators:

import { createReadStream } from "node:fs";

async function readFile(): Promise<void> {
  const readable = createReadStream("example.txt");
  readable.setEncoding("utf-8");

  try {
    for await (const chunk of readable) {
      console.log(chunk);
    }
    console.log("Finished reading the file");
  } catch (err) {
    console.error("Error reading file:", err);
  }
}

readFile();

In this code snippet:

  • We wrap our logic in an async function since for await...of can only be used inside async functions.
  • The for await...of loop waits for each chunk to arrive and gives it to us as the loop variable. No need to listen for events or call read() manually.
  • When the stream ends, the loop exits naturally. There is no need for an end event listener.
  • If the stream emits an error, the loop throws an exception. We handle this with a standard try...catch block instead of an error event listener.

This approach reads almost like synchronous code while remaining fully asynchronous under the hood. The await keyword pauses the loop until the next chunk arrives, but it doesn't block the event loop—other operations can still happen while waiting.

One thing to keep in mind: when you use for await...of, the stream operates in flowing mode. If you need fine-grained control over when and how much data to read, stick with paused mode and the readable event.
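
As a small illustration of that fine-grained control, the following sketch stays in paused mode and requests at most 64 bytes per read() call (the 64-byte size is an arbitrary choice for demonstration):

import { createReadStream } from "node:fs";

const readable = createReadStream("example.txt");

readable.on("readable", function () {
  let chunk: Buffer | null;
  // Request at most 64 bytes per call; read(64) returns null once fewer
  // than 64 bytes are buffered, except at the very end of the stream,
  // where the remaining data is returned.
  while ((chunk = readable.read(64)) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});

readable.on("end", function () {
  console.log("Finished reading the file");
});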

Implementing readable streams

So far, we have been consuming readable streams. You can also create your own custom readable streams that produce data from any source you want.

The stream module provides a base Readable class that you can use to implement custom readable streams. There are two approaches to creating them: the inheritance approach (extending the class) and the constructor approach (passing options to the Readable constructor).

Let's understand how both approaches work by creating a simple custom stream that generates a sequence of numbers from 1 to a specified maximum value. Each time the stream is read, it will produce the next number in the sequence until it reaches the maximum value and signals the end.

The inheritance approach

When you extend the Readable class, you must implement a method called _read(). This is the internal method that the stream calls whenever it needs more data to fill its internal buffer. Inside _read(), you push data into the stream using this.push().

import { Readable } from "node:stream";

class CounterStream extends Readable {
  private current: number;
  private max: number;

  constructor(max: number) {
    super({ encoding: "utf-8" });
    this.current = 1;
    this.max = max;
  }

  _read(): void {
    if (this.current <= this.max) {
      this.push(`${this.current}\n`);
      this.current++;
    } else {
      this.push(null); // signals end of stream
    }
  }
}

const counter = new CounterStream(5);

counter.on("data", function (chunk: string) {
  console.log(`Received: ${chunk.trim()}`);
});

counter.on("end", function () {
  console.log("Stream finished");
});

In this code snippet:

  • We create a CounterStream class that extends the Readable class.
  • In the constructor, we call super({ encoding: "utf-8" }) to initialize the parent class and set the encoding so we receive strings instead of buffers.
  • We implement the _read() method. We don't call this method ourselves—the stream calls it internally whenever its buffer needs filling.
  • Inside _read(), we push data using this.push(). When we've produced all our numbers, we push null to signal that there is no more data.
  • We consume the stream using the data event, just like any other readable stream.

The constructor approach

Instead of creating a class, you can pass a read function directly to the Readable constructor. This achieves the same result with less boilerplate:

import { Readable } from "node:stream";

let current = 1;
const max = 5;

const counter = new Readable({
  encoding: "utf-8",
  read(): void {
    if (current <= max) {
      this.push(`${current}\n`);
      current++;
    } else {
      this.push(null);
    }
  },
});

counter.on("data", function (chunk: string) {
  console.log(`Received: ${chunk.trim()}`);
});

counter.on("end", function () {
  console.log("Stream finished");
});

This does exactly the same thing, but without defining a class. The read function in the options object works just like the _read() method in the inheritance approach.

The constructor approach is useful when you need a quick, one-off stream and don't want the ceremony of defining a class. The inheritance approach makes more sense when you need reusable stream types with additional methods or complex internal state.

Writable Streams

A writable stream represents a destination for data. While readable streams are sources you consume data from, writable streams are targets you send data to. Examples include writing to a file using createWriteStream, sending data in an HTTP request, or writing to process.stdout.

Writing to a stream

Let's look at an example where we create a file using streams:

import { createWriteStream } from "node:fs";

const writable = createWriteStream("output.txt");

const chunkSize = 1024 * 1024; // 1MB per chunk
const totalChunks = 50; // 50 chunks = ~50MB
const chunk = "x".repeat(chunkSize);

for (let i = 0; i < totalChunks; i++) {
  writable.write(chunk);
  console.log(`Written chunk ${i + 1} of ${totalChunks}`);
}

// Signal that we're done writing
writable.end();

// Fires after end() is called and all data has been flushed
writable.on("finish", function () {
  console.log("All data has been written to the file");
});

// Handle potential errors
writable.on("error", function (err) {
  console.error("Error writing to file:", err);
});

In this code snippet:

  • We create a writable stream for the output.txt file using the createWriteStream method.
  • We define a chunk of 1MB (a string of repeated "x" characters) and write it 50 times to create a file of approximately 50MB.
  • We call the write() method to send data to the stream. This method accepts a chunk of data and writes it to the underlying destination.
  • We call the end() method to signal that we're done writing. After calling end(), you cannot write any more data to the stream.
  • We listen for the finish event, which fires after end() has been called and all data has been flushed to the underlying destination.
  • We listen for the error event to handle any problems that might occur during writing.

This example works, but there's a problem: we're writing data as fast as possible without checking if the stream can keep up. What happens if we write data faster than the underlying destination can handle it? This is where backpressure comes in.

Backpressure

Every writable stream has an internal buffer with a size limit called highWaterMark. When you call write(), the data goes into this buffer before being flushed to the underlying destination.

Here's how backpressure works:

  • When the buffer fills up beyond the highWaterMark, write() returns false. This signals that you should stop writing.
  • Once the buffer has been drained (flushed to the destination), the stream emits a drain event. This signals that it's safe to resume writing.

If you ignore backpressure and keep writing when write() returns false, the internal buffer will keep growing, potentially consuming all available memory.

Let's see backpressure in action. We'll set a small highWaterMark of 1KB and write chunks of 10KB to clearly observe the effect:

import { createWriteStream } from "node:fs";

const writable = createWriteStream("output.txt", { highWaterMark: 1024 });

const chunkSize = 10 * 1024;
const totalChunks = 10;
const chunk = "h".repeat(chunkSize);
let chunksCount = 0;

function writeChunk() {
  // Keep writing until the buffer is full or all chunks have been written
  while (chunksCount < totalChunks) {
    chunksCount++;
    const canContinue = writable.write(chunk);
    console.log(`Chunk ${chunksCount}: write() returned ${canContinue}`);
    if (!canContinue) {
      console.log("Buffer full - waiting for drain event...\n");
      return;
    }
  }
  writable.end();
}

writable.on("drain", function () {
  console.log("Buffer drained. Resuming...");
  writeChunk();
});

writable.on("finish", function () {
  console.log("Finished writing to the file.");
});

writable.on("error", function (err) {
  console.error("Error writing to the file:", err);
});

writeChunk();

In this code snippet:

  • We create a writable stream with a highWaterMark of 1KB, meaning the internal buffer will fill up quickly.
  • We write chunks of 10KB, which is 10 times larger than our buffer limit.
  • The write() method returns a boolean: true if the buffer has room for more data, false if the buffer is full.
  • When write() returns false, we stop writing and wait for the drain event.
  • When the drain event fires, we resume writing by calling writeChunk() again.
  • This cycle continues until all chunks have been written.

When you run this code, you'll see write() returning false repeatedly, followed by drain events. This back-and-forth is backpressure in action—the stream telling your application to slow down and wait.

Implementing writable streams

Just like readable streams, you can create your own custom writable streams. The stream module provides a base Writable class that you can use to implement custom writable streams.

When creating a custom writable stream, you must implement the _write() method. This is the internal method that the stream calls whenever data is written to it. The _write() method receives three parameters:

  • chunk — the data being written to the stream
  • encoding — the encoding of the chunk (if it's a string)
  • callback — a function you must call when you're done processing the chunk

There are two approaches to creating custom writable streams: the inheritance approach (extending the class) and the constructor approach (passing options to the Writable constructor).

Let's understand how both approaches work by creating a simple custom writable stream that converts incoming data to uppercase and logs it to the console. This simulates a destination that transforms data before outputting it.

The inheritance approach

When you extend the Writable class, you implement the _write() method to define how your stream handles incoming data:

import { Writable } from "node:stream";

class UppercaseLogger extends Writable {
  _write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): void {
    const text = chunk.toString().toUpperCase();
    console.log(`Received: ${text}`);
    callback(); // Signal that we're done processing this chunk
  }
}

const logger = new UppercaseLogger();

logger.write("hello");
logger.write("world");
logger.end();

logger.on("finish", function () {
  console.log("All data has been processed");
});

In this code snippet:

  • We create an UppercaseLogger class that extends the Writable class.
  • We implement the _write() method. We don't call this method ourselves—the stream calls it internally whenever data is written using write().
  • Inside _write(), we convert the chunk to a string, transform it to uppercase, and log it.
  • We must call callback() when we're done processing the chunk. This tells the stream that it can proceed with the next chunk. If an error occurs during processing, we pass it to the callback: callback(new Error("Something went wrong")).
  • We use the stream just like any other writable stream: we call write() to send data and end() to signal we're done.

The constructor approach

Instead of creating a class, you can pass a write function directly to the Writable constructor:

import { Writable } from "node:stream";

const logger = new Writable({
  write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): void {
    const text = chunk.toString().toUpperCase();
    console.log(`Received: ${text}`);
    callback();
  },
});

logger.write("hello");
logger.write("world");
logger.end();

logger.on("finish", function () {
  console.log("All data has been processed");
});

This does exactly the same thing, but without defining a class. The write function in the options object works just like the _write() method in the inheritance approach.

As with readable streams, the constructor approach is useful for quick, one-off streams, while the inheritance approach is better for reusable stream types with additional methods or complex internal state.

Duplex Streams

A duplex stream is both readable and writable. It combines both capabilities into a single stream, allowing data to flow in both directions.

Common examples of duplex streams include TCP sockets, where you can send and receive data simultaneously, and WebSocket connections for real-time bidirectional communication.

Here's a simple example showing a TCP socket, which is a duplex stream:

import { createServer, Socket } from "node:net";

const server = createServer(function (socket: Socket) {
  // Reading from the duplex stream
  socket.on("data", function (chunk: Buffer) {
    console.log(`Received: ${chunk.toString()}`);
  });

  // Writing to the duplex stream
  socket.write("Hello from server!\n");
});

server.listen(3000);

In this example, the socket is a duplex stream. We can read from it using the data event (just like a readable stream) and write to it using write() (just like a writable stream).

Implementing duplex streams

To create a custom duplex stream, you provide implementations for both the _read() and _write() methods. As with the other stream types, there are two approaches: inheritance and constructor.

Let's create a simple duplex stream that generates a greeting on the readable side and logs any data it receives on the writable side.

The inheritance approach

import { Duplex } from "node:stream";

class MyDuplex extends Duplex {
  private greeted: boolean;

  constructor() {
    super({ encoding: "utf-8" });
    this.greeted = false;
  }

  _read(): void {
    if (!this.greeted) {
      this.push("Hello from the readable side!\n");
      this.greeted = true;
    } else {
      this.push(null);
    }
  }

  _write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): void {
    console.log(`Writable side received: ${chunk.toString().trim()}`);
    callback();
  }
}

const duplex = new MyDuplex();

// Using the readable side
duplex.on("data", function (chunk: string) {
  console.log(`Read: ${chunk.trim()}`);
});

// Using the writable side
duplex.write("Hello from the writable side!");
duplex.end();

duplex.on("finish", function () {
  console.log("Writable side finished");
});

The constructor approach

import { Duplex } from "node:stream";

let greeted = false;

const duplex = new Duplex({
  encoding: "utf-8",
  read(): void {
    if (!greeted) {
      this.push("Hello from the readable side!\n");
      greeted = true;
    } else {
      this.push(null);
    }
  },
  write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): void {
    console.log(`Writable side received: ${chunk.toString().trim()}`);
    callback();
  },
});

// Using the readable side
duplex.on("data", function (chunk: string) {
  console.log(`Read: ${chunk.trim()}`);
});

// Using the writable side
duplex.write("Hello from the writable side!");
duplex.end();

duplex.on("finish", function () {
  console.log("Writable side finished");
});

An important thing to understand: in a duplex stream, the readable and writable sides are independent. Data you write doesn't automatically become data you can read. They're two separate channels that happen to share the same stream object.
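
To make that independence visible, here is a sketch of a hypothetical EchoDuplex where the writable side explicitly hands each chunk over to the readable side with push(); without that explicit call, nothing you write would ever appear on the readable side:

import { Duplex } from "node:stream";

class EchoDuplex extends Duplex {
  _read(): void {
    // Nothing to do here: data is pushed from _write() below
  }

  _write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): void {
    // Explicitly hand the written data over to the readable side
    this.push(chunk);
    callback();
  }

  _final(callback: (error?: Error | null) => void): void {
    // When the writable side ends, end the readable side too
    this.push(null);
    callback();
  }
}

const echo = new EchoDuplex();

echo.on("data", function (chunk: Buffer) {
  console.log(`Echoed back: ${chunk.toString()}`);
});

echo.write("ping");
echo.end();

This explicit hand-off is essentially what a transform stream does for you automatically, as the next section shows.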

Transform Streams

A transform stream is a special kind of duplex stream specifically designed for data transformation. Unlike regular duplex streams where the readable and writable sides are independent, in a transform stream, the data you write goes through a transformation and becomes the data you can read.

Common examples of transform streams include compression (like zlib.createGzip()), encryption, and parsing data from one format to another.

Here's an example using Node.js's built-in gzip compression, which is a transform stream:

import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

const readable = createReadStream("input.txt");
const writable = createWriteStream("input.txt.gz");
const gzip = createGzip();

// Data flows: readable -> gzip (transform) -> writable
readable.pipe(gzip).pipe(writable);

writable.on("finish", function () {
  console.log("File compressed successfully");
});

In this example, createGzip() returns a transform stream. Data from the readable stream flows into the transform, gets compressed, and flows out to the writable stream. The pipe() method connects streams together, handling backpressure automatically.

Implementing transform streams

To create a custom transform stream, you implement the _transform() method. This method receives each chunk of data, transforms it, and pushes the result. The _transform() method receives three parameters:

  • chunk — the data being transformed
  • encoding — the encoding of the chunk (if it's a string)
  • callback — a function you must call when you're done transforming the chunk

There's also an optional _flush() method that gets called right before the stream ends. This is useful when your transformation needs to output some final data after all chunks have been processed.

Let's understand both approaches by creating a transform stream that converts text to uppercase.

The inheritance approach

import { Transform } from "node:stream";

class UppercaseTransform extends Transform {
  _transform(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): void {
    const upperCased = chunk.toString().toUpperCase();
    this.push(upperCased);
    callback();
  }

  _flush(callback: (error?: Error | null) => void): void {
    this.push("\n[END OF TRANSFORMATION]");
    callback();
  }
}

const uppercase = new UppercaseTransform();

uppercase.on("data", function (chunk: Buffer) {
  console.log(chunk.toString());
});

uppercase.write("hello ");
uppercase.write("world");
uppercase.end();

In this code snippet:

  • We create an UppercaseTransform class that extends the Transform class.
  • We implement the _transform() method, which receives each chunk, converts it to uppercase, and pushes the result using this.push().
  • We implement the _flush() method, which pushes a final message right before the stream ends. This is optional but useful when you need to output something at the very end.
  • We must call callback() in both methods to signal that we're done processing.

The constructor approach

import { Transform } from "node:stream";

const uppercase = new Transform({
  transform(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): void {
    const upperCased = chunk.toString().toUpperCase();
    this.push(upperCased);
    callback();
  },
  flush(callback: (error?: Error | null) => void): void {
    this.push("\n[END OF TRANSFORMATION]");
    callback();
  },
});

uppercase.on("data", function (chunk: Buffer) {
  console.log(chunk.toString());
});

uppercase.write("hello ");
uppercase.write("world");
uppercase.end();

Notice that in the constructor approach, the method names are transform and flush (without the underscore prefix), while in the inheritance approach they are _transform and _flush.

The key difference between a transform stream and a regular duplex stream is that in a transform stream, you push data from within the _transform() method—the input and output are connected. In a duplex stream, _read() and _write() are completely independent operations.

Connecting Streams Using Pipes

Node.js streams can be connected using the pipe() method of readable streams. The pipe() method takes data from a readable stream and pumps it into a writable stream, creating a seamless flow of data.

Here's what pipe() does for you:

  • Automatic data flow: Data flows automatically from the source to the destination. There's no need to manually call read() or write().
  • Automatic backpressure handling: When the writable stream's buffer fills up, pipe() automatically pauses the readable stream. When the buffer drains, it resumes reading. You don't have to manage this yourself.
  • Automatic ending: When the readable stream emits an end event, the writable stream is automatically ended. You can disable this behavior by passing { end: false } as an option.

Here's an example that copies a file using pipe():

import { createReadStream, createWriteStream } from "node:fs";

const readable = createReadStream("source.txt");
const writable = createWriteStream("destination.txt");

readable.pipe(writable);

writable.on("finish", function () {
  console.log("File copied successfully");
});
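
The { end: false } option mentioned above is useful when several sources share one destination. Here is a sketch (with hypothetical file names) that concatenates two files into a single output, keeping the writable open until the second source finishes:

import { createReadStream, createWriteStream } from "node:fs";

const writable = createWriteStream("combined.txt");
const first = createReadStream("part1.txt");
const second = createReadStream("part2.txt");

// Keep the writable open after the first source ends
first.pipe(writable, { end: false });

first.on("end", function () {
  // Let the second source end the writable when it finishes
  second.pipe(writable);
});

writable.on("finish", function () {
  console.log("Both files written to combined.txt");
});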

You can also chain multiple pipes together. Here's an example that reads a file, compresses it, and writes the compressed data to a new file:

import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

const readable = createReadStream("input.txt");
const gzip = createGzip();
const writable = createWriteStream("input.txt.gz");

readable.pipe(gzip).pipe(writable);

writable.on("finish", function () {
  console.log("File compressed successfully");
});

The data flows through the pipeline: readable → gzip → writable. Each connection handles backpressure automatically, ensuring smooth data flow even when processing large files.

The problem with pipe()

While pipe() is convenient, it has a significant problem: errors are not propagated through the pipeline. If an error occurs in any stream, you need to handle it separately for each stream:

import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

const readable = createReadStream("input.txt");
const gzip = createGzip();
const writable = createWriteStream("input.txt.gz");

readable.on("error", function (err) {
  console.error("Readable error:", err);
});

gzip.on("error", function (err) {
  console.error("Gzip error:", err);
});

writable.on("error", function (err) {
  console.error("Writable error:", err);
});

readable.pipe(gzip).pipe(writable);

This is verbose and error-prone. But there's a bigger problem: when an error occurs in one stream, that stream is simply unpiped from the pipeline—it's not destroyed. This can leave dangling resources and leak memory. For example, if the gzip stream fails, the readable stream might keep reading data that goes nowhere, and file handles might not be properly closed.

The node:stream/promises module exports a pipeline() function that solves these problems. It provides centralized error handling and ensures all streams are properly destroyed when the pipeline finishes—whether it succeeds or fails.

Here's the previous example rewritten using pipeline():

import { createReadStream, createWriteStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { createGzip } from "node:zlib";

async function compressFile(): Promise<void> {
  const readable = createReadStream("input.txt");
  const gzip = createGzip();
  const writable = createWriteStream("input.txt.gz");

  try {
    await pipeline(readable, gzip, writable);
    console.log("File compressed successfully");
  } catch (err) {
    console.error("Pipeline failed:", err);
  }
}

compressFile();

In this code snippet:

  • We import pipeline from node:stream/promises, which gives us the promise-based version of the function.
  • The pipeline() function accepts any number of streams and connects them in order.
  • It returns a promise that resolves when the pipeline completes successfully, or rejects if any stream emits an error.
  • We use a try...catch block to handle errors from any stream in the pipeline. No need to attach error handlers to each stream individually.
  • Most importantly, when the pipeline ends (whether successfully or due to an error), all streams are properly destroyed. File handles are closed, resources are released, and there are no memory leaks.

The pipeline() function is the recommended way to connect streams in production code. It's cleaner, safer, and handles edge cases that pipe() doesn't.
