Real-time versus semi-real-time ingestion
Real-time ingestion refers to the process of collecting, processing, and loading data almost instantaneously as it is generated, as we have discussed. This approach is critical for applications that require immediate insights and actions, such as fraud detection, stock trading, and live monitoring systems. Real-time ingestion provides the lowest latency, enabling businesses to react to events as they occur. However, it demands robust infrastructure and continuous resource allocation, making it complex and potentially expensive to maintain.
Semi-real-time ingestion, on the other hand, also known as near-real-time ingestion, involves processing data with minimal delay, typically within seconds or minutes rather than instantly. This approach strikes a balance between real-time and batch processing, providing timely insights while reducing the resource intensity and complexity associated with true real-time systems. Semi-real-time ingestion is suitable for applications such as social media monitoring, customer feedback analysis, and operational dashboards, where near-immediate data processing is beneficial but not critically time-sensitive.
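To make the trade-off concrete, here is a minimal sketch of both modes. The event_stream generator and its timings are invented for illustration; it stands in for any live feed. The real-time version handles each event the instant it arrives, while the semi-real-time version buffers events and handles them once per short window:

    import time
    from collections import deque

    def event_stream(n=20):
        """Hypothetical stand-in for a live feed: yields one event roughly every 0.1 seconds."""
        for i in range(n):
            time.sleep(0.1)
            yield {"id": i, "value": i * 10}

    def ingest_real_time():
        """Real-time: act on every event the moment it arrives (lowest latency)."""
        for event in event_stream():
            print(f"Handled immediately: {event}")

    def ingest_semi_real_time(interval=1.0):
        """Semi-real-time: buffer events and process them every few seconds (lower overhead)."""
        buffer = deque()
        last_flush = time.time()
        for event in event_stream():
            buffer.append(event)
            if time.time() - last_flush >= interval:
                print(f"Handled as a batch of {len(buffer)}: {list(buffer)}")
                buffer.clear()
                last_flush = time.time()
        if buffer:  # Flush whatever remains when the stream ends
            print(f"Final batch of {len(buffer)}: {list(buffer)}")

The per-record version reacts fastest but pays a fixed cost for every event; the buffered version amortizes that cost across each window, which is the pattern the example later in this section builds on.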
Common use cases for near-real-time ingestion
Let’s look at some common use cases for near-real-time ingestion.
Real-time analytics
Streaming enables organizations to continuously monitor data as it flows in, allowing for real-time dashboards and visualizations. This is critical in industries such as finance, where stock prices, market trends, and trading activities need to be tracked live. It also allows for instant report generation, facilitating timely decision-making and reducing the latency between data generation and analysis.
Social media and sentiment analysis
Companies track mentions and sentiment on social media in real time to manage brand reputation and respond to customer feedback promptly. Streaming data allows for the continuous analysis of public sentiment towards brands, products, or events, providing immediate insights that can influence marketing and PR strategies.
Customer experience enhancement
Near-real-time processing allows support teams to access up-to-date information on customer issues and behavior, enabling quicker and more accurate responses to customer inquiries. Businesses can also use near-real-time data to update customer profiles and trigger personalized marketing messages, such as emails or notifications, shortly after a customer interacts with their website or app.
Semi-real-time mode with an example
Transitioning from real-time to semi-real-time data processing involves adjusting the example to introduce a more structured approach to handling data updates: rather than processing each record immediately upon arrival, we batch updates over short intervals, which allows for more efficient processing while keeping the pipeline responsive. Let’s have a look at the example; as always, you can find the code in the GitHub repository at https://github.com/PacktPublishing/Python-Data-Cleaning-and-Preparation-Best-Practices/blob/main/chapter01/3.semi_real_time.py:
- For generating mock data continuously, there are no changes from the previous example. This continuously generates mock data records with a slight delay (time.sleep(0.1)).
- For processing in semi-real-time, we can use a deque to buffer incoming records. This function processes records when either the specified time interval has elapsed or the buffer reaches a specified size (batch_size). Then, it converts the deque to a list (list(buffer)) before passing it to transform_data, ensuring the data is processed in a batch:

    def process_semi_real_time(batch_size, interval):
        buffer = deque()
        start_time = time.time()
        for record in generate_mock_data():
            buffer.append(record)

- Check whether the interval has elapsed or the buffer size has been reached:

            if (time.time() - start_time) >= interval or len(buffer) >= batch_size:

- Process and clear the buffer:

                transformed_batch = transform_data(list(buffer))  # Convert deque to list
                print(f"Batch of {len(transformed_batch)} records before loading:")
                for rec in transformed_batch:
                    print(rec)
                load_data(transformed_batch)
                buffer.clear()
                start_time = time.time()  # Reset start time
- Then, we transform each record in the batch and load the data; there are no changes from the previous example. A complete, self-contained version of the pipeline is sketched below.
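For reference, here is a minimal, self-contained sketch assembling the whole pipeline. The bodies of generate_mock_data, transform_data, and load_data are stand-ins reconstructed from the description above; the actual implementations are in the book’s GitHub repository:

    import random
    import time
    from collections import deque

    def generate_mock_data():
        """Stand-in generator: yields mock records with a slight delay."""
        while True:
            yield {"user_id": random.randint(1, 100), "value": round(random.random(), 3)}
            time.sleep(0.1)

    def transform_data(records):
        """Hypothetical transformation: flag each record as processed."""
        return [{**record, "transformed": True} for record in records]

    def load_data(records):
        """Simulated load: printing stands in for a database write."""
        print(f"Loaded {len(records)} records into the simulated database")

    def process_semi_real_time(batch_size, interval):
        buffer = deque()
        start_time = time.time()
        for record in generate_mock_data():
            buffer.append(record)
            if (time.time() - start_time) >= interval or len(buffer) >= batch_size:
                transformed_batch = transform_data(list(buffer))
                print(f"Batch of {len(transformed_batch)} records before loading:")
                for rec in transformed_batch:
                    print(rec)
                load_data(transformed_batch)
                buffer.clear()
                start_time = time.time()  # Reset the window for the next batch

    if __name__ == "__main__":
        # Flush every 5 records or every 2 seconds, whichever comes first; stop with Ctrl + C.
        process_semi_real_time(batch_size=5, interval=2.0)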
When you run this code, it continuously generates mock data records. Records are buffered until either the specified time interval (interval) has elapsed or the buffer reaches the specified size (batch_size). Once either condition is met, the buffered records are processed as a batch, transformed, and then “loaded” (printed) into the simulated database.
When discussing the different types of data sources that are suitable for batch, streaming, or semi-real-time processing, it’s essential to consider their diversity and characteristics. Data can originate from various sources, such as databases, logs, IoT devices, social media, or sensors, as we will see in the next section.