Ingesting data in batch mode
Batch ingestion is a data processing technique whereby large volumes of data are collected, processed, and loaded into a system at scheduled intervals, rather than in real time. This approach allows organizations to handle substantial amounts of data efficiently by grouping data into batches, which are then processed collectively. For example, a company might collect customer transaction data throughout the day and then process it in a single batch during off-peak hours. This method is particularly useful for organizations that need to process high volumes of data but do not require immediate analysis.
Batch ingestion is beneficial because it optimizes system resources by spreading the processing load across scheduled times, often when the system is underutilized. This reduces the strain on computational resources and can lower costs, especially in cloud-based environments where computing power is metered. Additionally, batch processing simplifies data management, as it allows for the easy application of consistent transformations and validations across large datasets. For organizations with regular, predictable data flows, batch ingestion provides a reliable, scalable, and cost-effective solution for data processing and analytics.
Let’s explore batch ingestion in more detail, starting with its advantages and disadvantages.
Advantages and disadvantages
Batch ingestion offers several notable advantages that make it an attractive choice for many data processing needs:
- Efficiency: batch processing handles large volumes of data in a single operation, optimizing resource usage and minimizing overhead
- Cost-effectiveness: it reduces the need for continuous processing resources, lowering operational expenses
- Simplicity: periodic data processing tasks are easier to manage and implement than real-time ingestion, which often requires more complex infrastructure and management
- Robustness: batch processing is well suited to complex data transformations and comprehensive data validation, ensuring high-quality, reliable data
However, batch ingestion also comes with certain drawbacks:
- Latency: there is an inherent delay between the generation of data and its availability for analysis, which can be a critical issue for applications requiring real-time insights
- Resource spikes: batch processing windows can drive high resource usage and create performance bottlenecks
- Scalability: handling very large datasets may require significant infrastructure investment and management
- Maintenance: batch jobs demand careful scheduling and ongoing upkeep to ensure timely and reliable execution
Let’s look at some common use cases for ingesting data in batch mode.
Common use cases for batch ingestion
Any data analytics platform, such as a data warehouse or data lake, requires regularly updated data for Business Intelligence (BI) and reporting. Batch ingestion is integral here, as it ensures that the platform is continually refreshed with the latest information, enabling businesses to perform comprehensive and up-to-date analyses. By processing data in batches, organizations can efficiently handle vast amounts of transactional and operational data, transforming it into a structured format suitable for querying and reporting. This supports BI initiatives, allowing analysts and decision-makers to generate insightful reports, track Key Performance Indicators (KPIs), and make data-driven decisions.
Extract, Transform, and Load (ETL) processes are a cornerstone of data integration projects, and batch ingestion plays a crucial role in these workflows. In ETL processes, data is extracted from various sources, transformed to fit the operational needs of the target system, and loaded into a database or data warehouse. Batch processing allows for efficient handling of these steps, particularly when dealing with large datasets that require significant transformation and cleansing. This method is ideal for periodic data consolidation, where data from disparate systems is integrated to provide a unified view, supporting activities such as data migration, system integration, and master data management.
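To make the consolidation idea concrete, below is a minimal sketch of a periodic integration job using pandas and SQLite (illustrative choices only); the two in-memory DataFrames stand in for extracts from hypothetical CRM and billing systems, and the dim_customer table name is likewise an assumption:

    # Periodic consolidation sketch: merge two source extracts into a unified
    # customer view and load it into a warehouse table.
    import sqlite3
    import pandas as pd

    # Extract: stand-ins for reads from two disparate source systems
    crm = pd.DataFrame([
        {"customer_id": 1, "name": "Acme Ltd", "segment": "enterprise"},
        {"customer_id": 2, "name": "Globex", "segment": "smb"},
    ])
    billing = pd.DataFrame([
        {"customer_id": 1, "mrr": 1200.0},
        {"customer_id": 2, "mrr": 300.0},
    ])

    # Transform: join the sources into a single, unified view
    unified = crm.merge(billing, on="customer_id", how="left")

    # Load: write the consolidated batch into the target database
    with sqlite3.connect("warehouse.db") as conn:
        unified.to_sql("dim_customer", conn, if_exists="replace", index=False)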
Batch ingestion is also widely used for backups and archiving, which are critical processes for data preservation and disaster recovery. Periodic batch processing allows for the scheduled backup of databases, ensuring that all data is captured and securely stored at regular intervals. This approach minimizes the risk of data loss and provides a reliable restore point in case of system failures or data corruption. Additionally, batch processing is used for data archiving, where historical data is periodically moved from active systems to long-term storage solutions. This not only helps in managing storage costs but also ensures that important data is retained and can be retrieved for compliance, auditing, or historical analysis purposes.
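As a small illustration of the archiving side, the sketch below moves files older than a retention threshold from an active directory into long-term storage; the paths and the 90-day retention period are assumptions made for the example:

    # Archiving sketch: relocate stale files from active storage to an archive.
    import shutil
    import time
    from pathlib import Path

    ACTIVE_DIR = Path("data/active")    # hypothetical active storage
    ARCHIVE_DIR = Path("data/archive")  # hypothetical long-term storage
    RETENTION_DAYS = 90

    def archive_old_files() -> None:
        ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
        cutoff = time.time() - RETENTION_DAYS * 24 * 3600
        if not ACTIVE_DIR.exists():
            return
        for path in ACTIVE_DIR.iterdir():
            # Move any file that has not been modified within the retention window
            if path.is_file() and path.stat().st_mtime < cutoff:
                shutil.move(str(path), str(ARCHIVE_DIR / path.name))

    if __name__ == "__main__":
        archive_old_files()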
Batch ingestion use cases
Batch ingestion is a methodical process involving several key steps: data extraction, data transformation, data loading, scheduling, and automation. To illustrate these steps, let’s explore a use case involving an investment bank that needs to process and analyze trading data for regulatory compliance and performance reporting.
Batch ingestion in an investment bank
An investment bank needs to collect, transform, and load trading data from various financial markets into a central data warehouse. This data will be used for generating daily compliance reports, evaluating trading strategies, and making informed investment decisions.
Data extraction
The first step is identifying the sources from which data will be extracted. For the investment bank, this includes trading systems, market data providers, and internal risk management systems. These sources contain critical data such as trade execution details, market prices, and risk assessments. Once the sources are identified, data is collected using connectors or scripts. This involves setting up data pipelines that extract data from trading systems, import real-time market data feeds, and pull risk metrics from internal systems. The extracted data is then temporarily stored in staging areas before processing.
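A condensed sketch of this extraction-and-staging step might look like the following; the source "connectors" are placeholder functions returning sample records, and the staging directory and file layout are assumptions for illustration:

    # Extraction sketch: pull records from stand-in sources and land them in
    # a staging area, partitioned by business date.
    import json
    from datetime import date
    from pathlib import Path

    STAGING_DIR = Path("staging")  # hypothetical staging area

    def extract_trades() -> list:
        # Stand-in for a query against the trading system
        return [{"trade_id": "T1", "symbol": "AAPL", "qty": 100, "price": 189.5}]

    def extract_market_data() -> list:
        # Stand-in for a market data provider feed
        return [{"symbol": "AAPL", "close": 190.1}]

    def stage(records: list, name: str) -> Path:
        # Write each extract to the staging area as a dated JSON file
        STAGING_DIR.mkdir(exist_ok=True)
        path = STAGING_DIR / f"{name}_{date.today()}.json"
        path.write_text(json.dumps(records))
        return path

    if __name__ == "__main__":
        stage(extract_trades(), "trades")
        stage(extract_market_data(), "market_data")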
Data transformation
The extracted data often contains inconsistencies, duplicates, and missing values. Data cleaning is performed to remove duplicates, fill in missing information, and correct errors. For the investment bank, this ensures that trade records are accurate and complete, providing a reliable foundation for compliance reporting and performance analysis.

After cleaning, the data undergoes transformations such as aggregations, joins, and calculations. For example, the investment bank might aggregate trade data to calculate daily trading volumes, join trade records with market data to analyze price movements, and calculate key metrics such as Profit and Loss (P&L) and risk exposure.

Finally, the transformed data must be mapped to the schema of the target system. This involves aligning the data fields with the structure of the data warehouse. For instance, trade data might be mapped to tables representing transactions, market data, and risk metrics, ensuring seamless integration with the existing data model.
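The snippet below sketches these transformations with pandas on a few hand-made records; the cleaning rules, the P&L formula, and the column names are simplified assumptions rather than the bank's actual logic:

    # Transformation sketch: clean trade records, join them with market data,
    # and derive simple metrics.
    import pandas as pd

    trades = pd.DataFrame([
        {"trade_id": "T1", "symbol": "AAPL", "qty": 100, "price": 189.5},
        {"trade_id": "T1", "symbol": "AAPL", "qty": 100, "price": 189.5},  # duplicate
        {"trade_id": "T2", "symbol": "MSFT", "qty": -50, "price": None},   # missing price
    ])
    market = pd.DataFrame([
        {"symbol": "AAPL", "close": 190.1},
        {"symbol": "MSFT", "close": 410.0},
    ])

    # Cleaning: drop duplicate trades and fill missing prices from the market close
    clean = trades.drop_duplicates(subset="trade_id").merge(market, on="symbol", how="left")
    clean["price"] = clean["price"].fillna(clean["close"])

    # Calculations: a naive per-trade P&L and aggregated daily volume per symbol
    clean["pnl"] = (clean["close"] - clean["price"]) * clean["qty"]
    daily_volume = clean.groupby("symbol")["qty"].apply(lambda q: q.abs().sum())

    print(clean[["trade_id", "symbol", "qty", "price", "pnl"]])
    print(daily_volume)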
Data loading
The transformed data is processed in batches, which allows the investment bank to handle large volumes of data efficiently, performing complex transformations and aggregations in a single run. Once processed, the data is loaded into the target storage system, such as a data warehouse or data lake. For the investment bank, this means loading the cleaned and transformed trading data into their data warehouse, where it can be accessed for compliance reporting and performance analysis.
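As a minimal illustration, the following sketch writes a transformed batch into a warehouse table, with SQLite standing in for the bank's data warehouse; the table name, columns, and sample rows are assumptions for the example:

    # Loading sketch: insert a transformed batch into the target table in one
    # transaction, replacing any re-delivered trades.
    import sqlite3

    batch = [
        ("T1", "AAPL", 100, 189.5, 60.0),
        ("T2", "MSFT", -50, 410.0, 0.0),
    ]

    with sqlite3.connect("warehouse.db") as conn:
        conn.execute(
            """CREATE TABLE IF NOT EXISTS fact_trades (
                   trade_id TEXT PRIMARY KEY, symbol TEXT,
                   qty INTEGER, price REAL, pnl REAL)"""
        )
        conn.executemany(
            "INSERT OR REPLACE INTO fact_trades VALUES (?, ?, ?, ?, ?)", batch
        )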
Scheduling and automation
To ensure that the batch ingestion process runs smoothly and consistently, scheduling tools such as Apache Airflow or cron jobs are used. These tools automate the data ingestion workflows, scheduling them to run at regular intervals, such as every night or at another fixed time each day. This allows the investment bank to have up-to-date data available for analysis without manual intervention.

Implementing monitoring is also crucial to track the success and performance of batch jobs. Monitoring tools provide insights into job execution, identifying failures and performance bottlenecks. For the investment bank, this ensures that any issues in the data ingestion process are promptly detected and resolved, maintaining the integrity and reliability of the data pipeline.
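For reference, a skeletal Airflow DAG for such a nightly pipeline might look like the sketch below (assuming Airflow 2.x); the placeholder extract, transform, and load callables and the 02:00 schedule are assumptions, and a plain cron entry invoking a Python script would serve the same purpose:

    # Scheduling sketch: wire the pipeline steps into a nightly Airflow DAG.
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def extract():
        print("extracting trade and market data")

    def transform():
        print("cleaning, joining, and aggregating the extracts")

    def load():
        print("loading the transformed batch into the warehouse")

    with DAG(
        dag_id="nightly_trading_ingestion",
        start_date=datetime(2024, 1, 1),
        schedule_interval="0 2 * * *",  # run every night at 02:00
        catchup=False,
    ) as dag:
        extract_task = PythonOperator(task_id="extract", python_callable=extract)
        transform_task = PythonOperator(task_id="transform", python_callable=transform)
        load_task = PythonOperator(task_id="load", python_callable=load)

        # Run the steps in order; failures surface in the Airflow UI for monitoring
        extract_task >> transform_task >> load_task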
Batch ingestion with an example
Let’s have a look at a simple example of a batch processing ingestion system written in Python. This example will simulate the ETL process. We’ll generate some mock data, process it in batches, and load it into a simulated database.
You can find the code for this part in the GitHub repository at https://github.com/PacktPublishing/Python-Data-Cleaning-and-Preparation-Best-Practices/blob/main/chapter01/1.batch.py. To run this example, no extra library installation is needed; we just need a standard Python environment (Python 3.x):
- We create a generate_mock_data function that generates a list of mock data records. The snippet also includes the two standard-library imports, random and time, that the example relies on:

    import random
    import time

    def generate_mock_data(num_records):
        data = []
        for _ in range(num_records):
            record = {
                'id': random.randint(1, 1000),
                'value': random.random() * 100
            }
            data.append(record)
        return data

Each record is a dictionary with two fields:
  - id: A random integer between 1 and 1000
  - value: A random float between 0 and 100

Let's have a look at what the data looks like:

    print("Original data:", data)
    {'id': 449, 'value': 99.79699336555473}
    {'id': 991, 'value': 79.65999078145887}

A list of dictionaries is returned, each representing a data record.

- Next, we create a batch processing function:

    def process_in_batches(data, batch_size):
        for i in range(0, len(data), batch_size):
            yield data[i:i + batch_size]

This function takes data, a list of data records to process, and batch_size, the number of records per batch, as parameters. It uses a for loop to iterate over the data in steps of batch_size, and the yield keyword produces the data one batch of batch_size records at a time. The result is a generator that yields batches of data.

- We create a transform_data function that transforms each record in the batch:

    def transform_data(batch):
        transformed_batch = []
        for record in batch:
            transformed_record = {
                'id': record['id'],
                'value': record['value'],
                'transformed_value': record['value'] * 1.1
            }
            transformed_batch.append(transformed_record)
        return transformed_batch

This function takes the batch, a list of data records to be transformed, as its argument. The transformation logic is simple: a new transformed_value field is added to each record, holding the original value multiplied by 1.1. At the end, we have a list of transformed records. Let's have a look at some of them:

    {'id': 558, 'value': 12.15160339587219, 'transformed_value': 13.36676373545941}
    {'id': 449, 'value': 99.79699336555473, 'transformed_value': 109.77669270211021}
    {'id': 991, 'value': 79.65999078145887, 'transformed_value': 87.62598985960477}

- Next, we create a load_data function that simulates loading each transformed record into a database:

    def load_data(batch):
        for record in batch:
            # Simulate loading data into a database
            print(f"Loading record into database: {record}")

This function takes the batch as a parameter, which is a list of transformed data records ready to be loaded. Each record is printed to the console to simulate loading it into a database.

- Finally, we create a main function that calls all the aforementioned functions:

    def main():
        # Parameters
        num_records = 100  # Total number of records to generate
        batch_size = 10  # Number of records per batch

        # Generate data
        data = generate_mock_data(num_records)

        # Process and load data in batches
        for batch in process_in_batches(data, batch_size):
            transformed_batch = transform_data(batch)
            print("Batch before loading:")
            for record in transformed_batch:
                print(record)
            load_data(transformed_batch)
            time.sleep(1)  # Simulate time delay between batches

This function calls generate_mock_data to create the mock data and uses process_in_batches to divide the data into batches. For each batch, it does the following:
  - Transforms the batch using transform_data
  - Prints the batch to show its contents before loading
  - Simulates loading the batch using load_data
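To run the example end to end, the functions need to be invoked; a conventional entry-point guard such as the following does the job (the script in the repository may structure this slightly differently):

    if __name__ == "__main__":
        main()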
Now, let’s transition from batch processing to a streaming paradigm. In streaming, data is processed as it arrives, rather than in predefined batches.