Introduction

In today's rapidly evolving crypto world, staying informed about the latest news and developments is crucial. However, with the overwhelming amount of information available, it is becoming increasingly challenging to find relevant news quickly. In this article, we build a system that fetches real-time news articles from various RSS feeds and stores them in the Weaviate vector database. We will explore how this application lays the foundation for context-based news retrieval and how it can be a stepping stone for more advanced applications, such as similarity search and contextual understanding.

Before we dive into the technical details, note that familiarity with Python and Docker will be beneficial as we build and deploy our applications.

Setting up the Environment

To get started, we need to set up the development environment. This environment consists of three primary components: the RSS news fetcher, the Weaviate vector database, and the Transformers Inference API for text vectorization.

Our application's architecture is orchestrated using Docker Compose. The provided docker-compose.yml file defines the rss-fetcher, weaviate, and t2v-transformers services, plus an app service (exposed on port 8501) that depends on the fetcher and the database. These services interact to fetch news, vectorize it, and store it in the vector database.

version: '3.4'
services:
  rss-fetcher:
    image: rss/python
    build:
      context: ./rss_fetcher
  app:
    build:
      context: ./app
    ports:
      - 8501:8501
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - rss-fetcher
      - weaviate
  weaviate:
    image: semitechnologies/weaviate:latest
    restart: on-failure:0
    ports:
      - "8080:8080"
    environment:
      QUERY_DEFAULTS_LIMIT: 20
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: "./data"
      DEFAULT_VECTORIZER_MODULE: text2vec-transformers
      ENABLE_MODULES: text2vec-transformers
      TRANSFORMERS_INFERENCE_API: http://t2v-transformers:8080
      CLUSTER_HOSTNAME: 'node1'
  t2v-transformers:
    image: semitechnologies/transformers-inference:sentence-transformers-multi-qa-MiniLM-L6-cos-v1
    environment:
      ENABLE_CUDA: 0 # set to 1 to enable
      # NVIDIA_VISIBLE_DEVICES: all # enable if running with CUDA

Each service is configured with specific environment variables that define its behavior. In our application, we use environment variables like OPENAI_API_KEY to pass credentials for external services without hardcoding them in the compose file. We also specify the necessary dependencies, such as the Python libraries listed in the requirements.txt file that the rss-fetcher image installs at build time.

Creating the RSS News Fetcher

The foundation of our news retrieval system is the RSS news fetcher. This component fetches articles from various RSS feeds, extracts the essential information, and stores it in the Weaviate vector database.

This is the Dockerfile of our RSS fetcher:

FROM python:3
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "-u", "rss_fetcher.py"]

Our RSS news fetcher is implemented in the rss_fetcher.py script. This script performs several key tasks, including fetching RSS feeds, parsing articles, and inserting data into the Weaviate database.

import feedparser
import pandas as pd
import time
from bs4 import BeautifulSoup
import requests
import random
from datetime import datetime, timedelta
import pytz
import uuid
import weaviate
import json

def wait_for_weaviate():
    """Wait until Weaviate is available."""
    while True:
        try:
            # Query the metadata endpoint directly, without creating a client yet
            response = requests.get("http://weaviate:8080/v1/meta", timeout=5)
            response.raise_for_status()
            meta = response.json()
            # If successful, the instance is up and running
            if meta:
                print("Weaviate is up and running!")
                return
        except requests.exceptions.RequestException:
            # On any request error (connection, timeout, etc.), report and retry
            print("Waiting for Weaviate...")
        # Pause before the next attempt
        time.sleep(5)

RSS_URLS = [
    "https://thedefiant.io/api/feed",
    "https://cointelegraph.com/rss",
    "https://cryptopotato.com/feed/",
    "https://cryptoslate.com/feed/",
    "https://cryptonews.com/news/feed/",
    "https://smartliquidity.info/feed/",
    "https://bitcoinmagazine.com/feed",
    "https://decrypt.co/feed",
    "https://bitcoinist.com/feed/",
    "https://cryptobriefing.com/feed",
    "https://www.newsbtc.com/feed/",
    "https://coinjournal.net/feed/",
    "https://ambcrypto.com/feed/",
    "https://www.the-blockchain.com/feed/"
]

def get_article_body(link):
    try:
        # Send a browser-like User-Agent, since some sites reject default clients
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.3'}
        response = requests.get(link, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        paragraphs = soup.find_all('p')
        # Directly return list of non-empty paragraphs
        return [p.get_text().strip() for p in paragraphs if p.get_text().strip() != ""]
    except Exception as e:
        print(f"Error fetching article body for {link}. Reason: {e}")
        return []

def parse_date(date_str):
    # Current date format from the RSS (numeric offset, e.g. +0000)
    date_format = "%a, %d %b %Y %H:%M:%S %z"
    try:
        dt = datetime.strptime(date_str, date_format)
        # Ensure the datetime is in UTC
        return dt.astimezone(pytz.utc)
    except ValueError:
        # Fall back to a named time zone (e.g. GMT) and treat it as UTC
        date_format = "%a, %d %b %Y %H:%M:%S %Z"
        dt = datetime.strptime(date_str, date_format)
        return dt.replace(tzinfo=pytz.utc)

def fetch_rss(from_datetime=None):
    all_data = []
    all_entries = []
    # Step 1: Fetch all the entries from the RSS feeds and filter them by date.
    for url in RSS_URLS:
        print(f"Fetching {url}")
        feed = feedparser.parse(url)
        entries = feed.entries
        print('feed.entries', len(entries))
        for entry in entries:
            entry_date = parse_date(entry.published)
            # Skip entries published at or before the cutoff date
            if from_datetime and entry_date <= from_datetime:
                continue
            # Storing only necessary data to minimize memory usage
            all_entries.append({
                "Title": entry.title,
                "Link": entry.link,
                "Summary": entry.summary,
                "PublishedDate": entry.published
            })
    # Step 2: Shuffle the filtered entries.
    random.shuffle(all_entries)
    # Step 3: Extract the body for each entry and break it down by paragraphs.
    for entry in all_entries:
        article_body = get_article_body(entry["Link"])
        print("\nTitle:", entry["Title"])
        print("Link:", entry["Link"])
        print("Summary:", entry["Summary"])
        print("Published Date:", entry["PublishedDate"])
        # Create separate records for each paragraph
        for paragraph in article_body:
            data = {
                "UUID": str(uuid.uuid4()),  # UUID for each paragraph
                "Title": entry["Title"],
                "Link": entry["Link"],
                "Summary": entry["Summary"],
                "PublishedDate": entry["PublishedDate"],
                "Body": paragraph
            }
            all_data.append(data)
        print("-" * 50)
    df = pd.DataFrame(all_data)
    return df

def insert_data(df, batch_size=100):
    # Initialize the batch process (uses the module-level Weaviate client)
    with client.batch as batch:
        batch.batch_size = batch_size
        # Loop through and batch import the 'RSS_Entry' data
        for i, row in df.iterrows():
            if i % 100 == 0:
                print(f"Importing entry: {i+1}")  # Status update
            properties = {
                "UUID": row["UUID"],
                "Title": row["Title"],
                "Link": row["Link"],
                "Summary": row["Summary"],
                "PublishedDate": row["PublishedDate"],
                "Body": row["Body"]
            }
            # Queue the object; the batch is sent once batch_size is reached
            batch.add_data_object(properties, "RSS_Entry")

if __name__ == "__main__":
    # Wait until Weaviate is available
    wait_for_weaviate()
    # Initialize the Weaviate client
    client = weaviate.Client("http://weaviate:8080")
    client.timeout_config = (3, 200)
    # Reset the schema
    client.schema.delete_all()
    # Define the "RSS_Entry" class
    class_obj = {
        "class": "RSS_Entry",
        "description": "An entry from an RSS feed",
        "properties": [
            {"dataType": ["text"], "description": "UUID of the entry", "name": "UUID"},
            {"dataType": ["text"], "description": "Title of the entry", "name": "Title"},
            {"dataType": ["text"], "description": "Link of the entry", "name": "Link"},
            {"dataType": ["text"], "description": "Summary of the entry", "name": "Summary"},
            {"dataType": ["text"], "description": "Published Date of the entry", "name": "PublishedDate"},
            {"dataType": ["text"], "description": "Body of the entry", "name": "Body"}
        ],
        "vectorizer": "text2vec-transformers"
    }
    # Add the schema
    client.schema.create_class(class_obj)
    # Retrieve the schema
    schema = client.schema.get()
    # Display the schema
    print(json.dumps(schema, indent=4))
    print("-"*50)
    # Current datetime
    now = datetime.now(pytz.utc)
    # Fetch articles from the last few days
    days_ago = 3
    print(f"Getting historical data for the last {days_ago} days.")
    start_time = now - timedelta(days=days_ago)
    df_hist = fetch_rss(start_time)
    print("Head")
    print(df_hist.head().to_string())
    print("Tail")
    print(df_hist.tail().to_string())
    print("-"*50)
    print("Total records fetched:", len(df_hist))
    print("-"*50)
    print("Inserting data")
    # Insert historical data
    insert_data(df_hist, batch_size=100)
    print("-"*50)
    print("Data Inserted")
    # Check once a minute for newly published articles
    while True:
        # Current datetime
        now = datetime.now(pytz.utc)
        # Fetch articles from the last minute
        one_min_ago = now - timedelta(minutes=1)
        df = fetch_rss(one_min_ago)
        print("Head")
        print(df.head().to_string())
        print("Tail")
        print(df.tail().to_string())
        print("Inserting data")
        # Insert the last minute's data
        insert_data(df, batch_size=100)
        print("data inserted")
        print("-"*50)
        # Sleep for a minute
        time.sleep(60)

Before we start fetching news, we need to ensure that the Weaviate vector database is up and running. The wait_for_weaviate function polls Weaviate's /v1/meta endpoint over HTTP every five seconds until it responds. This ensures that our fetcher waits until Weaviate is ready to receive data.

The core functionality of our fetcher lies in its ability to retrieve articles from various RSS feeds. We iterate through the list of RSS URLs, using the feedparser library to parse the feeds and extract key information such as the article's title, link, summary, and published date.

To provide context for similarity search and other applications, we need the actual content of the articles. The get_article_body function fetches the article's HTML content, parses it using BeautifulSoup, and extracts the non-empty text paragraphs. This content is crucial for creating a rich context for each article.

After gathering the necessary information, we create one data object per article paragraph and insert the objects into the Weaviate vector database. Weaviate provides a client library that simplifies the process of adding data. We use the weaviate.Client class to interact with the Weaviate instance and batch-insert the data objects.

Now that we have laid the groundwork for building our context-based news retrieval system, in the next sections we'll delve deeper into Weaviate's role in this application and how we can leverage it for similarity search and more advanced features.

Weaviate Configuration and Schema

Weaviate, an open-source vector database, plays a pivotal role in our application. It stores data objects together with their vector representations and retrieves them based on semantic similarity. This ability to store text data and vectorize it for efficient similarity search aligns well with our goal of context-based news retrieval.
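
To make the idea of vector-based similarity concrete, here is a minimal sketch of ranking documents by cosine similarity in plain Python. It is not how Weaviate is implemented internally, and the three-dimensional vectors and labels are invented for illustration; real embeddings from text2vec-transformers have many more dimensions. The helper names cosine_similarity and rank_by_similarity are hypothetical and not part of any library.

```python
import math


def cosine_similarity(a, b):
    """Return the cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def rank_by_similarity(query_vector, documents):
    """Sort (label, vector) pairs from most to least similar to the query."""
    scored = [
        (label, cosine_similarity(query_vector, vector))
        for label, vector in documents
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


# Hypothetical 3-dimensional embeddings, invented for illustration only.
documents = [
    ("bitcoin price rally", [0.9, 0.1, 0.0]),
    ("defi protocol exploit", [0.1, 0.9, 0.2]),
    ("bitcoin mining update", [0.8, 0.2, 0.1]),
]
query = [1.0, 0.0, 0.0]  # stands in for an embedded "bitcoin" query

ranking = rank_by_similarity(query, documents)
for label, score in ranking:
    print(f"{score:.3f}  {label}")
```

With these toy vectors, the two bitcoin-related entries rank above the DeFi entry because their vectors point in nearly the same direction as the query. A vector database applies the same principle at scale, typically with an index that avoids comparing the query against every stored vector.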
By utilizing Weaviate, we enable our system to understand the context of news articles and retrieve semantically similar content.

To structure the data stored in Weaviate, we define a class called RSS_Entry. Its schema includes the properties UUID, Title, Link, Summary, PublishedDate, and Body. These properties capture the essential information about each news article and provide a solid foundation for context retrieval.

# Define the "RSS_Entry" class
class_obj = {
    "class": "RSS_Entry",
    "description": "An entry from an RSS feed",
    "properties": [
        {"dataType": ["text"], "description": "UUID of the entry", "name": "UUID"},
        {"dataType": ["text"], "description": "Title of the entry", "name": "Title"},
        {"dataType": ["text"], "description": "Link of the entry", "name": "Link"},
        {"dataType": ["text"], "description": "Summary of the entry", "name": "Summary"},
        {"dataType": ["text"], "description": "Published Date of the entry", "name": "PublishedDate"},
        {"dataType": ["text"], "description": "Body of the entry", "name": "Body"}
    ],
    "vectorizer": "text2vec-transformers"
}
# Add the schema
client.schema.create_class(class_obj)
# Retrieve the schema
schema = client.schema.get()

Weaviate represents text data as vectors through its vectorizer modules. Our application uses the text2vec-transformers module as the default vectorizer. This module transforms text into vector embeddings using a sentence-transformers model (multi-qa-MiniLM-L6-cos-v1 in our Docker Compose setup). Vectorization captures the semantic relationships between articles, enabling meaningful similarity search and context retrieval.

Real-time and Historical Data Insertion

Efficient data insertion is vital for ensuring that our Weaviate-based news retrieval system provides both up-to-date and historical context for users. Our application caters to two use cases: real-time context retrieval and historical context analysis. Inserting real-time news articles ensures that users receive the most recent information, while historical data insertion gives a broader perspective by allowing users to explore trends and patterns over time.

To populate our database with historical data, we use the fetch_rss function. We call it with a cutoff date computed from the days_ago variable, so it returns the articles published in the last few days. The retrieved articles are then split into paragraphs, and the resulting data objects are batch-inserted into Weaviate. This gives the database an initial set of recent articles from all configured feeds.

def fetch_rss(from_datetime=None):
    all_data = []
    all_entries = []
    # Step 1: Fetch all the entries from the RSS feeds and filter them by date.
    for url in RSS_URLS:
        print(f"Fetching {url}")
        feed = feedparser.parse(url)
        entries = feed.entries
        print('feed.entries', len(entries))
        for entry in entries:
            entry_date = parse_date(entry.published)
            # Skip entries published at or before the cutoff date
            if from_datetime and entry_date <= from_datetime:
                continue
            # Storing only necessary data to minimize memory usage
            all_entries.append({
                "Title": entry.title,
                "Link": entry.link,
                "Summary": entry.summary,
                "PublishedDate": entry.published
            })
    # Step 2: Shuffle the filtered entries.
    random.shuffle(all_entries)
    # Step 3: Extract the body for each entry and break it down by paragraphs.
    for entry in all_entries:
        article_body = get_article_body(entry["Link"])
        print("\nTitle:", entry["Title"])
        print("Link:", entry["Link"])
        print("Summary:", entry["Summary"])
        print("Published Date:", entry["PublishedDate"])
        # Create separate records for each paragraph
        for paragraph in article_body:
            data = {
                "UUID": str(uuid.uuid4()),  # UUID for each paragraph
                "Title": entry["Title"],
                "Link": entry["Link"],
                "Summary": entry["Summary"],
                "PublishedDate": entry["PublishedDate"],
                "Body": paragraph
            }
            all_data.append(data)
        print("-" * 50)
    df = pd.DataFrame(all_data)
    return df

The real-time data insertion loop ensures that newly published articles are promptly added to the Weaviate database. Once a minute, we fetch the articles published in the last minute and follow the same data insertion process, so the database is continuously updated with fresh content.

Conclusion

In this article, we've explored crucial aspects of building an RSS news retrieval system with Weaviate. We delved into Weaviate's role as a vector database, examined the RSS_Entry class schema, and saw how text data is vectorized using text2vec-transformers. Furthermore, we discussed the significance of real-time and historical data insertion in providing users with relevant and up-to-date news context. With this foundation in place, we're well-equipped to explore more advanced applications, such as similarity search and context-based content retrieval, which is what we will be building in the next article. The integration of Weaviate with our news fetcher sets the stage for a powerful context-aware information retrieval system.

Author Bio

Alan Bernardo Palacio is a data scientist and an engineer with vast experience in different engineering fields. His focus has been the development and application of state-of-the-art data products and algorithms in several industries. He has worked for companies such as Ernst and Young and Globant, and now holds a data engineer position at Ebiquity Media, helping the company to create a scalable data pipeline. Alan graduated with a Mechanical Engineering degree from the National University of Tucuman in 2015, participated as the founder of startups, and later earned a Master's degree from the faculty of Mathematics at the Autonomous University of Barcelona in 2017. Originally from Argentina, he now works and resides in the Netherlands.