Build Knowledge Graph RAG For Data Analysis

This project designs and implements a generative AI system that automates financial market intelligence. The solution ingests unstructured data gathered from diverse web sources, such as news coverage and market commentary, through programmatic search. A structured knowledge graph is then built with LlamaIndex's SimplePropertyGraphStore, dynamically modeling entities and the relationships between them. This graph serves as the knowledge base for a Retrieval-Augmented Generation (RAG) pipeline, and the integrated system delivers context-aware, analytical responses to complex user queries, transforming raw data into actionable insights for investment decision-making.

🚀 Complete Knowledge Graph RAG Pipeline

This comprehensive guide walks you through building an intelligent financial analysis system that transforms unstructured data into actionable insights using Knowledge Graph RAG technology.

  • 📊 Data Collection: automated web scraping & dataset creation
  • 🕸️ Knowledge Graph: entity extraction & relationship mapping
  • 🔍 Intelligent Retrieval: context-aware query processing
  • 💡 Financial Insights: actionable investment intelligence

Tools & Tech Stack

  • Python 3 with asyncio / nest_asyncio for asynchronous indexing
  • OpenAI API: GPT-4 (query generation), GPT-3.5-turbo (entity extraction), text-embedding-3-small (embeddings)
  • Google Custom Search API + requests for programmatic web search
  • BeautifulSoup for HTML parsing and text extraction
  • LlamaIndex: SimpleDirectoryReader, PropertyGraphIndex, SimplePropertyGraphStore (optionally Neo4jPropertyGraphStore)

Folder Structure

1. Create a Virtual Environment
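
A dedicated environment keeps this project's dependencies isolated from the rest of your system. For example, python -m venv venv creates one, and source venv/bin/activate (macOS/Linux) or venv\Scripts\activate (Windows) activates it before you install anything.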

2. Install Required Libraries

This command installs all of the dependencies listed in the requirements.txt file:

pip install -r requirements.txt

3. Importing Dependencies

Every AI model is built on the shoulders of powerful libraries. This section imports the essential tools for data loading, graph processing, and LLM connectivity that form the foundation of our application.

# Standard library and utilities
import os
import ast
import asyncio
import warnings

# Web requests, scraping, and the OpenAI SDK
import openai
import requests
import nest_asyncio
from bs4 import BeautifulSoup

# LlamaIndex: document loading, property graph construction, and LLM/embedding connectors
from llama_index.core import SimpleDirectoryReader
from llama_index.core import PropertyGraphIndex
from llama_index.core.graph_stores import SimplePropertyGraphStore
from llama_index.core.indices.property_graph import SchemaLLMPathExtractor
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore  # optional external Neo4j backend

4. API Configuration and Environment Setup

Here we give the pipeline access to its external services: we configure the API keys for Google Custom Search and OpenAI, and set up the environment needed for asynchronous operations.

google_api = "YOUR_GOOGLE_API_KEY_HERE"
search_engine_id = "YOUR_SEARCH_ENGINE_ID_HERE"
open_ai_key  = "YOUR_OPENAI_API_KEY_HERE"
os.environ["OPENAI_API_KEY"] = open_ai_key
warnings.filterwarnings("ignore")
nest_asyncio.apply()

Code Explanation:

  • google_api and search_engine_id hold the Google Custom Search API key and the ID (cx) of the programmable search engine to query.
  • open_ai_key is exported as the OPENAI_API_KEY environment variable so that both the openai SDK and LlamaIndex can authenticate.
  • warnings.filterwarnings("ignore") suppresses non-critical library warnings to keep the output readable.
  • nest_asyncio.apply() patches the running event loop so the asynchronous indexing used later (use_async=True) works inside a notebook.
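
Hardcoding keys is convenient for a quick demo, but they are easy to leak. A minimal alternative (a sketch, assuming you have exported variables with these illustrative names in your shell beforehand) reads them from the environment instead:

import os

# Read credentials from environment variables instead of hardcoding them.
# The names GOOGLE_API_KEY and SEARCH_ENGINE_ID are illustrative choices, not fixed by any library.
google_api = os.getenv("GOOGLE_API_KEY")
search_engine_id = os.getenv("SEARCH_ENGINE_ID")
open_ai_key = os.getenv("OPENAI_API_KEY")

if not all([google_api, search_engine_id, open_ai_key]):
    raise RuntimeError("Missing one or more required API credentials.")

os.environ["OPENAI_API_KEY"] = open_ai_key  # both the openai SDK and LlamaIndex read this variable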

5. Complete Web Scraping & Dataset Creation System

This code creates an automated pipeline that generates search queries, scrapes web content, and builds a dataset for financial analysis.

search_with_google_api(query)

Purpose: Performs Google searches using the Custom Search API

🔗 Core Search Function
def search_with_google_api(query, start=1):
    """Call the Google Custom Search API and return the raw result items."""
    url = "https://www.googleapis.com/customsearch/v1"
    params = {
        "q": query,
        "key": google_api,
        "cx": search_engine_id,
        "start": start,  # 1-based index of the first result to return
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json().get("items", [])
    else:
        print(f"Error: {response.status_code}, {response.text}")
        return []

Functionality: Takes a search query, calls Google's API, and returns search results as structured data.
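
As a quick sanity check (illustrative only; the query string and the fields printed are arbitrary choices), the helper can be called directly:

# Hypothetical smoke test: run one search and print the top results
results = search_with_google_api("US electric vehicle market outlook")
for item in results[:3]:
    print(item["title"], "->", item["link"])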

generate_search_queries(user_input)

Purpose: Uses GPT-4 to generate diverse search queries for comprehensive research

🔗 Integrates with: OpenAI GPT-4
def generate_search_queries(user_input):
    """
    Generates a list of 5-7 detailed and relevant search queries for financial sentiment analysis
    based on the user's input, such as a target sector, field, or region.
    """
    prompt = f"""
You are a financial analyst and search query expert. Based on the following user input, generate a list of 5-7 search queries
for financial sentiment analysis. Ensure the queries cover diverse aspects of the topic, including sector-specific trends,
regional financial overviews, and broader financial landscapes. The queries should focus on extracting data relevant to sentiment
and performance analysis.

User Input: {user_input}

Strictly output the queries as a python list of strings. Do not add any additional comments.
"""

    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are an expert in generating search queries for financial sentiment analysis."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=200
    )

    # Extract the list of queries and parse it safely (avoids eval on model output)
    queries = response.choices[0].message.content.strip()
    return ast.literal_eval(queries)

Functionality: Takes user input (such as a sector, field, or region), uses GPT-4 to generate targeted search queries for financial sentiment analysis, and returns them as a Python list.

  • Input: User input describing a financial sector, field, or region
  • Processing:
    • Constructs a detailed prompt for GPT-4
    • Calls OpenAI's API with the prompt
    • Extracts and processes the response
  • Output: A Python list containing 5-7 targeted search queries
  • Integration: Works seamlessly with the search_with_google_api function to create a complete search pipeline
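
In practice the model occasionally wraps the list in markdown code fences, which would break direct parsing. A defensive helper (hypothetical, not part of the original pipeline) can strip that wrapping before evaluating the literal:

import ast

def parse_query_list(raw: str) -> list:
    """Parse an LLM response expected to be a Python list literal, tolerating markdown fences."""
    cleaned = raw.strip()
    if cleaned.startswith("```"):
        # Remove opening/closing fences and an optional language tag such as "python"
        cleaned = cleaned.strip("`")
        if cleaned.lower().startswith("python"):
            cleaned = cleaned[len("python"):]
        cleaned = cleaned.strip()
    return ast.literal_eval(cleaned)

# Example: parse_query_list('```python\n["EV sector outlook", "US EV policy impact"]\n```')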

fetch_full_content(url)

Purpose: Extracts complete textual content from webpages by parsing all paragraph elements

🔗 Integrates with: BeautifulSoup, requests
def fetch_full_content(url):
    """
    Fetches the full content of a webpage given its URL.
    """
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"
        )
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            paragraphs = soup.find_all("p")
            full_text = "\n".join([p.get_text() for p in paragraphs])
            return full_text.strip() if full_text else None
        else:
            print(f"Error: Unable to fetch content from {url} (Status Code: {response.status_code})")
            return None
    except Exception as e:
        print(f"Error fetching content from {url}: {e}")
        return None

Functionality: Takes a URL, fetches the webpage content using proper headers, extracts all paragraph text using BeautifulSoup, and returns clean, structured text for analysis.

  • Input: Webpage URL
  • Processing:
    • Uses realistic browser headers to avoid blocking
    • Implements timeout protection (10 seconds)
    • Parses HTML with BeautifulSoup
    • Extracts all paragraph (<p>) elements
    • Joins text content with newline separators
  • Output: Clean, extracted text content or None if extraction fails
  • Error Handling: Comprehensive exception handling with descriptive error messages
  • Integration: Works with search results from search_with_google_api to build complete dataset
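
A quick illustrative call (the URL is a placeholder, not one of the project's actual sources) shows the kind of output to expect:

# Hypothetical usage: fetch a page and preview the extracted text
text = fetch_full_content("https://example.com/some-article")
if text:
    print(text[:300])  # preview the first 300 characters
else:
    print("No paragraph text could be extracted.")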


create_dataset_from_queries(queries, directory="dataset")

Purpose: Processes search queries, fetches web content, and saves structured data files for analysis

🔗 Integrates with: search_with_google_api(), fetch_full_content()
def create_dataset_from_queries(queries, directory="dataset"):
    """
    Process search queries and save the results as text files in the given directory.
    """
    if not os.path.exists(directory):
        os.makedirs(directory)

    file_count = 1  # To ensure unique filenames across all queries

    for query in queries:
        print(f"Processing query: {query}")
        valid_count = 0
        page_number = 1

        while valid_count < 10:
            print(f"Fetching search results, page {page_number}...")
            results = search_with_google_api(query, start=(page_number - 1) * 10 + 1)  # 10 results per page

            if not results:
                print("No more results found. Try refining the query.")
                break

            for result in results:
                if valid_count >= 10:
                    break  # Stop when 10 valid documents are saved

                title = result["title"]
                link = result["link"]
                snippet = result.get("snippet", "No snippet")

                # Fetch full content of the link
                full_content = fetch_full_content(link)
                if full_content:  # Save only if content is valid
                    filename = f"{directory}/doc_{file_count}.txt"
                    with open(filename, "w", encoding="utf-8") as f:
                        f.write(f"Query: {query}\n")
                        f.write(f"Title: {title}\n")
                        f.write(f"Link: {link}\n")
                        f.write(f"Snippet: {snippet}\n\n")
                        f.write(f"Full Content:\n{full_content}")
                    print(f"Saved: {filename}")
                    valid_count += 1
                    file_count += 1
                else:
                    print(f"Skipped: {link} (No valid content)")

            page_number += 1  # Move to the next page of results

    print(f"Finished processing all queries. Total files saved: {file_count - 1}")

Functionality: Orchestrates the complete data pipeline - processes search queries, fetches multiple pages of results, extracts full content, and saves structured datasets with metadata.

🎯 Key Integration Point:

This function serves as the bridge between AI-generated queries and the web scraping pipeline, ensuring seamless data flow from conceptual queries to structured datasets.

  • Input: List of search queries generated by generate_search_queries()
  • Processing:
    • Creates output directory if it doesn't exist
    • Processes each query sequentially
    • Fetches multiple pages of search results using pagination
    • Extracts full content from each valid URL
    • Saves structured data files with metadata
  • Output: Multiple text files in the dataset directory, each containing:
    • Original search query
    • Page title and URL
    • Search result snippet
    • Full extracted content
  • Quality Control: Only saves documents with valid, extractable content

Pipeline Integration:

generate_search_queries() → search_with_google_api() → fetch_full_content() → structured text files in dataset/

Dataset Structure Example:

dataset/
├── doc_1.txt
├── doc_2.txt
├── doc_3.txt
└── ...

Each file contains structured information ready for knowledge graph construction and RAG pipeline processing.
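
Because every file follows the same header layout, downstream steps (or a quick manual check) can split the metadata from the body. A small reader sketch, assuming the layout shown above (the helper name is hypothetical):

# Hypothetical helper: split a saved document into metadata fields and body text
def read_dataset_file(path):
    with open(path, "r", encoding="utf-8") as f:
        raw = f.read()
    header, _, body = raw.partition("Full Content:\n")
    metadata = dict(
        line.split(": ", 1) for line in header.strip().splitlines() if ": " in line
    )
    return metadata, body

metadata, body = read_dataset_file("dataset/doc_1.txt")
print(metadata["Title"], "|", metadata["Link"])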

6. Complete System Integration & Execution

This example demonstrates the end-to-end workflow of the financial sentiment analysis system, from user input to dataset creation.

End-to-End System Execution

Purpose: Demonstrates the complete integration of all system components to transform user input into a structured dataset

# User provides financial analysis topic
user_input = "Financial sentiment analysis for the electric vehicle sector in the US"

# Step 1: Generate targeted search queries using AI
queries = generate_search_queries(user_input)
print("Generated Queries:")
for i, query in enumerate(queries, 1):
    print(f"  {i}. {query}")

# Step 2: Execute complete dataset creation pipeline
create_dataset_from_queries(queries)

Workflow: This code demonstrates the complete automation pipeline that transforms a simple user request into a comprehensive dataset for financial analysis.

  • Step 1 - User Input: Define the financial analysis topic
    • user_input = "Financial sentiment analysis for the electric vehicle sector in the US"
  • Step 2 - Query Generation: AI-powered search query creation
    • Calls generate_search_queries(user_input)
    • Returns 5-7 targeted search queries
    • Displays generated queries for verification
  • Step 3 - Dataset Creation: Automated data collection pipeline
    • Executes create_dataset_from_queries(queries)
    • Processes all queries sequentially
    • Creates structured dataset files

Expected Output:

Generated Queries:
  1. "electric vehicle market trends US 2024 financial analysis"
  2. "EV sector stock performance US investor sentiment"
  3. "electric vehicle company earnings reports US market"
  4. "US EV industry government policies and financial impact"
  5. "electric vehicle consumer adoption rates US financial outlook"
  6. "EV battery technology investments US market sentiment"
  7. "electric vehicle charging infrastructure financial analysis US"

Processing query: electric vehicle market trends US 2024 financial analysis
Fetching search results, page 1...
Saved: dataset/doc_1.txt
Saved: dataset/doc_2.txt
...
Finished processing all queries. Total files saved: 68

System Architecture Flow:

User Input → AI Query Generation → Google Search API → Web Content Extraction → Structured Dataset


Final Dataset Structure:

dataset/
├── doc_1.txt
├── doc_2.txt
├── doc_3.txt
├── ...
└── doc_68.txt

Each file contains:
Query: [original search query]
Title: [webpage title]
Link: [source URL]
Snippet: [search result summary]
Full Content: [complete extracted text]

This integrated execution demonstrates how the system transforms a simple user request into a comprehensive, structured dataset ready for knowledge graph construction and advanced financial sentiment analysis using RAG pipelines.

🏗️ System Architecture Overview

1. Data Ingestion Layer

  • Google Custom Search API
  • Web Content Extraction
  • Query Generation (GPT-4)

2. Knowledge Graph Layer

  • SimplePropertyGraphStore
  • Entity Extraction
  • Relationship Mapping

3. Retrieval Layer

  • Graph-based Retrieval
  • Semantic Search
  • Context-Aware Queries

Data Flow: User Input → Query Generation → Web Scraping → Knowledge Graph → Intelligent Retrieval

✓ All components are fully integrated and operational

7. Knowledge Graph Construction with PropertyGraphIndex

Transform unstructured text into structured knowledge with entity extraction and relationship mapping

Knowledge Graph Initialization

Purpose: Creates a structured knowledge graph from the collected dataset using entity extraction and embedding models

# Initialize in-memory graph store
graph_store = SimplePropertyGraphStore()

# Load documents from dataset directory
documents = SimpleDirectoryReader("dataset").load_data()

# Create Property Graph Index with entity extraction
index = PropertyGraphIndex.from_documents(
    documents,
    embed_model=OpenAIEmbedding(model_name="text-embedding-3-small"),
    kg_extractors=[
        SchemaLLMPathExtractor(
            llm=OpenAI(model="gpt-3.5-turbo", temperature=0.0)
        )
    ],
    property_graph_store=graph_store,  # Uses in-memory store
    show_progress=True,
    use_async=True
)

print("βœ… Knowledge Graph created successfully with SimplePropertyGraphStore!")
print("βœ… No external database required!")

Components:

  • SimplePropertyGraphStore: In-memory graph database for storing entities and relationships
    • No external database setup required
    • Efficient for medium-sized datasets
  • SimpleDirectoryReader: Loads all text files from the dataset directory
    • Automatically processes multiple document formats
    • Maintains document metadata
  • PropertyGraphIndex Configuration:
    • Embedding Model: OpenAI's text-embedding-3-small for vector representations
    • Entity Extraction: SchemaLLMPathExtractor with GPT-3.5-turbo
    • Progress Tracking: Real-time progress indicators
    • Async Processing: Parallel processing for faster indexing

Knowledge Graph Structure:

  • Nodes: entities extracted from the documents (companies, sectors, policies, and similar), each stored with source metadata and an embedding
  • Edges: labeled relationships between those entities, identified by the SchemaLLMPathExtractor
  • A quick way to inspect the resulting graph is sketched after the output below

Output:

Loading documents: 100%|██████████| 68/68 [00:02<00:00, 25.43it/s]
Extracting entities: 100%|██████████| 68/68 [01:15<00:00, 1.11s/it]
Creating graph structure: 100%|██████████| 68/68 [00:45<00:00, 1.51it/s]
✅ Knowledge Graph created successfully with SimplePropertyGraphStore!
✅ No external database required!
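
To eyeball what was actually extracted, the in-memory store can export the graph for interactive inspection. A small sketch, assuming the networkx and pyvis packages are installed (the save_networkx_graph helper is available on SimplePropertyGraphStore in recent llama-index versions and may vary across releases):

# Export the property graph to an interactive HTML file for manual inspection
index.property_graph_store.save_networkx_graph(name="./kg.html")
print("Graph exported to kg.html - open it in a browser to explore entities and relations.")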

8. Knowledge Graph Persistence & Storage Management

This section covers the persistence and loading mechanisms for the knowledge graph, enabling long-term storage and retrieval.

Storage Operations

Purpose: Save the knowledge graph to disk and reload it for future sessions

# Persist the knowledge graph to storage directory
index.storage_context.persist(persist_dir="./storage")

# Load index from storage in future sessions
from llama_index.core import StorageContext, load_index_from_storage

index = load_index_from_storage(
    StorageContext.from_defaults(persist_dir="./storage")
)

# Alternative: Load from existing graph store
index = PropertyGraphIndex.from_existing(
    property_graph_store=graph_store
)

Storage Operations:

  • Persistence: index.storage_context.persist()
    • Saves graph structure, embeddings, and metadata to disk
    • Creates organized storage directory structure
    • Enables session recovery without re-processing
  • Loading: load_index_from_storage()
    • Restores complete knowledge graph state
    • Maintains all entity relationships and embeddings
    • Fast initialization for subsequent sessions
  • Alternative Loading: PropertyGraphIndex.from_existing()
    • Direct loading from graph store instance
    • Useful for in-memory operations
    • Bypasses disk I/O for performance

Storage Directory Structure:

storage/
├── graph_store.json
├── index_store.json
├── docstore.json
├── vector_store.json
└── image__vector_store.json

Benefits:

  • No re-processing: the costly LLM extraction and embedding steps run once, and later sessions reload in seconds.
  • Portability: the persisted storage directory can be copied or versioned alongside the project.
  • Flexibility: the in-memory store can later be swapped for a persistent backend such as Neo4jPropertyGraphStore.

9. Intelligent Retrieval from Knowledge Graph

This section demonstrates how to query the knowledge graph using sophisticated retrieval mechanisms for financial analysis.

Graph-Based Retrieval System

Purpose: Query the knowledge graph to extract relevant financial insights and relationships

# Define retriever with graph-only mode
retriever = index.as_retriever(
    include_text=False,  # default is True; False returns only the matched graph paths, not the source text
)

# Execute retrieval query
results = retriever.retrieve("What is the summary of the financial texts?")

# Display retrieved results
for record in results:
    print(record.text)

Retrieval Configuration:

  • Retriever Setup: index.as_retriever()
    • include_text=False: Focuses on graph structure only
    • Leverages entity relationships for retrieval
    • Uses semantic similarity in graph space
  • Query Execution: retriever.retrieve()
    • Processes natural language queries
    • Traverses knowledge graph relationships
    • Returns ranked results based on relevance
  • Result Processing: Iterates through retrieved records
    • Each record contains extracted entities and relationships
    • Structured format for easy consumption
    • Ready for further analysis or display
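
For comparison, the snippet below (an illustrative variant, not part of the original walkthrough) switches include_text on so each hit returns its supporting source chunk along with a relevance score:

# Retrieve graph matches together with the source text that backs them
retriever_with_text = index.as_retriever(include_text=True)
results = retriever_with_text.retrieve("What is the summary of the financial texts?")

for record in results:
    print(f"score: {record.score}")
    print(record.text[:200])  # preview the first 200 characters of each result
    print("-" * 40)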

Example Query Results:

The financial texts analyze the electric vehicle sector in the US, focusing on:
- Market growth projections showing 35% CAGR through 2028
- Key players: Tesla, Ford, GM, and emerging startups like Rivian
- Investment trends: $120B committed to EV infrastructure
- Regulatory impacts: Federal tax incentives and state-level mandates
- Consumer adoption: Current 7% market share projected to reach 25% by 2030
- Supply chain challenges: Battery material shortages and manufacturing scaling

Advanced Retrieval Features:
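
Beyond raw retrieval, the same index can back a query engine that pulls the relevant subgraph context and asks the LLM to synthesize a written answer. A minimal sketch using default settings (the question is taken from the sample list below):

# Build a query engine on top of the knowledge graph and ask a full question
query_engine = index.as_query_engine(include_text=True)

response = query_engine.query("What are the main challenges facing EV startups?")
print(response)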

Sample Queries for Financial Analysis:

"Show me companies investing in battery technology"
"What are the main challenges facing EV startups?"
"Compare market positions of Tesla and traditional automakers"
"Analyze the impact of government policies on EV adoption"
"Identify key growth drivers in the electric vehicle sector"

This retrieval system transforms the knowledge graph into an intelligent query engine that can provide comprehensive financial insights by leveraging the structured relationships between entities extracted from the original dataset.