Savepoints: Turning Back Time
This post is the first in a series where the data Artisans team will highlight some of Apache Flink®'s core features. By Fabian Hueske (@fhueske) and Mike Winters (@wints)
Stream processing is commonly associated with ‘data in motion’, powering systems that make sense of and respond to data in nearly the same instant it’s created. The most frequently discussed streaming topics, such as latency and throughput or watermarks and handling of late data, focus on the present rather than the past.
In reality, though, there are a number of cases where you’ll need to reprocess data that your streaming application has already processed before. Some examples include:
- Deployment of a new version of your application with a new feature, a bug fix, or a better machine learning model
- A/B testing different versions of an application using the same source data streams, starting the test from the same point in time without sacrificing prior state
- Evaluating and carrying out the migration of applications to newer releases of the processing framework or to a different cluster
What exactly do you mean by “reprocessing”?
To make sure the idea of data reprocessing is clear, let’s talk through a business case where you might need to reprocess data. Imagine a social media company that rolls out a paid or promoted posts feature in addition to its standard ‘organic’ posts. The company’s end users have access to a simple, Flink-powered dashboard showing how many times all of their posts, both organic and paid, have been viewed, clicked, etc.
A few weeks later, user feedback makes it clear that the dashboard would be a lot more helpful if it split out organic vs. paid post data. To make this happen, it’s necessary to go back in time to when the paid posts feature was released and then reprocess all post data from that point forward, this time separating views and interactions for paid vs. organic posts. It’d be a big burden to reprocess all of the company’s historical data starting from its founding, so being able to reprocess only from the point the paid posts feature was rolled out, while maintaining state from earlier computations, is crucial.
So when we use the word ‘reprocessing’, we’re talking about returning to a prior, consistent state of the system (as defined by the developer, and not necessarily the beginning of the stream) and then continuing to process from that state, usually after having made a change to your Flink program.
Good news for all of our readers out there: reprocessing as defined above comes for free in Flink using a feature called savepoints. When we say ‘comes for free’, we mean that as long as your program is fault tolerant and able to recover from a failure, you’ll be able to create a savepoint and reprocess data in Flink with almost zero extra setup work.
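To make the dashboard scenario concrete, here is a minimal sketch of what the updated (v2) job might look like. Everything here is hypothetical: the PostEvent type, its fields, and the inline sample data are invented for illustration, and a real job would read from a replayable source such as Kafka. The essential change from v1 is that the view counts are keyed by post ID plus paid/organic rather than by post ID alone:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostViewCounts {

    // Hypothetical event type for the dashboard example.
    public static class PostEvent {
        public String postId;
        public boolean isPaid;  // true for paid/promoted posts
        public long views;

        public PostEvent() {}

        public PostEvent(String postId, boolean isPaid, long views) {
            this.postId = postId;
            this.isPaid = isPaid;
            this.views = views;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A few inline events keep the sketch self-contained; a real job
        // would read from a resettable source such as Kafka.
        DataStream<PostEvent> events = env.fromElements(
            new PostEvent("post-1", false, 1L),
            new PostEvent("post-1", true, 1L),
            new PostEvent("post-2", true, 1L));

        // v1 keyed the counts by postId only; v2 also keys by paid vs.
        // organic so the dashboard can report the two categories separately.
        events
            .keyBy(e -> e.postId + (e.isPaid ? "/paid" : "/organic"))
            .sum("views")
            .print();

        env.execute("Post view counts (v2)");
    }
}

Started from a savepoint taken when paid posts were rolled out, a job like this reprocesses only the data from that point forward rather than the company’s entire history.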
Savepoints in a nutshell
Put simply, a savepoint of a Flink application is a globally consistent snapshot of both:
- the positions of all data sources
- the state of all (parallel) operators
This sounds great! What do I have to do?
Not much! Actually, all applications that support failure recovery automatically support savepoints, too. Hence, most programs written for stateful computations already meet the required criteria, and those that don’t can be quickly updated so that they have:
- Checkpointing enabled: There’s not really a case where we’d recommend building a Flink application without checkpointing enabled, and adding checkpointing to your Flink program requires only one extra line of code (see the sketch after this list).
- Resettable data sources (e.g., Apache Kafka, Amazon Kinesis, or a file system): Data sources must be able to replay data from the point in time from which you want to re-process.
- All state stored via Flink’s managed state interfaces: All custom operator state must be kept in Flink’s fault tolerant state data structures, which makes it ‘resettable’ to a prior savepoint.
- An appropriate state backend configuration: Flink offers different state backends to persist checkpoints and savepoints. By default, savepoints are stored in the JobManager, but you should configure an appropriate state backend for your application, such as RocksDB.
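To make these requirements concrete, here is a minimal sketch of a savepoint-ready job. The class names, the 10-second checkpoint interval, and the counting logic are all assumptions for illustration, and the state backend is assumed to be configured in flink-conf.yaml rather than in code:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointReadyJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Requirement 1: enable checkpointing (the single extra line of code).
        env.enableCheckpointing(10_000);

        // Requirement 2: use a resettable source such as Kafka in production;
        // fixed elements keep this sketch self-contained.
        env.fromElements("a", "b", "a")
            .keyBy(value -> value)
            .flatMap(new CountingMapper())
            .print();

        env.execute("Savepoint-ready job");
    }

    // Requirement 3: keep custom state in Flink's managed state interfaces,
    // so it can be reset to the values captured in a savepoint.
    public static class CountingMapper extends RichFlatMapFunction<String, String> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            Long current = count.value();
            long updated = (current == null) ? 1L : current + 1L;
            count.update(updated);
            out.collect(value + " seen " + updated + " times");
        }
    }
}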
Step 1: Create a savepoint
First, get a list of all running Flink jobs:
user$ flink list
10.10.2016 16:20:33 : job_id : Sample Job (RUNNING)
Next, trigger a savepoint for the job using its ID; the command prints the path where the savepoint was stored, which you’ll need when restarting:
user$ flink savepoint job_id
Finally, cancel the running job:
user$ flink cancel job_id
Step 2: Start a job from the savepoint
Once you’ve updated your application, it’s time to start a job from your savepoint.
user$ flink run -d -s hdfs://savepoints/1 directory/your-updated-application.jar
The -d flag starts the job in detached mode, and -s provides the path of the savepoint to restore from. If you’d like to reproduce these steps yourself in a demo application, we recommend you check out an earlier data Artisans blog post that’ll give you an opportunity to do so.
What if I want to update my application?
There are a few things to consider if you want to start a modified application from a savepoint. We can distinguish two cases:
- changing the logic of a user-defined function, such as a MapFunction (see the first sketch below)
- changing the topology of an application, i.e., adding or removing operators (see the second snippet below)
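For the first case, changing a function’s logic is straightforward: Flink restores the state and source positions from the savepoint, and the updated logic simply applies to every record from there on. A minimal, hypothetical sketch (the class names and computations are invented; in practice you would change the body of the same class, recompile, and redeploy):

import org.apache.flink.api.common.functions.MapFunction;

// v1: the function as originally deployed.
class ScoreMapperV1 implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
        return value * 2L;   // original (buggy) computation
    }
}

// v2: the same operator with corrected logic. Started from a savepoint,
// the job resumes exactly where v1 stopped, and the fix applies to all
// records reprocessed from that point forward.
class ScoreMapperV2 implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
        return value * 3L;   // corrected computation
    }
}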
The second case is trickier. When starting from a savepoint, Flink must match the state in the savepoint to the operators of the new program, and it does so by operator ID. Unless you assign IDs explicitly with the uid(String) method, Flink generates them from the structure of the topology, so adding or removing operators changes the IDs and prevents the state from being matched. Assigning explicit IDs, as in the following snippet, keeps an application safe to evolve:

DataStream<String> stream = env
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id")
  // The stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id");

// Stateless sink (no specific ID required)
stream.print();