Python Data Cleaning and Preparation Best Practices

A practical guide to organizing and handling data from various sources and formats using Python

Product type Paperback
Published in Sep 2024
Publisher Packt
ISBN-13 9781837634743
Length 456 pages
Edition 1st Edition
Author (1):
Maria Zervou

Ingesting data in streaming mode

Streaming ingestion is a data processing technique whereby data is collected, processed, and loaded into a system in real-time, as it is generated. Unlike batch ingestion, which accumulates data for processing at scheduled intervals, streaming ingestion handles data continuously, allowing organizations to analyze and act on information immediately. For instance, a company might process customer transaction data the moment it occurs, enabling real-time insights and decision-making. This method is particularly useful for organizations that require up-to-the-minute data analysis, such as in financial trading, fraud detection, or sensor data monitoring.

Streaming ingestion is advantageous because it enables immediate processing of data, reducing latency and allowing organizations to react quickly to changing conditions. This is particularly beneficial in scenarios where timely responses are critical, such as detecting anomalies, personalizing user experiences, or responding to real-time events. Additionally, streaming can lead to more efficient resource utilization by distributing the processing load evenly over time, rather than concentrating it into specific batch windows. In cloud-based environments, this can also translate into cost savings, as resources can be scaled dynamically to match the real-time data flow. For organizations with irregular or unpredictable data flows, streaming ingestion offers a flexible, responsive, and scalable approach to data processing and analytics. Let’s look at some of its advantages and disadvantages.

Advantages and disadvantages

Streaming ingestion offers several distinct advantages, making it an essential choice for specific data processing needs:

  • One of the primary benefits is the ability to obtain real-time insights from data. This immediacy is crucial for applications such as fraud detection, real-time analytics, and dynamic pricing, where timely data is vital.
  • Streaming ingestion supports continuous data processing, allowing systems to handle data as it arrives, thereby reducing latency and improving responsiveness.
  • This method is highly scalable and can manage high-velocity data streams from multiple sources without significant delays.

However, streaming ingestion also presents some challenges:

  • Implementing a streaming ingestion system can be complex, requiring sophisticated infrastructure and specialized tools to manage data streams effectively.
  • Continuous processing demands constant computational resources, which can be costly and resource-intensive.
  • Ensuring data consistency and accuracy in a streaming environment can be difficult due to the constant influx of data and the potential for out-of-order or duplicate records.
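
The consistency challenge in the last point can be made concrete. The following is a minimal sketch, assuming each record carries a unique event_id and an event_time value (both field names are hypothetical): it drops duplicate deliveries by ID and holds records in a small buffer so they can be re-emitted in event-time order once a sufficiently newer record has been seen.

```python
import heapq


def dedupe_and_reorder(records, max_lateness=2):
    """Yield records in event-time order, skipping duplicate event IDs.

    Assumes (hypothetically) that each record is a dict with 'event_id'
    and 'event_time' keys. A record is released once another record at
    least `max_lateness` time units newer has arrived.
    """
    seen_ids = set()  # grows without bound here; a real system would expire old IDs
    buffer = []       # min-heap ordered by event_time
    watermark = float("-inf")

    for record in records:
        if record["event_id"] in seen_ids:
            continue  # duplicate delivery: ignore it
        seen_ids.add(record["event_id"])
        heapq.heappush(buffer, (record["event_time"], record["event_id"], record))
        watermark = max(watermark, record["event_time"] - max_lateness)
        # Release everything at or before the watermark
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)[2]

    # Stream ended: flush whatever is left, still in order
    while buffer:
        yield heapq.heappop(buffer)[2]


events = [
    {"event_id": "a", "event_time": 1},
    {"event_id": "c", "event_time": 3},
    {"event_id": "b", "event_time": 2},  # arrives out of order
    {"event_id": "a", "event_time": 1},  # duplicate delivery
    {"event_id": "d", "event_time": 5},
    {"event_id": "e", "event_time": 4},  # arrives out of order
]
ordered = list(dedupe_and_reorder(events))
```

A record that arrives later than max_lateness allows is still emitted, but out of order; choosing that tolerance is a trade-off between latency and correctness.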

Let’s look at common use cases for ingesting data in streaming mode.

Common use cases for streaming ingestion

While batch processing is well-suited for periodic, large-scale data updates and transformations, streaming data ingestion is crucial for real-time data analytics and applications that require immediate insights. Here are some common use cases for streaming data ingestion.

Real-time fraud detection and security monitoring

Financial institutions use streaming data to detect fraudulent activities by analyzing transaction data in real-time. Immediate anomaly detection helps prevent fraud before it can cause significant damage. Streaming data is used in cybersecurity to detect and respond to threats immediately. Continuous monitoring of network traffic, user behavior, and system logs helps identify and mitigate security breaches as they occur.
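
To give a feel for what analyzing transactions as they arrive can look like, here is a deliberately simple sketch. It is not a real fraud model: it assumes a stream of plain transaction amounts and flags any amount that deviates strongly from a rolling window of recent values. The window_size and threshold parameters are illustrative assumptions.

```python
from collections import deque
from statistics import mean, stdev


def flag_anomalies(amounts, window_size=5, threshold=3.0):
    """Yield (amount, is_anomaly) for each incoming amount.

    An amount is flagged when it lies more than `threshold` standard
    deviations from the mean of the last `window_size` normal amounts.
    """
    window = deque(maxlen=window_size)
    for amount in amounts:
        is_anomaly = False
        if len(window) == window_size:
            mu = mean(window)
            sigma = stdev(window)
            if sigma > 0 and abs(amount - mu) / sigma > threshold:
                is_anomaly = True
        yield amount, is_anomaly
        if not is_anomaly:
            window.append(amount)  # keep flagged values out of the baseline


stream = [20, 22, 19, 21, 20, 500, 21]
results = list(flag_anomalies(stream))
```

Each decision is made as soon as the amount arrives, using only the values seen so far, which is the property that distinguishes this from a batch job run over the full day's transactions.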

IoT and sensor data

In manufacturing, streaming data from sensors on machinery allows for predictive maintenance. By continuously monitoring equipment health, companies can prevent breakdowns and optimize maintenance schedules.

Another interesting application in the IoT and sensors space is smart cities. Streaming data from various sensors across a city (traffic, weather, pollution, etc.) helps in managing city operations in real-time, improving services such as traffic management and emergency response.

Online recommendations and personalization

Streaming data enables e-commerce platforms to provide real-time recommendations to users based on their current browsing and purchasing behavior. This enhances user experience and increases sales. Platforms such as Netflix and Spotify use streaming data to update recommendations as users interact with the service, providing personalized content suggestions in real-time.

Financial market data

Stock traders rely on streaming data for up-to-the-second information on stock prices and market conditions to make informed trading decisions. Automated trading systems use streaming data to execute trades based on predefined criteria, requiring real-time data processing for optimal performance.

Telecommunications

Telecommunication companies use streaming data to monitor network performance and usage in real-time, ensuring optimal service quality and quick resolution of issues. Streaming data also helps in tracking customer interactions and service usage in real-time, enabling personalized customer support and improving the overall experience.

Real-time logistics and supply chain management

Streaming data from GPS devices allows logistics companies to track vehicle locations and optimize routes in real-time, improving delivery efficiency. Real-time inventory tracking helps businesses maintain optimal stock levels, reducing overstock and stockouts while ensuring timely replenishment.

Streaming ingestion in an e-commerce platform

Streaming ingestion is a methodical process involving several key steps: data extraction, data transformation, data loading, and monitoring and alerting. To illustrate these steps, let’s explore a use case involving an e-commerce platform that needs to process and analyze user activity data in real-time for personalized recommendations and dynamic inventory management.

An e-commerce platform needs to collect, transform, and load user activity data from various sources such as website clicks, search queries, and purchase transactions into a central system. This data will be used for generating real-time personalized recommendations, monitoring user behavior, and managing inventory dynamically.

Data extraction

The first step is identifying the sources from which data will be extracted. For the e-commerce platform, this includes web servers, mobile apps, and third-party analytics services. These sources contain critical data such as user clicks, search queries, and transaction details. Once the sources are identified, data is collected using streaming connectors or APIs. This involves setting up data pipelines that extract data from web servers, mobile apps, and analytics services in real-time. The extracted data is then streamed to processing systems such as Apache Kafka or AWS Kinesis.
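
The extraction step can be sketched without any external infrastructure. In the example below, an in-memory queue.Queue stands in for a Kafka topic or Kinesis stream, and two generator functions play the role of the sources. The function names and event fields are hypothetical, chosen only for illustration.

```python
import queue
import threading


def web_clicks():
    """Hypothetical source: click events from the web servers."""
    for page in ["/home", "/product/42", "/cart"]:
        yield {"source": "web", "type": "click", "page": page}


def mobile_searches():
    """Hypothetical source: search events from the mobile app."""
    for term in ["laptop", "headphones"]:
        yield {"source": "mobile", "type": "search", "query": term}


def extract(source, out_queue):
    """Forward every event from one source into the shared queue."""
    for event in source:
        out_queue.put(event)


def run_extraction(sources):
    """Read all sources concurrently and return the events collected."""
    events = queue.Queue()  # in-memory stand-in for a message broker
    threads = [threading.Thread(target=extract, args=(s, events)) for s in sources]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    collected = []
    while not events.empty():
        collected.append(events.get())
    return collected


collected = run_extraction([web_clicks(), mobile_searches()])
```

Because the sources run in separate threads, the order in which events from different sources interleave is not guaranteed, which mirrors the behavior of independent producers in a real deployment.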

Data transformation

The extracted data often contains inconsistencies and noise. Real-time data cleaning is performed to filter out irrelevant information, handle missing values, and correct errors. For the e-commerce platform, this ensures that user activity records are accurate and relevant for analysis. After cleaning, the data undergoes transformations such as parsing, enrichment, and aggregation. For example, the e-commerce platform might parse user clickstream data to identify browsing patterns, enrich transaction data with product details, and aggregate search queries to identify trending products. The transformed data must be mapped to the schema of the target system. This involves aligning the data fields with the structure of the real-time analytics system. For instance, user activity data might be mapped to tables representing sessions, products, and user profiles, ensuring seamless integration with the existing data model.
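
The cleaning, enrichment, aggregation, and schema-mapping steps described above can be sketched as a chain of small functions applied to each event as it arrives. The catalog contents, field names, and target schema below are assumptions made for this illustration.

```python
from collections import Counter

# Hypothetical product catalog used for enrichment
PRODUCT_CATALOG = {
    "p1": {"name": "Laptop", "category": "Electronics"},
    "p2": {"name": "Desk lamp", "category": "Home"},
}


def clean(event):
    """Return a normalized copy of the event, or None if it is unusable."""
    if not event.get("user_id") or not event.get("type"):
        return None  # drop records missing required fields
    cleaned = dict(event)
    cleaned["type"] = cleaned["type"].strip().lower()
    if cleaned.get("query"):
        cleaned["query"] = cleaned["query"].strip().lower()
    return cleaned


def enrich(event):
    """Attach product details when the event references a known product."""
    product = PRODUCT_CATALOG.get(event.get("product_id"))
    if product:
        return {**event, "product_name": product["name"], "category": product["category"]}
    return event


def to_target_schema(event):
    """Map the event onto the (assumed) column names of the target system."""
    return {
        "user_id": event["user_id"],
        "event_type": event["type"],
        "product_id": event.get("product_id"),
        "product_name": event.get("product_name"),
        "search_query": event.get("query"),
    }


def transform_stream(events, search_counts):
    """Clean, aggregate, enrich, and map each event as it arrives."""
    for raw in events:
        event = clean(raw)
        if event is None:
            continue
        if event["type"] == "search" and event.get("query"):
            search_counts[event["query"]] += 1  # running aggregate of search terms
        yield to_target_schema(enrich(event))


raw_events = [
    {"user_id": "u1", "type": " Purchase ", "product_id": "p1"},
    {"user_id": None, "type": "click"},  # missing user_id: dropped
    {"user_id": "u2", "type": "search", "query": " Laptop "},
    {"user_id": "u3", "type": "search", "query": "laptop"},
]
search_counts = Counter()
rows = list(transform_stream(raw_events, search_counts))
```

After the run, search_counts holds a running tally of search terms that could feed a trending-products view, while rows contains records shaped for the target tables.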

Data loading

The transformed data is processed continuously using tools such as Apache Flink or Apache Spark Streaming. Continuous processing allows the e-commerce platform to handle high-velocity data streams efficiently, performing transformations and aggregations in real-time. Once processed, the data is loaded into the target storage system, such as a real-time database or analytics engine, where it can be accessed for personalized recommendations and dynamic inventory management.
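
As a small illustration of continuous loading, the sketch below writes records into an in-memory SQLite database one at a time, committing every few rows. SQLite is only a stand-in for the real-time database or analytics engine mentioned above, and the table name, columns, and commit_every parameter are assumptions.

```python
import sqlite3


def create_sink():
    """Create an in-memory database with a single (hypothetical) events table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE user_events (user_id TEXT, event_type TEXT, product_id TEXT)"
    )
    return conn


def load_stream(conn, records, commit_every=2):
    """Insert records as they arrive, committing every `commit_every` rows."""
    loaded = 0
    for record in records:
        conn.execute(
            "INSERT INTO user_events (user_id, event_type, product_id) VALUES (?, ?, ?)",
            (record["user_id"], record["event_type"], record.get("product_id")),
        )
        loaded += 1
        if loaded % commit_every == 0:
            conn.commit()  # make recent rows durable without waiting for the end
    conn.commit()
    return loaded


conn = create_sink()
incoming = [
    {"user_id": "u1", "event_type": "click", "product_id": "p1"},
    {"user_id": "u2", "event_type": "search"},
    {"user_id": "u1", "event_type": "purchase", "product_id": "p1"},
]
loaded_count = load_stream(conn, incoming)
row_count = conn.execute("SELECT COUNT(*) FROM user_events").fetchone()[0]
```

Committing in small groups rather than per record is a common compromise: it keeps the data fresh for downstream consumers while reducing the overhead of a transaction per row.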

Monitoring and alerting

To ensure that the streaming ingestion process runs smoothly and consistently, monitoring tools such as Prometheus or Grafana are used. These tools provide real-time insights into the performance and health of the data ingestion pipelines, identifying any failures or performance bottlenecks. Implementing alerting mechanisms is crucial to promptly detect and resolve any issues in the streaming ingestion process. For the e-commerce platform, this ensures that any disruptions in data flow are quickly addressed, maintaining the integrity and reliability of the data pipeline.
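
The kind of signals a monitoring setup watches can be illustrated in plain Python. The class below is a hypothetical sketch, not the API of Prometheus or Grafana: it tracks processed and failed record counts plus the worst observed lag between event time and processing time, and reports alert messages when assumed thresholds are exceeded.

```python
import time


class PipelineMonitor:
    """Track simple health metrics for a streaming pipeline (illustrative only)."""

    def __init__(self, max_lag_seconds=5.0, max_error_rate=0.1):
        self.max_lag_seconds = max_lag_seconds
        self.max_error_rate = max_error_rate
        self.processed = 0
        self.failed = 0
        self.worst_lag = 0.0

    def record_success(self, event_time, now=None):
        """Count a processed record and update the worst lag seen so far."""
        now = time.time() if now is None else now
        self.processed += 1
        self.worst_lag = max(self.worst_lag, now - event_time)

    def record_failure(self):
        """Count a record that could not be processed."""
        self.failed += 1

    def alerts(self):
        """Return a list of human-readable alerts for breached thresholds."""
        messages = []
        total = self.processed + self.failed
        if total and self.failed / total > self.max_error_rate:
            messages.append(f"error rate {self.failed / total:.0%} exceeds limit")
        if self.worst_lag > self.max_lag_seconds:
            messages.append(f"lag {self.worst_lag:.1f}s exceeds limit")
        return messages


monitor = PipelineMonitor(max_lag_seconds=5.0, max_error_rate=0.1)
monitor.record_success(event_time=100.0, now=101.0)  # 1 second of lag: healthy
healthy_alerts = monitor.alerts()
monitor.record_success(event_time=100.0, now=110.0)  # 10 seconds of lag
monitor.record_failure()
unhealthy_alerts = monitor.alerts()
```

In practice these counters would be exported to a metrics system and the thresholds expressed as alert rules there, but the quantities being watched are the same.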

Streaming ingestion with an example

As we said, in streaming, data is processed as it arrives rather than in predefined batches. Let’s modify the batch example to transition to a streaming paradigm. For simplicity, we will generate data continuously, process it immediately upon arrival, transform it, and then load it:

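  1. The generate_mock_data function generates records continuously using a generator and simulates a delay between each record. The random and time imports shown here are also used by the steps that follow:
    import random
    import time

    def generate_mock_data():
        # Infinite generator: yields one mock record at a time
        while True:
            record = {
                'id': random.randint(1, 1000),
                'value': random.random() * 100
            }
            yield record
            time.sleep(0.5)  # Simulate data arriving every 0.5 seconds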
  2. The process_stream function processes each record as it arrives from the data generator, without waiting for a batch to be filled:
    def process_stream(run_time_seconds=10):
        start_time = time.time()
        for record in generate_mock_data():
            transformed_record = transform_data(record)
            load_data(transformed_record)
            # Check if the run time has exceeded the limit
            if time.time() - start_time > run_time_seconds:
                print("Time limit reached. Terminating the stream processing.")
                break
  3. The transform_data function transforms each record individually as it arrives:
    def transform_data(record):
        transformed_record = {
            'id': record['id'],
            'value': record['value'],
            'transformed_value': record['value'] * 1.1  # Example transformation
        }
        return transformed_record
  4. The load_data function simulates loading data by processing each record as it arrives, instead of processing each record within a batch as before:
    def load_data(record):
        print(f"Loading record into database: {record}")
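
With the four functions defined, calling process_stream() runs the pipeline for roughly ten seconds. For testing, it can help to bound the stream by record count instead of wall-clock time and to pass the sink in as a parameter. The following is a sketch of that variation rather than the book's code: the seed, max_records, and sink parameters are assumptions introduced here, and the sleep is omitted so the example finishes immediately.

```python
import itertools
import random


def generate_mock_data(seed=None):
    """Yield mock records indefinitely (no delay, optional seed for repeatability)."""
    rng = random.Random(seed)
    while True:
        yield {'id': rng.randint(1, 1000), 'value': rng.random() * 100}


def transform_data(record):
    """Same example transformation as above: scale the value by 1.1."""
    return {**record, 'transformed_value': record['value'] * 1.1}


def process_stream(max_records, sink):
    """Process exactly `max_records` records, handing each one to `sink`."""
    for record in itertools.islice(generate_mock_data(seed=42), max_records):
        sink(transform_data(record))


loaded = []
process_stream(max_records=5, sink=loaded.append)
```

Passing loaded.append as the sink collects the results in a list; passing print instead would reproduce the console output of the original load_data function.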

Let’s move from real-time to semi-real-time processing, which you can think of as batch processing over short intervals. It is usually called micro-batch processing.
