With the explosive rise of real-time data sources such as IoT devices, social media feeds, and log streams, processing data on the fly has become critical for many applications. Apache Spark, well known for its powerful data processing engine, offers PySpark Structured Streaming as a high-level API for handling data streams in real time conveniently and efficiently.
In this post, we go through the basics of PySpark Structured Streaming, walk through a simple example, and cover the key features that make this technology such a strong fit for real-time data processing.
What is PySpark Structured Streaming?
PySpark Structured Streaming is the part of Spark that provides an API for building scalable, fault-tolerant streaming applications. Instead of processing a dataset all at once, as in batch processing, it processes continuously arriving data incrementally, while letting you write queries much as you would for batch data, with results that update in real time.
Key features include:
- Unified Batch and Streaming APIs: Code written for batch processing can often be reused for streaming with minimal modification (see the sketch after this list).
- End-to-End Fault Tolerance: PySpark Structured Streaming handles failures elegantly so that no data is lost.
- Stateful Operations: It allows for the processing of data with complex logic, such as maintaining running counts, averages, or even more complex state computations.
- Scalability: You can process a large amount of data since Spark enables distributed processing.
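To make the unified-API point concrete, here is a minimal, illustrative sketch: the aggregation logic stays the same whether the input is a static directory of JSON files or a stream of new files arriving in it. The /data/events/ path and the event_type column are hypothetical placeholders, not part of any real dataset.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnifiedAPISketch").getOrCreate()

# Batch: read everything currently in the (hypothetical) directory
batch_df = spark.read.json("/data/events/")
batch_counts = batch_df.groupBy("event_type").count()
batch_counts.show()

# Streaming: same transformation, only the read and write calls change
# (file streams require an explicit schema, so we reuse the batch one)
stream_df = spark.readStream.schema(batch_df.schema).json("/data/events/")
stream_counts = stream_df.groupBy("event_type").count()
query = stream_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()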
PySpark Structured Streaming Setup
To get started, you first need to install PySpark:
pip install pyspark
In the example below, we are going to use a socket stream to mimic real-time data.
Step-by-Step Tutorial on Streaming with PySpark
Let's get started with a simple example of processing a real-time stream of text data.
- Import Libraries and Initialize Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .getOrCreate()
- Define Data Stream Source
For this example, we will read from a socket source, which is one of the most common sources for development and testing.
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
- Data Transformation
Assume that every line in the stream is a sentence, and we would like to count, in real time, the occurrences of each word.
words = lines.select(
    explode(split(lines.value, " ")).alias("word")
)
wordCounts = words.groupBy("word").count()
- Output Sink
Next, we need to define the sink we want to write the results to; it could be the console, Kafka, or any of the many other sinks Spark supports. Here, we will use console output.
query = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
When you run this application and send data to the socket (for example, by running nc -lk 9999 in another terminal and typing some text), you will see the word counts updating live on the console.
Important Concepts in PySpark Structured Streaming
Triggers
Structured Streaming runs on a trigger, which determines how often Spark checks for new data. Options include:
- Default: processes data as soon as it arrives, starting each micro-batch as soon as the previous one completes.
- Fixed Interval: fires at a fixed interval, e.g., every 10 seconds.
- Once: processes all currently available data and then stops.
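As a quick sketch (reusing the wordCounts DataFrame from the tutorial above), a trigger is configured on the writeStream side via trigger():

# Fire a micro-batch every 10 seconds instead of as soon as data is available
query = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

# Or process everything currently available once, then stop:
# .trigger(once=True)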
Output Modes
The output mode specifies how the output is emitted for each micro-batch:
- Append: emits only the new rows added since the last trigger.
- Complete: rewrites the entire result table on every trigger.
- Update: emits only the rows that changed since the last trigger.
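For instance, switching the word-count query from the tutorial to update mode prints only the counts that changed in each micro-batch (a sketch; note that append mode would require a watermark for an aggregation like this):

# Emit only the rows whose counts changed in this micro-batch
query = wordCounts.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()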
Fault Tolerance and Checkpointing
Structured Streaming uses write-ahead logs and checkpointing to avoid data loss when a failure occurs. Configure checkpointing as follows:
query = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()
PySpark Structured Streaming Use Cases
Structured Streaming is useful for a wide range of applications, from straightforward text processing to complex multi-source integration. Here are some common use cases:
- Real-Time Dashboard Analytics: Trends and key metrics of user activity on a website in real-time.
- Fraud Detection Systems: Detecting potentially fraudulent transactions or activities in financial systems by analyzing patterns in real-time.
- IoT Data Processing: Monitoring data from connected devices for real-time anomaly detection and reporting (a minimal sketch follows this list).
- Log Processing: Aggregating and analyzing log files from servers to detect errors or security threats.
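To illustrate the IoT case, here is a minimal sketch that simulates sensor readings with Spark's built-in rate source and computes a per-minute average. The derived reading column and the window and watermark durations are illustrative assumptions, not part of any real device feed.

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, avg, col

spark = SparkSession.builder.appName("IoTWindowedSketch").getOrCreate()

# The built-in "rate" source generates (timestamp, value) rows and stands in
# for a real sensor feed here; the "reading" column is derived for illustration.
readings = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 5) \
    .load() \
    .withColumn("reading", col("value") % 100)

# Average reading per 1-minute window, updated as data arrives
windowed_avg = readings \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(window(col("timestamp"), "1 minute")) \
    .agg(avg("reading").alias("avg_reading"))

query = windowed_avg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()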
Conclusion
PySpark Structured Streaming makes it remarkably easy to build robust real-time data processing pipelines. It brings the familiarity of Spark's DataFrame API to streaming and allows seamless transitions between batch and streaming applications. Its support for fault tolerance, scalability, and real-time data transformation makes it well suited to the ever-growing demand for real-time analytics.
If your goal is to process real-time data, Structured Streaming is a great solution; build a streaming application of your own and see what it can do.