The previous chapter discussed important concepts of distributed stream processing, such as parallelization, time, and state. In this chapter we give a high-level introduction to Flink’s architecture and describe how Flink addresses the aspects of stream processing that we discussed before. In particular, we explain Flink’s process architecture and the design of its networking stack. We show how Flink handles time and state in streaming applications and discuss its fault tolerance mechanisms. This chapter provides relevant background information to successfully implement and operate advanced streaming applications with Apache Flink. It will help you to understand Flink’s internals and to reason about the performance and behavior of streaming applications.
Flink is a distributed system for stateful parallel data stream processing. A Flink setup consists of multiple processes that run distributed across multiple machines. Common challenges that distributed systems need to address are allocation and management of compute resources in a cluster, process coordination, durable and available data storage, and failure recovery.
Flink does not implement all of the required functionality by itself. Instead, it focuses on its core function, distributed data stream processing, and leverages existing cluster infrastructure and services. Flink is tightly integrated with cluster resource managers, such as Apache Mesos, YARN, and Kubernetes, but can also be configured to run as a stand-alone cluster. Flink does not provide its own durable, distributed storage. Instead, it relies on distributed file systems like HDFS or object stores such as S3. For leader election in highly available setups, Flink depends on Apache ZooKeeper.
In this section we describe the different components that a Flink setup consists of and discuss their responsibilities and how they interact with each other to execute an application. We present two different styles of deploying Flink applications and discuss how tasks are distributed and executed. Finally, we explain how Flink’s highly-available mode works.
A Flink setup consists of four different components that work together to execute streaming applications. These components are a JobManager, a ResourceManager, a TaskManager, and a Dispatcher. Since Flink is implemented in Java and Scala, all components run on a Java Virtual Machine (JVM). In the following, we discuss the responsibilities of each component and how it interacts with the other components.
Please note that Figure 3-1 is a high-level sketch to visualize the responsibilities and interactions of the components. Depending on the environment (YARN, Mesos, Kubernetes, stand-alone cluster), some steps can be omitted or components might run in the same process. For instance, in a stand-alone setup, i.e., a setup without a resource provider, the ResourceManager can only distribute the slots of manually started TaskManagers and cannot start new TaskManagers. In Chapter 9, we will discuss how to set up and configure Flink for different environments.
Flink applications can be deployed in two different styles.
The framework style follows the traditional approach of submitting an application (or query) via a client to a running service. In the library style, there is no continuously running Flink service. Instead, Flink is bundled as a library together with the application in a container image. This deployment mode is common for microservice architectures. We discuss the topic of application deployment in more detail in Chapter 10.
A TaskManager can execute several tasks at the same time. These tasks can be of the same operator (data parallelism), a different operator (task parallelism), or even from a different application (job parallelism). A TaskManager provides a certain number of processing slots to control the number of tasks that it can concurrently execute. A processing slot is able to execute one slice of an application, i.e., one task of each operator of the application. Figure 3-2 visualizes the relationship of TaskManagers, slots, tasks, and operators.
On the left-hand side you see a JobGraph, the non-parallel representation of an application, consisting of five operators. Operators A and C are sources and operator E is a sink. Operators C and E have a parallelism of two. The other operators have a parallelism of four. Since the maximum operator parallelism is four, the application requires at least four available processing slots to be executed. Given two TaskManagers with two processing slots each, this requirement is fulfilled. The JobManager parallelizes the JobGraph into an ExecutionGraph and assigns the tasks to the four available slots. The tasks of the operators with a parallelism of four are assigned to each slot. The two tasks of operators C and E are assigned to slots 1.1 and 2.1 and slots 1.2 and 2.2, respectively [1]. Scheduling tasks as slices to slots has the advantage that many tasks are co-located on the same TaskManager, which means that they can efficiently exchange data without accessing the network.
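How many slots an application needs is determined by the parallelism of its operators, which is configured in the DataStream API. The following is a minimal sketch and is not related to the operators of Figure 3-2; the socket source and the chosen parallelism values are purely illustrative.

```scala
import org.apache.flink.streaming.api.scala._

object ParallelismExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Default parallelism for all operators of this application.
    env.setParallelism(4)

    // Socket sources are non-parallel and always run with a parallelism of 1.
    val input: DataStream[String] = env.socketTextStream("localhost", 9999)

    input
      .map(_.toUpperCase)   // runs with the default parallelism of 4
      .filter(_.nonEmpty)   // runs with the default parallelism of 4
      .print()
      .setParallelism(2)    // the print sink runs with a parallelism of 2

    env.execute("Parallelism example")
  }
}
```

Operators that do not call setParallelism() explicitly run with the default parallelism of the environment.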
A TaskManager executes its tasks as threads in the same JVM process. Threads are more lightweight than individual processes and have lower communication costs but do not strictly isolate tasks from each other. Hence, a single misbehaving task can kill the whole TaskManager process and all tasks that run on it. To isolate applications from each other, a TaskManager can be configured to run only tasks of a single application. By leveraging thread parallelism inside a TaskManager and the option to deploy several TaskManager processes per host, Flink offers a lot of flexibility to trade off performance and resource isolation when deploying applications. We will discuss the configuration and setup of Flink clusters in detail in Chapter 9.
Streaming applications are typically designed to run 24/7. Hence, it is important that their execution does not stop even if an involved process fails. Recovery from failures consists of two aspects, first restarting failed processes and second restarting the application and recovering its state. In this section, we explain how Flink restarts failed processes. Restoring the state of an application is discussed in a later section of this chapter.
As discussed before, Flink requires a sufficient amount of processing slots in order to execute all tasks of an application. Given a Flink setup with four TaskManagers that provide two slots each, a streaming application can be executed with a maximum parallelism of eight. If one of the TaskManagers fails, the number of available slots is reduced to six. In this situation, the JobManager will ask the ResourceManager to provide more processing slots. If this is not possible, for example because the application runs in a stand-alone cluster, the JobManager scales the application down and executes it on fewer slots until more slots become available.
A more challenging problem than TaskManager failures is JobManager failures. The JobManager controls the execution of a streaming application and keeps metadata about its execution, such as pointers to completed checkpoints. A streaming application cannot continue processing if the associated JobManager process disappears, which makes the JobManager a single point of failure in Flink. To overcome this problem, Flink features a high-availability mode that migrates the responsibility and metadata for a job to another JobManager in case the original JobManager disappears.
Flink’s high-availability mode is based on Apache ZooKeeper, a system for distributed services that require coordination and consensus. Flink uses ZooKeeper for leader election and as a highly-available and durable data store. When operating in high-availability mode, the JobManager writes the JobGraph and all required metadata such as the application’s JAR file into a remote persistent storage system. In addition, the JobManager writes a pointer to the storage location into ZooKeeper’s data store. During the execution of an application, the JobManager receives the state handles (storage locations) of the individual task checkpoints. Upon the completion of a checkpoint, i.e., when all tasks have successfully written their state into the remote storage, the JobManager writes the state handles to the remote storage and a pointer to this location to ZooKeeper. Hence, all data that is required to recover from a JobManager failure is stored in the remote storage and ZooKeeper holds pointers to the storage locations. Figure 3-3 illustrates this design.
When a JobManager fails, all tasks that belong to its application are automatically cancelled. A new JobManager that takes over the work of the failed master performs the following steps.
1. It requests the storage locations from ZooKeeper to fetch the JobGraph, the JAR file, and the state handles of the last completed checkpoint from the remote storage.
2. It requests processing slots from the ResourceManager to continue executing the application.
3. It restarts the application and resets the state of all its tasks to the last completed checkpoint.
When running an application as a library deployment in a container environment, such as Kubernetes, failed JobManager or TaskManager containers can be automatically restarted. When running on YARN or Mesos, Flink's remaining processes trigger the restart of failed JobManager or TaskManager processes. Flink does not provide tooling to restart failed processes when running in a stand-alone cluster. Hence, it can be useful to run standby JobManagers and TaskManagers that can take over the work of failed processes. We will discuss the configuration of highly available Flink setups later in Chapter 9.
The tasks of a running application are continuously exchanging data. The TaskManagers take care of shipping data from sending tasks to receiving tasks. The network component of a TaskManager collects records in buffers before they are shipped, i.e., records are not shipped one by one but batched into buffers. This technique is fundamental to effectively utilizing network resources and achieving high throughput. The mechanism is similar to the buffering techniques used in networking or disk I/O protocols. Note that shipping records in buffers does not imply that Flink's processing model is based on micro-batches.
Each TaskManager has a pool of network buffers (by default 32 KB in size) to send and receive data. If the sender and receiver tasks run in separate TaskManager processes, they communicate via the network stack of the operating system. Streaming applications need to exchange data in a pipelined fashion, i.e., each pair of TaskManagers maintains a permanent TCP connection to exchange data [2]. With a shuffle connection pattern, each sender task needs to be able to send data to each receiving task. A TaskManager needs one dedicated network buffer for each receiving task that any of its tasks needs to send data to. Once a buffer is filled, it is shipped over the network to the receiving task. On the receiver side, each receiving task needs one network buffer for each of the sending tasks it is connected to. Figure 3-4 visualizes this architecture.
The figure shows four sender and four receiver tasks. Each sender task has four network buffers to send data to each receiver task and each receiver task has four buffers to receive data. Buffers that need to be sent to the other TaskManager are multiplexed over the same network connection. In order to enable a smooth pipelined data exchange, a TaskManager must be able to provide enough buffers to serve all outgoing and incoming connections concurrently. In case of a shuffle or broadcast connection, each sending task needs a buffer for each receiving task, i.e., the number of required buffers is quadratic in the parallelism of the involved operators [3].
If the sender and receiver tasks run in the same TaskManager process, the sender task serializes the outgoing records into a byte buffer and puts the buffer into a queue once it is filled. The receiving task takes the buffer from the queue and deserializes the incoming records. Hence, no network communication is involved. Serializing records between TaskManager-local tasks has the advantage that it decouples the tasks and allows the use of mutable objects in tasks, which can considerably improve performance because it reduces object instantiations and garbage collection. Once an object has been serialized, it can be safely modified.
On the other hand, serialization can cause significant computational overhead. Therefore, Flink can - under certain conditions - chain multiple DataStream operators into a single task. Operators in the same task communicate by passing objects through nested function calls which avoids serialization. The concept of operator chaining is discussed in more detail in Chapter 10.
Sending individual records over a network connection is inefficient and causes significant overhead. Buffering is a mandatory technique to fully utilize the bandwidth of network connections. In the context of stream processing, one disadvantage of buffering is that it adds latency because records are collected in a buffer instead of being immediately shipped. If a sender task only rarely produces records for a specific receiving task, it might take a long time until the respective buffer is filled and shipped. Because this would cause high processing latencies, Flink ensures that each buffer is shipped after a certain period of time regardless of how much it is filled. This timeout can be interpreted as an upper bound for the latency added by a network connection. However, the threshold does not serve as a strict latency SLA for the job as a whole because a job might involve multiple network connections and it does also not account for delays caused by the actual processing.
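The buffer timeout can be configured per application. As a brief sketch (the 10 ms value is just an example), it could look like this in the Scala DataStream API:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Ship buffers at the latest after 10 ms, even if they are not completely filled.
// A timeout of 0 flushes after every record (lowest latency, reduced throughput);
// the default timeout is 100 ms.
env.setBufferTimeout(10L)
```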
Streaming applications that ingest streams with high volume can easily come to a point where a task is not able to process its input data at the rate at which it arrives. This might happen if the volume of an input stream is too high for the amount of resources allocated to a certain operator or if the input rate of an operator varies significantly and causes spikes of high load. Regardless of the reason why an operator cannot handle its input, this situation should never cause a stream processor to terminate an application. Instead, the stream processor should gracefully throttle the rate at which the application ingests its input to the maximum speed at which it can process the data. With a decent monitoring infrastructure in place, a throttling situation can be easily detected and usually resolved by adding more compute resources and increasing the parallelism of the bottleneck operator. The described flow control technique is called backpressure and is an important feature of stream processors.
Flink naturally supports backpressure due to the design of its network layer. Figure 3-5 illustrates the behavior of the network stack when a receiving task is not able to process its input data at the rate at which it is emitted by the sender task.
The figure shows a sender and a receiver task running on different machines.
In Chapter 2, we highlighted the importance of time semantics for stream processing applications and explained the differences between processing-time and event-time. While processing-time is easy to understand because it is based on the local time of the processing machine, it produces somewhat arbitrary, inconsistent, and non-reproducible results. In contrast, event-time semantics yield reproducible and consistent results which is a hard requirement for many stream processing use cases. However, event-time applications require some additional configuration compared to applications with processing-time semantics. Also the internals of a stream processor that supports event-time are more involved than the internals of a system that purely operates in processing-time.
Flink provides intuitive and easy-to-use primitives for common event-time processing operations but also exposes expressive APIs to implement more advanced event-time applications with custom operators. For such advanced applications, a good understanding of Flink’s internal time handling is often helpful and sometimes required. The previous chapter introduced two concepts that Flink leverages to provide event-time semantics: record timestamps and watermarks. In the following we will describe how Flink internally implements and handles timestamps and watermarks to support streaming applications with event-time semantics.
All records that are processed by a Flink event-time streaming application must have a timestamp. A timestamp associates a record with a specific point in time, usually the point in time at which the event that is encoded by the record happened. However, applications can freely choose the meaning of the timestamps as long as the timestamps of the stream records are roughly ascending as the stream advances. As motivated in Chapter 2, a certain degree of timestamp out-of-orderness is present in basically all real-world use cases.
When Flink processes a data stream in event-time mode, it evaluates time-based operators based on the timestamps of records. For example, a time-window operator assigns records to windows according to their associated timestamp. Flink encodes timestamps as 8-byte Long values and attaches them as metadata to records. Its built-in operators interpret the Long value as a Unix timestamp with millisecond precision, i.e., the number of milliseconds since 1970-01-01 00:00:00.000. However, custom operators can have their own interpretation and, for example, adjust the precision to microseconds.
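Event-time mode is enabled on the execution environment before timestamps and watermarks are assigned. A minimal sketch:

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Evaluate all time-based operators of this application in event-time mode.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```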
In addition to record timestamps, a Flink event-time application must also provide watermarks. Watermarks are used to derive the current event-time at each task in an event-time application. Time-based operators use this time to trigger computations and make progress. For example, a time-window operator finalizes a window computation and emits the result when the operator's event-time passes the window's end boundary.
In Flink, watermarks are implemented as special records holding a timestamp long value. Watermarks flow in a stream of regular records with annotated timestamps as Figure 3-6 shows.
Watermarks have two basic properties.
1. They must be monotonically increasing to ensure that the event-time clocks of tasks are progressing and not going backward.
2. They are related to record timestamps. A watermark with a timestamp T indicates that all subsequent records should have timestamps greater than T.
The second property is used to handle streams with out-of-order record timestamps, such as the records with timestamps 3 and 5 in Figure 3-6. Tasks of time-based operators collect and process records with possibly unordered timestamps and finalize a computation when their event-time clock, which is advanced by received watermarks, indicates that no more records with relevant timestamps have to be expected. When a task receives a record that violates the watermark property, i.e., has a smaller timestamp than a previously received watermark, it might be that the computation it belongs to has already been completed. Such records are called late records. Flink provides different mechanisms to deal with late records, which are discussed in Chapter 6.
A very interesting property of watermarks is that they allow an application to control result completeness and latency. Watermarks that are very tight, i.e., close to the record timestamps, result in low processing latency because a task will only briefly wait for more records to arrive before finalizing a computation. At the same time, result completeness might suffer because relevant records might not be included in the result and would instead be considered late records. Inversely, very wide watermarks increase processing latency but improve result completeness.
In this section, we discuss how operators process watermark records. Watermarks are implemented in Flink as special records that are received and emitted by operator tasks. Tasks have an internal time service that maintains timers. A timer can be registered at the timer service to perform a computation at a specific point in time in the future. For example, a time-window task registers a timer for the end time of each of its active windows in order to finalize the window when the event-time passes its end boundary.
When a task receives a watermark, it performs the following steps.
1. The task updates its internal event-time clock based on the timestamp of the watermark.
2. The task's time service identifies all timers with a time smaller than the updated event-time. For each expired timer, the task invokes a callback function that can perform a computation and emit records.
3. The task emits a watermark with the updated event-time.
Flink restricts access to timestamps and watermarks in the DataStream API. Except for the ProcessFunction, functions are not able to read or modify record timestamps or watermarks. The ProcessFunction can read the timestamp of the currently processed record, request the current event-time of the operator, and register timers [4]. None of the functions exposes an API to set the timestamps of emitted records, manipulate the event-time clock of a task, or emit watermarks. Instead, time-based DataStream operator tasks internally set the timestamps of emitted records to ensure that they are properly aligned with the emitted watermarks. For instance, a time-window operator task attaches the end time of a window as the timestamp to all records emitted by the window computation before it emits the watermark with the timestamp that triggered the computation of the window.
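The following sketch shows how a ProcessFunction might access the timestamp of the current record, query the current event-time of the task, and register a timer. The class name and the one-second timer delay are made up for illustration.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Emits a description of each record's timestamp and registers an event-time
// timer that fires one second (in event time) later.
class TimestampLogger extends ProcessFunction[String, String] {

  override def processElement(
      value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit = {
    // Timestamp of the current record (requires assigned timestamps).
    val ts: Long = ctx.timestamp()
    // Current event-time of the task, i.e., the last received watermark.
    val wm: Long = ctx.timerService().currentWatermark()
    ctx.timerService().registerEventTimeTimer(ts + 1000)
    out.collect(s"record '$value' with timestamp $ts (current watermark: $wm)")
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[String, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    out.collect(s"timer fired at event-time $timestamp")
  }
}
```

Such a function would typically be applied on a keyed stream, for example stream.keyBy(...).process(new TimestampLogger), because timers require keyed state.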
We explained before that a task emits watermarks and updates its event-time clock when it receives a new watermark. How this is actually done deserves a detailed discussion. As discussed in Chapter 2, Flink processes a data stream in parallel by partitioning the stream and processing each partition by a separate operator task. A partition is a stream of timestamped records and watermarks. Depending on how an operator is connected with its predecessor or successor operators, the tasks of the operator can receive records and watermarks from one or more input partitions and emit records and watermarks to one or more output partitions. In the following we describe in detail how a task emits watermarks to multiple output tasks and how it computes its event-time clock from the watermarks it received from its input tasks.
A task maintains for each input partition a partition watermark. When it receives a watermark from a partition, it updates the respective partition watermark to be the maximum of the received watermark and the current partition watermark. Subsequently the task updates its event-time clock to be the minimum of all partition watermarks. If the event-time clock advances, the task processes all triggered timers and finally broadcasts its new event-time to all downstream tasks by emitting a corresponding watermark to all connected output partitions.
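To make this bookkeeping concrete, here is a simplified sketch of the logic in plain Scala. It is illustrative only and not Flink's actual implementation; all names are made up.

```scala
// Simplified sketch of per-task watermark bookkeeping (illustrative only).
class WatermarkTracker(numInputPartitions: Int) {

  // One watermark per input partition, initialized to the minimum possible value.
  private val partitionWatermarks = Array.fill(numInputPartitions)(Long.MinValue)
  private var eventTimeClock = Long.MinValue

  /** Returns the new event-time clock if it advanced, otherwise None. */
  def receiveWatermark(partition: Int, watermark: Long): Option[Long] = {
    // 1. Update the partition watermark to the maximum watermark seen so far.
    partitionWatermarks(partition) = math.max(partitionWatermarks(partition), watermark)
    // 2. The task's event-time clock is the minimum of all partition watermarks.
    val newClock = partitionWatermarks.min
    if (newClock > eventTimeClock) {
      eventTimeClock = newClock
      // 3. The caller would now trigger all timers below newClock and broadcast
      //    a watermark with newClock to all connected output partitions.
      Some(newClock)
    } else {
      None
    }
  }
}
```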
Figure 3-7 visualizes how a task with four input partitions and three output partitions receives watermarks, updates its partition watermarks and event-time clock, and emits watermarks.
The tasks of operators with two or more input streams such as Union or CoFlatMap operators (see Chapter 5) also compute their event-time clock as the minimum of all partition watermarks, i.e., they do not distinguish between partition watermarks of different input streams. Consequently, records of both inputs are processed based on the same event-time clock.
The watermark handling and propagation algorithm of Flink ensures that operator tasks emit properly aligned timestamped records and watermarks. However, it relies on the fact that all partitions continuously provide increasing watermarks. As soon as one partition does not advance its watermarks or becomes completely idle and does not ship any records or watermarks, the event-time clock of a task will not advance and the timers of the task will not trigger. This situation is problematic for time-based operators that rely on an advancing clock to perform computations and clean up their state. Consequently, the processing latencies and state size of time-based operators can significantly increase if a task does not receive new watermarks from all input tasks in regular intervals.
A similar effect appears for operators with two input streams whose watermarks significantly diverge. The event-time clock of a task with two input streams corresponds to the watermarks of the slower stream, and usually the records or intermediate results of the faster stream are buffered in state until the event-time clock allows them to be processed.
So far we have explained what timestamps and watermarks are and how they are internally handled by Flink. However, we have not discussed yet where they originate from. Timestamps and watermarks are usually assigned and generated when a stream is ingested by a streaming application. Because the choice of the timestamp is application-specific and the watermarks depend on the timestamps and characteristics of the stream, applications have to explicitly assign timestamps and generate watermarks. A Flink DataStream application can assign timestamps and generate watermarks to a stream in three ways.
1. At the source: Timestamps and watermarks can be assigned and generated by a SourceFunction when a stream is ingested into an application.
2. Periodic assigner: The DataStream API provides a user-defined function called AssignerWithPeriodicWatermarks that extracts a timestamp from each record and is periodically queried for the current watermark. The extracted timestamps are assigned to the respective records and the queried watermarks are ingested into the stream. This function will be discussed in Chapter 6.
3. Punctuated assigner: AssignerWithPunctuatedWatermarks is a second user-defined assigner function. Similar to the AssignerWithPeriodicWatermarks function, it extracts a timestamp from each record. In addition, it can, but does not need to, generate a watermark from each record and can therefore be used to generate watermarks that are encoded in the input records. This function will be discussed in Chapter 6 as well.
User-defined timestamp assignment functions are usually applied as close to a source operator as possible because it is usually easier to reason about the out-of-orderness of timestamps before the stream has been processed by an operator. This is also the reason why it is generally not a good idea to override existing timestamps and watermarks in the middle of a streaming application, although this is possible with the user-defined functions.
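As a sketch of the second option, the following assigner extracts a timestamp field from a hypothetical SensorReading record and emits watermarks that trail the largest seen timestamp by a fixed bound. The record type and the five-second bound are assumptions for illustration.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical record type with an event-time field in milliseconds.
case class SensorReading(id: String, eventTime: Long, value: Double)

// Assigns the record's own field as timestamp and emits watermarks that trail
// the highest seen timestamp by a fixed bound of 5 seconds.
class BoundedOutOfOrderAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {

  val maxOutOfOrderness = 5000L // assumed bound of 5 seconds
  var maxSeenTimestamp = Long.MinValue + maxOutOfOrderness

  override def extractTimestamp(r: SensorReading, previousTimestamp: Long): Long = {
    maxSeenTimestamp = math.max(maxSeenTimestamp, r.eventTime)
    r.eventTime
  }

  override def getCurrentWatermark(): Watermark =
    new Watermark(maxSeenTimestamp - maxOutOfOrderness)
}
```

It would be applied with readings.assignTimestampsAndWatermarks(new BoundedOutOfOrderAssigner); how often getCurrentWatermark() is queried is controlled by the auto-watermark interval of the ExecutionConfig.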
In Chapter 2 we pointed out that most streaming applications are stateful. Many operators continuously read and update some kind of state, such as records collected in a window, reading positions of an input source, or custom, application-specific operator state like machine learning models. Flink treats all state the same, regardless of whether it is used by a built-in or a user-defined operator. In this section we discuss the different types of state that Flink supports. We explain how state is stored and maintained by state backends and how stateful applications can be scaled by redistributing state.
In general, all data maintained by a task and used to compute the results of its function belongs to the state of the task. You can think of state as any local or instance variable that is accessed by a task's business logic. Figure 3-8 visualizes the interaction of a task and its state.
A task receives some input data. While processing the data, the task can read and update its state and compute its result based on its input data and state. A simple example is a task that continuously counts how many records it receives. When the task receives a new record, it accesses the state to get the current count, increments the count, updates the state, and emits the new count.
The application logic to read from and write to state is often straightforward. However, efficient and reliable management of state is more challenging. This includes handling very large state, possibly exceeding memory, and ensuring that no state is lost in case of failures. All issues related to state consistency, failure handling, and efficient storage and retrieval are taken care of by Flink, so developers can focus on the logic of their applications.
In Flink, state is always associated with a specific operator. In order to make Flink’s runtime aware of the state of an operator, the operator needs to register its state. There are two types of state, Operator State and Keyed State, that are accessible from different scopes and which are discussed in the following sections.
Operator state is scoped to an operator task. This means that all records which are processed by the same parallel task have access to the same state. Operator state cannot be accessed by another task of the same or a different operator. Figure 3-9 visualizes how tasks access operator state.
Flink offers three primitives for operator state.
List State represents state as a list of entries.
Union List State represents state as a list of entries as well. It differs from regular list state in how it is restored in case of a failure or when an application is started from a savepoint. We discuss this difference later in this section.
Broadcast State is designed for the special case where the state of each task of an operator is identical. This property can be leveraged during checkpoints and when rescaling an operator. Both aspects are discussed in later sections of this chapter.
Keyed state is scoped to a key that is defined on the records of an operator’s input stream. Flink maintains one state instance per key value and partitions all records with the same key to the operator task that maintains the state for this key. When a task processes a record, it automatically scopes the state access to the key of the current record. Consequently, all records with the same key access the same state. Figure 3-10 shows how tasks interact with keyed state.
You can think of keyed state as a key-value map that is partitioned (or sharded) on the key across all parallel tasks of an operator. Flink provides different primitives for keyed state that determine the type of the value that is stored for each key in this distributed map. The most common keyed state primitives are ValueState, which holds a single value per key, ListState, which holds a list of entries per key, and MapState, which holds a key-value map per key. State primitives expose the structure of the state to Flink and enable more efficient state accesses.
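To illustrate keyed state, the following function counts the records it sees per key using a ValueState. The input type (String, Double) and all names are assumptions; this is a sketch, not the only way to use keyed state.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Counts the records seen per key using a ValueState[Long].
class CountPerKey extends RichFlatMapFunction[(String, Double), (String, Long)] {

  // Handle to the keyed state; initialized in open().
  private var countState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Register the state with a descriptor so Flink's runtime can manage it.
    val descriptor = new ValueStateDescriptor[Long]("count", classOf[Long])
    countState = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(in: (String, Double), out: Collector[(String, Long)]): Unit = {
    // The state access is automatically scoped to the key of the current record.
    // If no count was stored for this key yet, the missing value unboxes to 0.
    val currentCount: Long = countState.value()
    val newCount = currentCount + 1
    countState.update(newCount)
    out.collect((in._1, newCount))
  }
}
```

It would be applied on a keyed stream, for example input.keyBy(_._1).flatMap(new CountPerKey).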
A task of a stateful operator commonly reads and updates its state for each incoming record. Because efficient state access is crucial to process records with low latency, each parallel task locally maintains its state to ensure local state accesses. How exactly the state is stored, accessed, and maintained is determined by a pluggable component that is called state backend. A state backend is responsible for mainly two aspects, local state management and checkpointing state to a remote location.
For local state management, a state backend ensures that keyed state is correctly scoped to the current key and stores and accesses all keyed state. Flink provides state backends that manage keyed state as objects stored in in-memory data structures on the JVM heap. Another state backend serializes state objects and puts them into RocksDB, which writes them to local disk. While the first option gives very fast state access, it is limited by the size of the available memory. Accessing state stored by the RocksDB state backend is slower, but the state may grow very large.
State checkpointing is important because Flink is a distributed system and state is only maintained locally. A TaskManager process (and with it all tasks running on it) may fail at any point in time, so its storage must be considered volatile. A state backend takes care of checkpointing the state of a task to a remote and persistent storage. The remote storage for checkpointing can be a distributed file system or a database system. State backends differ in how state is checkpointed. For instance, the RocksDB state backend supports asynchronous and incremental checkpoints, which significantly reduces the checkpointing overhead for very large state sizes.
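Configuring a state backend is a one-liner in the application code. The following sketch assumes the RocksDB state backend dependency is on the classpath and uses a made-up HDFS checkpoint path; the second constructor argument enables incremental checkpoints.

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Keep state in RocksDB on local disk and checkpoint it to a remote file system.
// The checkpoint URI is an assumption; 'true' enables incremental checkpoints.
val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true)
env.setStateBackend(backend)
```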
We will discuss the different state backends and their pros and cons in more detail in Chapter 8.
A common requirement for streaming applications is to adjust the parallelism of operators due to increasing or decreasing input rates. While scaling stateless operators is trivial, changing the parallelism of stateful operators is much more challenging because their state needs to be re-partitioned and assigned to more or fewer parallel tasks. Flink supports four patterns to scale different types of state.
Operators with keyed state are scaled by repartitioning keys to fewer or more tasks. However, to improve the efficiency of the necessary state transfer between tasks, Flink does not redistribute individual keys. Instead, Flink organizes keys in so-called Key Groups. A key group is a partition of keys and Flink's unit for assigning keys to tasks. Figure 3-11 visualizes how keyed state is repartitioned in key groups.
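Conceptually, a key is first hashed into one of a fixed number of key groups (bounded by the maximum parallelism), and each key group is assigned to exactly one task; when the parallelism changes, whole key groups move between tasks. The following is a simplified illustration of that idea, not Flink's actual hashing code.

```scala
// Conceptual sketch of key-to-task assignment via key groups (simplified).
object KeyGroupAssignment {

  // A key is first mapped to one of maxParallelism key groups...
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    math.abs(key.hashCode()) % maxParallelism

  // ...and each key group is then assigned to one of the parallel tasks.
  // When the parallelism changes, only whole key groups move between tasks.
  def taskFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 8
    for (p <- Seq(2, 4)) {
      val assignment = (0 until maxParallelism).map(kg => kg -> taskFor(kg, maxParallelism, p))
      println(s"parallelism $p: " + assignment.mkString(", "))
    }
  }
}
```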
Operators with operator list state are scaled by redistributing the list entries. Conceptually, the list entries of all parallel operator tasks are collected and evenly redistributed to a smaller or larger number of tasks. If there are fewer list entries than the new parallelism of an operator, some tasks will not receive any state and start from scratch. Figure 3-12 shows the redistribution of operator list state.
Operators with operator union list state are scaled by broadcasting the full list of state entries to each task. The task can then choose which entries to use and which to discard. Figure 3-13 shows how operator union list state is redistributed.
Operators with operator broadcast state are scaled up by copying the state to the new tasks. This works because broadcast state ensures that all tasks have the same state. In the case of downscaling, the surplus tasks are simply canceled since the state is already replicated and will not be lost. Figure 3-14 visualizes the redistribution of operator broadcast state.
Flink is a distributed data processing system and as such has to deal with failures such as killed processes, failing machines, and interrupted network connections. Since tasks maintain their state locally, Flink has to ensure that this state does not get lost and remains consistent in case of a failure.
In this section, we present Flink’s lightweight checkpointing and recovery mechanism to guarantee exactly-once state consistency. We also discuss Flink’s unique savepoint feature, a “swiss army knife”-like tool that addresses many challenges of operating streaming applications.
Flink's recovery mechanism is based on consistent checkpoints of application state. A consistent checkpoint of a stateful streaming application is a copy of the state of each of its tasks at a point when all tasks have processed exactly the same input. What this means can be explained by going through the steps of a naive algorithm that takes a consistent checkpoint of an application.
1. Pause the ingestion of all input streams.
2. Wait until all in-flight data has been completely processed, i.e., all tasks have processed all of their input data.
3. Take a checkpoint by copying the state of each task to a remote, persistent storage. The checkpoint is complete when all tasks have finished their copies.
4. Resume the ingestion of all streams.
Note that Flink does not implement this naive algorithm. We will present Flink’s more sophisticated checkpointing algorithm later in this section.
Figure 3-15 shows a consistent checkpoint of a simple example application.
The application has a single source task that consumes a stream of increasing numbers, i.e., 1, 2, 3, and so on. The stream of numbers is partitioned into a stream of even and odd numbers. A sum operator computes with two tasks the running sums of all even and odd numbers. The source task stores the current offset of its input stream as state, the sum tasks persist the current sum value as state. In Figure 3-15, Flink took a checkpoint when the input offset was 5, and the sums were 6 and 9.
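A sketch of this example application in the Scala DataStream API could look as follows. The CountingSource is a made-up, non-fault-tolerant source for illustration only; a real source would also checkpoint its reading offset.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Illustrative source that emits an increasing sequence of numbers.
// Note: it does not checkpoint its offset, unlike the source described in the text.
class CountingSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 1L
    while (running) {
      ctx.collect(i)
      i += 1
      Thread.sleep(10)
    }
  }

  override def cancel(): Unit = { running = false }
}

object EvenOddSums {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val numbers: DataStream[Long] = env.addSource(new CountingSource)

    numbers
      .keyBy(n => n % 2)        // partition the stream into even and odd numbers
      .reduce((a, b) => a + b)  // maintain a running sum per partition as keyed state
      .print()

    env.execute("Even/odd running sums")
  }
}
```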
During the execution of a streaming application, Flink periodically takes consistent checkpoints of the application's state. In case of a failure, Flink uses the latest checkpoint to consistently restore the application's state and restarts the processing. Figure 3-16 visualizes the recovery process.
An application is recovered in three steps.
1. Restart all failed tasks.
2. Reset the state of the whole application to the latest checkpoint, i.e., reset the state of each task.
3. Resume the processing of all tasks.
This checkpointing and recovery mechanism is able to provide exactly-once consistency for application state, given that all operators checkpoint and restore all of their state and that all input streams are reset to the position up to which they were consumed when the checkpoint was taken. Whether a data source can reset its input stream depends on its implementation and the external system or interface from which the stream is consumed. For instance, event logs like Apache Kafka can provide records from a previous offset of the stream. In contrast, a stream consumed from a socket cannot be reset because sockets discard data once it has been consumed. Consequently, an application can only be operated under exactly-once state consistency if all input streams are consumed by resettable data sources.
After an application is restarted from a checkpoint, its internal state is exactly the same as when the checkpoint was taken. It then starts to consume and process all data that was processed between the checkpoint and the failure. Although this means that some messages are processed twice (before and after the failure) by Flink operators, the mechanism still achieves exactly-once state consistency because the state of all operators was reset to a point that had not seen this data yet.
We also need to point out that Flink's checkpointing and recovery mechanism only resets the internal state of a streaming application. Once the recovery is completed, some records have been processed more than once. Depending on the sink operators of an application, it might happen that some result records are emitted multiple times to downstream systems, such as an event log, a file system, or a database. For selected systems, Flink provides sink functions that feature exactly-once output, for example by committing emitted records on checkpoint completion. Another approach that works for many common sink systems is idempotent updates. The challenge of end-to-end exactly-once applications and approaches to address it are discussed in detail in Chapter 7.
Flink's recovery mechanism is based on consistent application checkpoints. The naive approach to take a checkpoint of a streaming application, i.e., to pause, checkpoint, and resume the application, suffers from its "stop-the-world" behavior, which is not acceptable for applications with even moderate latency requirements. Instead, Flink implements an algorithm based on the well-known Chandy-Lamport algorithm for distributed snapshots. The algorithm does not pause the complete application but decouples the checkpointing of individual tasks such that some tasks continue processing while others persist their state. In the following, we explain how this algorithm works.
Flink’s checkpointing algorithm is based on a special type of record that is called checkpoint barrier. Similar to watermarks, checkpoint barriers are injected by source operators into the regular stream of records and cannot overtake or be passed by any other record. A checkpoint barrier carries a checkpoint ID to identify the checkpoint it belongs to and logically splits a stream into two parts. All state modifications due to records that precede a barrier are included in the checkpoint and all modifications due to records that follow the barrier are included in a later checkpoint.
We use an example of a simple streaming application to explain the algorithm step by step. The application consists of two source tasks, each of which consumes a stream of increasing numbers. The output of the source tasks is partitioned into streams of even and odd numbers. Each partition is processed by a task that computes the sum of all received numbers and forwards the updated sum to a sink. The application is depicted in Figure 3-17.
A checkpoint is initiated by the JobManager by sending a message with a new checkpoint ID to each data source task as shown in Figure 3-18.
When a data source task receives the message, it pauses emitting records, triggers a checkpoint of its local state at the state backend, and broadcasts checkpoint barriers with the checkpoint ID via all outgoing stream partitions. The state backend notifies the task once the task checkpoint is complete and the task acknowledges the checkpoint at the JobManager. After all barriers are sent out, the source continues its regular operations. By injecting the barrier into its output stream, the source function defines the stream position on which the checkpoint is taken. Figure 3-19 shows the streaming application after both source tasks checkpointed their local state and emitted checkpoint barriers.
The checkpoint barriers emitted by the source tasks are shipped to the subsequent tasks. Similar to watermarks, checkpoint barriers are broadcast to all connected parallel tasks to ensure that each downstream task receives a barrier from each of its input streams. When a task receives a barrier for a new checkpoint, it waits for the arrival of all barriers for that checkpoint. While it is waiting, it continues processing records from stream partitions that have not provided a barrier yet. Records that arrive on partitions that have already forwarded a barrier must not be processed yet and need to be buffered. The process of waiting for all barriers to arrive is called barrier alignment and is depicted in Figure 3-20.
As soon as all barriers have arrived, the task initiates a checkpoint at the state backend and broadcasts the checkpoint barrier to all of its downstream connected tasks as shown in Figure 3-21.
Once all checkpoint barriers have been emitted, the task starts to process the buffered records. After all buffered records have been emitted, the task continues processing its input streams. Figure 3-22 shows the application at this point.
Eventually, the checkpoint barriers arrive at a sink task. When a sink task receives a barrier, it performs a barrier alignment, checkpoints its own state, and acknowledges the reception of the barrier to the JobManager. The JobManager records the checkpoint of an application as completed once it received a checkpoint acknowledgement from all tasks of the application. Figure 3-23 shows the final step of the checkpointing algorithm. The completed checkpoint can be used to recover the application from a failure as described before.
The discussed algorithm produces consistent distributed checkpoints from streaming applications without stopping the whole application. However, it has two aspects that can increase the latency of an application. Flink's implementation features tweaks that improve the performance of the application under certain conditions.
The first aspect is the process of checkpointing the state of a task. During this step, a task is blocked and its input is buffered. Since operator state can become quite large and checkpointing means sending the data over the network to a remote storage system, taking a checkpoint can easily take several seconds, much too long for latency-sensitive applications. In Flink's design it is the responsibility of the state backend to perform a checkpoint. How exactly the state of a task is copied depends on the implementation of the state backend and can be optimized. For example, the RocksDB state backend supports asynchronous and incremental checkpoints. When a checkpoint is triggered, the RocksDB state backend locally snapshots all state modifications since the last checkpoint (a very lightweight and fast operation due to RocksDB's design) and immediately returns such that the task can continue processing. A background thread asynchronously copies the local snapshot to the remote storage and notifies the task once it has completed the checkpoint. Asynchronous checkpointing significantly reduces the time a task is blocked; incremental checkpointing reduces the amount of data to transfer.
Another reason for increased latency is the record buffering during the barrier alignment step. For applications that require consistently very low latency and can tolerate at-least-once state guarantees, Flink can be configured to process all arriving records during barrier alignment instead of buffering those for which the barrier has already arrived. Once all barriers for a checkpoint have arrived, the operator checkpoints its state, which might now also include modifications caused by records that would usually belong to the next checkpoint. In case of a failure, these records will be processed again, which means that the checkpoint provides at-least-once instead of exactly-once consistency guarantees.
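Switching to at-least-once checkpoints is a configuration choice of the application. A minimal sketch (the 10-second checkpoint interval is just an example):

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Take a checkpoint every 10 seconds.
env.enableCheckpointing(10000L)
// Trade exactly-once for at-least-once guarantees to avoid barrier alignment delays.
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
```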
Flink’s recovery algorithm is based on state checkpoints. Checkpoints are periodically taken and automatically discarded when a new checkpoint completes. Their sole purpose is to ensure that in case of a failure an application can be restarted without losing state. However, consistent snapshots of the state of an application can be used for many more purposes.
One of Flink's most valuable and unique features is savepoints. In principle, savepoints are checkpoints with some additional metadata and are created using the same algorithm as checkpoints. However, Flink does not automatically take savepoints; instead, a user (or an external scheduler) has to explicitly trigger their creation. Flink also does not automatically clean up savepoints.
Given an application and a compatible savepoint, you can start the application from the savepoint, which will initialize the application's state to the state of the savepoint and run the application from the point at which the savepoint was taken. While this sounds basically the same as recovering an application from a failure using a checkpoint, failure recovery is actually just a special case because it starts the same application with the same configuration on the same cluster. Starting an application from a savepoint allows you to do much more.
Since savepoints are such a powerful feature, many users periodically take savepoints to be able to go back in time. One of the most interesting applications of savepoints we have seen in the wild is to migrate streaming applications to the data center that provides the lowest instance prices.
In this chapter we have discussed Flink’s high-level architecture and the internals of its networking stack, event-time processing mode, state management, and failure recovery mechanism. You will find knowledge about these internals helpful when designing advanced streaming applications, setting up and configuring clusters, and operating streaming applications as well as reasoning about their performance.
[1] Chapter 10 will discuss how the DataStream API allows you to control the assignment and grouping of tasks.
[2] Batch applications can, in addition to pipelined communication, exchange data by collecting outgoing data at the sender. Once the sender task completes, the data is sent as a batch over a temporary TCP connection to the receiver.
[3] A TaskManager ensures that each task has at least one incoming and one outgoing buffer and respects additional buffer assignment constraints to avoid deadlocks and maintain smooth communication.
[4] The ProcessFunction is discussed in more detail in Chapter 6.