Build an AI-Powered MCP pipeline with Mage Pro — Part II: Building the intelligence layer, connect your documents to Claude via MCP

Mage Pro

Your AI data engineer

June 4, 2025

TLDR

We're continuing our goal of building "Attractor," an AI system that answers questions about space and physics using a research paper by Christopher Cillian O'Neill, "Evidence of the Great Attractor and Great Repeller from Artificial Neural Network Imputation of Sloan Digital Sky Survey." In Part I, we built the data foundation: extracting, cleaning, and chunking PDF content to prepare it for AI integration.

In Part II we'll focus on implementing an intelligence layer that connects the processed documents to Anthropic's API through a Model Context Protocol (MCP) server. We'll implement the core functionality, where an MCP server manages document search and retrieval, then sends relevant content to Claude for analysis. Finally, we'll return an intelligent answer.

Table of contents

  • Introduction

  • Understanding MCP in practice

  • Implement an MCP server in Mage Pro

  • Call Anthropic’s Claude

  • Export the answer for external systems

  • Conclusion

Introduction

In Part I, we built the data processing foundation of our Model Context Protocol (MCP) pipeline: extracting, cleaning, and chunking PDF content about the Great Attractor. Now we'll bring our "Attractor" AI to life by integrating Anthropic's Claude API. Part II focuses on the intelligence layer, where we'll connect our processed document chunks to Claude through an MCP server. Instead of relying on Claude's general training data, our AI will answer questions based strictly on the research paper we processed. This creates more reliable and traceable responses, which is ideal for internal or external business applications that need to answer questions from company knowledge assets.



Understanding MCP in practice

Unlike traditional RAG, which directly embeds and queries document chunks, MCP creates a structured, server-client architecture for document interaction. Attractor, the name of our AI tool, acts as a dedicated document service that organizes PDF chunks into searchable content. It provides tools that can understand questions and find relevant answers in the document.

When a question is asked, the process follows the MCP protocol in several steps (a sketch of the underlying JSON-RPC messages follows the list):

  1. Connection: The client establishes a JSON-RPC connection with the document server

  2. Discovery: The client discovers available tools and resources from the server

  3. Search: The query is processed through the search tool to find relevant chunks

  4. Retrieval: Specific chunks are retrieved using their resource URIs

  5. Analysis: The retrieved content is sent to Claude via the Anthropic API for final analysis
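
To make these steps concrete, here is a minimal sketch of the JSON-RPC messages the client sends, one JSON object per line over the server's stdin. It mirrors the client code later in this post; the message bodies are illustrative, and the protocol version string follows the MCP revision used there.

# Illustrative JSON-RPC messages; each is serialized with json.dumps and written as a single line

# Steps 1-2 - Connection and discovery: the initialize handshake
initialize = {"jsonrpc": "2.0", "id": 1, "method": "initialize",
              "params": {"protocolVersion": "2024-11-05", "capabilities": {},
                         "clientInfo": {"name": "mage-client", "version": "1.0.0"}}}

# Notification (no "id"): tells the server initialization is complete
initialized = {"jsonrpc": "2.0", "method": "notifications/initialized"}

# Step 3 - Search: call the search_document tool to find relevant chunks
search = {"jsonrpc": "2.0", "id": 2, "method": "tools/call",
          "params": {"name": "search_document",
                     "arguments": {"query": "Great Attractor research", "max_results": 3}}}

# Step 4 - Retrieval: read a specific chunk by its resource URI
read_chunk = {"jsonrpc": "2.0", "id": 3, "method": "resources/read",
              "params": {"uri": "document://great_attractor/chunk_0"}}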

This approach separates document handling from AI processing and follows the new MCP standard. It works well with other tools and sends only the most relevant document sections to Claude, which saves costs and gives better answers. Check out the code below to build your MCP server if you are following along with this project:

#!/usr/bin/env python3
import asyncio
import json
import os
import sys

try:
    from mcp.server import Server, NotificationOptions
    from mcp.server.models import InitializationOptions
    from mcp.server.stdio import stdio_server
    from mcp.types import (
        Resource,
        Tool,
        TextContent,
    )
    print("All imports successful", file=sys.stderr)
except ImportError as e:
    print(f"Import error: {e}", file=sys.stderr)
    sys.exit(1)

# Initialize the MCP server
server = Server("simple-document-server")
DOCUMENT_CHUNKS = []

@server.list_resources()
async def handle_list_resources():
    """List available document resources"""
    print(f"Listing {len(DOCUMENT_CHUNKS)} resources", file=sys.stderr)
    resources = []
    for chunk in DOCUMENT_CHUNKS:
        resources.append(
            Resource(
                uri=f"document://great_attractor/chunk_{chunk['chunk_id']}",
                name=f"Document Chunk {chunk['chunk_id']}",
                description=f"Section {chunk['chunk_id']} of the Great Attractor research paper",
                mimeType="text/plain"
            )
        )
    return resources

@server.read_resource()
async def handle_read_resource(uri: str):
    """Read content from a specific document resource"""
    # Convert URI to string if it's a Pydantic URL object
    uri_str = str(uri)
    print(f"Reading resource: {uri_str}", file=sys.stderr)
    
    if uri_str.startswith("document://great_attractor/chunk_"):
        try:
            chunk_id = int(uri_str.split("_")[-1])
            print(f"Looking for chunk_id: {chunk_id}", file=sys.stderr)
            
            # Ensure we have the right data structure
            chunks_to_search = DOCUMENT_CHUNKS
            if isinstance(DOCUMENT_CHUNKS, dict) and 'chunks' in DOCUMENT_CHUNKS:
                chunks_to_search = DOCUMENT_CHUNKS['chunks']
            
            for chunk in chunks_to_search:
                if chunk.get("chunk_id") == chunk_id:
                    content = chunk.get("text", "")
                    print(f"Found chunk {chunk_id}, returning {len(content)} characters", file=sys.stderr)
                    return content
            
            print(f"Chunk {chunk_id} not found in {len(chunks_to_search)} available chunks", file=sys.stderr)
            raise ValueError(f"Chunk {chunk_id} not found")
            
        except (ValueError, IndexError) as e:
            print(f"Error parsing chunk_id from {uri_str}: {e}", file=sys.stderr)
            raise ValueError(f"Invalid chunk URI: {uri_str}")
    
    raise ValueError(f"Resource not found: {uri_str}")

@server.list_tools()
async def handle_list_tools():
    """List available tools"""
    return [
        Tool(
            name="search_document",
            description="Search for relevant document chunks",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "max_results": {"type": "integer", "default": 3}
                },
                "required": ["query"]
            }
        )
    ]

@server.call_tool()
async def handle_call_tool(name: str, arguments: dict):
    """Handle tool calls"""
    print(f"Tool called: {name} with args: {arguments}", file=sys.stderr)
    print(f"Available chunks: {len(DOCUMENT_CHUNKS)}", file=sys.stderr)
    
    if name == "search_document":
        query = arguments.get("query", "")
        max_results = arguments.get("max_results", 3)
        
        print(f"Searching for: {query} with max_results: {max_results}", file=sys.stderr)
        print(f"DOCUMENT_CHUNKS contains: {len(DOCUMENT_CHUNKS)} chunks", file=sys.stderr)
        
        try:
            # Ensure DOCUMENT_CHUNKS is a list
            chunks_to_search = DOCUMENT_CHUNKS
            if isinstance(DOCUMENT_CHUNKS, dict):
                if 'chunks' in DOCUMENT_CHUNKS:
                    chunks_to_search = DOCUMENT_CHUNKS['chunks']
                else:
                    # Convert dict values to list if it's a dict of chunks
                    chunks_to_search = list(DOCUMENT_CHUNKS.values())
            
            if not chunks_to_search:
                print("WARNING: No document chunks available for search!", file=sys.stderr)
                return [TextContent(
                    type="text",
                    text=json.dumps({
                        "chunk_ids": [],
                        "total_matches": 0,
                        "error": "No document chunks loaded"
                    })
                )]
            
            print(f"Searching through {len(chunks_to_search)} chunks", file=sys.stderr)
            
            # Simple search - return first few chunks that exist
            chunk_ids = []
            for i, chunk in enumerate(chunks_to_search):
                if i >= max_results:
                    break
                chunk_id = chunk.get("chunk_id", i) if isinstance(chunk, dict) else i
                chunk_ids.append(chunk_id)
            
            result = {
                "chunk_ids": chunk_ids,
                "total_matches": len(chunks_to_search),
                "query": query,
                "debug_chunks_type": str(type(chunks_to_search))
            }
            
            print(f"Search result: {result}", file=sys.stderr)
            
            return [TextContent(
                type="text",
                text=json.dumps(result)
            )]
            
        except Exception as e:
            print(f"Search error: {e}", file=sys.stderr)
            import traceback
            print(f"Full traceback: {traceback.format_exc()}", file=sys.stderr)
            return [TextContent(
                type="text",
                text=json.dumps({"error": str(e), "query": query})
            )]
    
    raise ValueError(f"Unknown tool: {name}")

async def main():
    print("Starting MCP server", file=sys.stderr)
    
    # Load document chunks from environment variable
    chunks_env = os.getenv("DOCUMENT_CHUNKS")
    global DOCUMENT_CHUNKS
    
    if chunks_env:
        try:
            DOCUMENT_CHUNKS = json.loads(chunks_env)
            print(f"Loaded {len(DOCUMENT_CHUNKS)} chunks from environment", file=sys.stderr)
            
            # Debug: print first chunk info
            if DOCUMENT_CHUNKS:
                print(f"DOCUMENT_CHUNKS type: {type(DOCUMENT_CHUNKS)}", file=sys.stderr)
                
                if isinstance(DOCUMENT_CHUNKS, list):
                    first_chunk = DOCUMENT_CHUNKS[0]
                    print(f"First chunk keys: {list(first_chunk.keys())}", file=sys.stderr)
                    print(f"First chunk ID: {first_chunk.get('chunk_id', 'NO_ID')}", file=sys.stderr)
                    print(f"First chunk text preview: {first_chunk.get('text', 'NO_TEXT')[:100]}...", file=sys.stderr)
                elif isinstance(DOCUMENT_CHUNKS, dict):
                    print(f"DOCUMENT_CHUNKS is a dict with keys: {list(DOCUMENT_CHUNKS.keys())}", file=sys.stderr)
                    # Convert dict to list if needed
                    if 'chunks' in DOCUMENT_CHUNKS:
                        DOCUMENT_CHUNKS = DOCUMENT_CHUNKS['chunks']
                        print(f"Extracted chunks list with {len(DOCUMENT_CHUNKS)} items", file=sys.stderr)
                    else:
                        print("DOCUMENT_CHUNKS is a dict but no 'chunks' key found", file=sys.stderr)
                else:
                    print(f"DOCUMENT_CHUNKS is unexpected type: {type(DOCUMENT_CHUNKS)}", file=sys.stderr)
            else:
                print("DOCUMENT_CHUNKS is empty", file=sys.stderr)
            
        except json.JSONDecodeError as e:
            print(f"Failed to parse DOCUMENT_CHUNKS: {e}", file=sys.stderr)
            print(f"Raw DOCUMENT_CHUNKS: {chunks_env[:200]}...", file=sys.stderr)
            sys.exit(1)
    else:
        print("No DOCUMENT_CHUNKS environment variable found", file=sys.stderr)
        print("Available env vars:", [k for k in os.environ.keys() if 'CHUNK' in k.upper()], file=sys.stderr)
    
    # Run the server
    try:
        print("Creating stdio server streams", file=sys.stderr)
        async with stdio_server() as (read_stream, write_stream):
            print("Server streams created, running server", file=sys.stderr)
            
            # Create server capabilities
            capabilities = server.get_capabilities(
                notification_options=NotificationOptions(),
                experimental_capabilities={},
            )
            print(f"Server capabilities created: {type(capabilities)}", file=sys.stderr)
            
            # Initialize and run server
            init_options = InitializationOptions(
                server_name="simple-document-server",
                server_version="1.0.0",
                capabilities=capabilities,
            )
            print("Starting server.run()", file=sys.stderr)
            
            await server.run(read_stream, write_stream, init_options)
            print("Server.run() completed", file=sys.stderr)
            
    except EOFError:
        print("Server received EOF (stdin closed) - this is normal for testing", file=sys.stderr)
    except KeyboardInterrupt:
        print("Server interrupted by user", file=sys.stderr)
    except Exception as e:
        print(f"Server error: {e}", file=sys.stderr)
        print(f"Error type: {type(e)}", file=sys.stderr)
        import traceback
        print(f"Full traceback: {traceback.format_exc()}", file=sys.stderr)
        raise

if __name__ == "__main__":
    try:
        print("Starting asyncio main loop", file=sys.stderr)
        asyncio.run(main())
        print("Main loop completed normally", file=sys.stderr)
    except KeyboardInterrupt:
        print("Server interrupted by user", file=sys.stderr)
        sys.exit(0)
    except Exception as e:
        print(f"Fatal error: {e}", file=sys.stderr)
        print(f"Error type: {type(e)}", file=sys.stderr)
        import traceback
        print(f"Full traceback: {traceback.format_exc()}", file=sys.stderr)
        sys.exit(1)
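
Before wiring the server into a pipeline, you can smoke-test it locally. The sketch below assumes the file is saved as mcp_document_server.py in the current directory and feeds it a single sample chunk through the DOCUMENT_CHUNKS environment variable; the file path and sample data are placeholders for your own setup.

import json
import os
import subprocess

# Provide one sample chunk so the server has something to serve
env = os.environ.copy()
env["DOCUMENT_CHUNKS"] = json.dumps([{"chunk_id": 0, "text": "Sample chunk for testing."}])

proc = subprocess.Popen(
    ["python", "mcp_document_server.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    env=env,
    text=True,
)

# Send the initialize request and print the server's first response
init_msg = {"jsonrpc": "2.0", "id": 1, "method": "initialize",
            "params": {"protocolVersion": "2024-11-05", "capabilities": {},
                       "clientInfo": {"name": "smoke-test", "version": "1.0.0"}}}
proc.stdin.write(json.dumps(init_msg) + "\n")
proc.stdin.flush()
print(proc.stdout.readline())
proc.terminate()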

Implement an MCP server in Mage Pro

Once you've written or copied the code above, you'll need to create and store a file in Mage Pro for your pipeline to reference. Take the following steps to complete this process:

  1. Navigate to your folders in Mage Pro and right-click on the folder where you want to store the new Python file

  2. Give the Python file a name such as mcp_document_server.py

  3. Click on the new file to open it in the editor, then paste in the MCP server code from above.

  4. Save the file.

  5. You may need to adjust the code in the Call Anthropic's Claude section depending on where you stored the file and what you named it.

    1. The references to the file appear on lines 17 and 24 of the transformer code. You will need to adjust the file path to match where you saved the MCP document server code:

       env = os.environ.copy()
       env["DOCUMENT_CHUNKS"] = json.dumps(data.to_dict() if hasattr(data, 'to_dict') else data)
       env["PATH"] = "/usr/local/bin:/usr/bin:/bin"
       
       try:
           print(f"Asking MCP server: {question}")
           
           # Start MCP server
           server_process = subprocess.Popen([
               "python", "/home/src/demo/mcp_document_server.py"
           ],

Once you complete the steps above, you can add a new block of code that calls Anthropic to return an analytical answer based on the document provided.

Call Anthropic’s Claude

The final transformation block serves as the intelligence layer of our MCP system. It orchestrates communication between the processed document chunks and the Anthropic API. This block:

  • Establishes a JSON-RPC connection with the MCP server

  • Searches for relevant document chunks using the "search_document" tool

  • Retrieves specific content through resource URIs

  • Sends the retrieved document content to Claude with explicit instructions to answer questions strictly based on the provided PDF context

  • Prevents the AI from using external knowledge outside the document

  • Returns a structured response containing the original question, Claude's document-grounded analysis, and method metadata

This creates a complete audit trail from source document to AI-generated insights that exemplifies the core principles of MCP.

from mage_ai.data_preparation.shared.secrets import get_secret_value
import subprocess
import json
import os
import time
import anthropic

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def mcp_retrieval_and_claude_analysis(data, **kwargs):
    """Get MCP content and analyze with Claude API in one step"""

    question = "What is the Great Attractor and what did the research discover about it?"

    # Set up environment
    env = os.environ.copy()
    env["DOCUMENT_CHUNKS"] = json.dumps(data.to_dict() if hasattr(data, 'to_dict') else data)
    env["PATH"] = "/usr/local/bin:/usr/bin:/bin"

    try:
        print(f"Asking MCP server: {question}")

        # Start MCP server
        server_process = subprocess.Popen([
            "python", "/home/src/demo/mcp_document_server.py"
        ],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        text=True
        )

        time.sleep(2)

        # Initialize MCP session (each JSON-RPC message is a single newline-terminated line)
        init_msg = {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "mage-client", "version": "1.0.0"}}}
        server_process.stdin.write(json.dumps(init_msg) + "\n")
        server_process.stdin.flush()
        server_process.stdout.readline()

        server_process.stdin.write(json.dumps({"jsonrpc": "2.0", "method": "notifications/initialized"}) + "\n")
        server_process.stdin.flush()

        search_msg = {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "search_document", "arguments": {"query": "Great Attractor research", "max_results": 3}}}
        server_process.stdin.write(json.dumps(search_msg) + "\n")
        server_process.stdin.flush()

        search_response = server_process.stdout.readline()

        # Extract document content
        document_content = ""
        if search_response.strip():
            search_data = json.loads(search_response.strip())
            if "result" in search_data:
                search_result = json.loads(search_data["result"]["content"][0]["text"])
                chunk_ids = search_result.get("chunk_ids", [])

                for chunk_id in chunk_ids[:2]:
                    read_msg = {"jsonrpc": "2.0", "id": 10 + chunk_id, "method": "resources/read", "params": {"uri": f"document://great_attractor/chunk_{chunk_id}"}}
                    server_process.stdin.write(json.dumps(read_msg) + "\n")
                    server_process.stdin.flush()

                    chunk_response = server_process.stdout.readline()
                    if chunk_response.strip():
                        try:
                            chunk_data = json.loads(chunk_response.strip())
                            if "result" in chunk_data and "contents" in chunk_data["result"]:
                                content = chunk_data["result"]["contents"][0]["text"]
                                document_content += content + "\n\n"
                        except json.JSONDecodeError:
                            continue

        server_process.terminate()
        server_process.wait(timeout=3)

        # Now call Claude API with the retrieved content
        if document_content.strip():
            client = anthropic.Anthropic(
                api_key=get_secret_value('CLAUDE_API_KEY')
            )

            message = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1000,
                messages=[
                    {
                        "role": "user",
                        "content": f"""You must answer this question ONLY using information from the provided document. Do not use any external knowledge or training data.

Question: {question}

Document content:
{document_content}

Instructions:
- Only use information explicitly stated in the document above
- If information is not in the document, say "Not mentioned in the document"
- Do not add external knowledge about the Great Attractor
- Base your analysis solely on what this research paper contains

Please analyze what this specific research discovered."""
                    }
                ]
            )

            claude_analysis = message.content[0].text
        else:
            claude_analysis = "No document content retrieved from MCP server"

        return {
            "question": question,
            "claude_analysis": claude_analysis,
            "method": "MCP + Claude API (PDF-only)"
        }

    except Exception as e:
        return {
            "question": question,
            "claude_analysis": f"Error: {str(e)}",
            "method": "MCP + Claude API (PDF-only)"
        }

Export the answer for external systems

For production pipelines, the data exporter block can serve as a gateway to write data to multiple locations if needed. Data can be written to:

  • Vector database storage: store your responses in a vector database to build a searchable knowledge base of previously answered questions.

  • Caching layer: cache your responses in Redis using question hashes to improve response times for repeat questions.

  • Analytics pipeline: export responses to data warehouses like Snowflake or BigQuery to track performance metrics over time.

This clean separation between processing and output ensures that our AI system can integrate seamlessly with various downstream applications, from notebooks to web interfaces, making the knowledge contained in our PDF document accessible through a simple, consistent API response format.

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_claude_analysis(data, **kwargs):
    """Export only Claude's analysis"""
    
    print(data['claude_analysis'])
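
As an illustration of the caching option above, here is a minimal sketch of a production-style exporter. It is an assumption-laden example, not part of the tutorial pipeline: it presumes a reachable Redis instance (host taken from a hypothetical REDIS_HOST variable) and uses a hash of the question as the cache key, with a TTL you would tune for your workload.

import hashlib
import json
import os

import redis

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_claude_analysis_to_cache(data, **kwargs):
    """Sketch: cache Claude's analysis in Redis, keyed by a hash of the question"""
    # Connection details are placeholders; adjust for your deployment
    client = redis.Redis(host=os.getenv("REDIS_HOST", "localhost"), port=6379)

    # Hash the question so repeat questions can be served from the cache
    cache_key = "attractor:" + hashlib.sha256(data["question"].encode()).hexdigest()

    # Store the full response with a 24-hour TTL
    client.setex(cache_key, 86400, json.dumps(data))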

Conclusion

MCP-powered document analysis is a significant improvement over traditional RAG approaches because it implements structured protocols for document interaction. The separation between document handling and AI processing creates more reliable responses and maintains clear traceability from source material to final analysis.

Some key advantages include:

  • Document-constrained AI responses that prevent hallucination by grounding Claude's analysis in specific source material

  • Modular architecture that scales across multiple documents and use cases

  • Seamless integration with existing business systems and workflows

  • Trustworthy knowledge systems for internal research, customer support, and decision-making processes

Whether you are processing research papers, building a chatbot on top of technical documentation, or creating internal knowledge bases from proprietary business documents, this MCP approach provides the reliability and transparency that production AI applications need.

Want to build a RAG pipeline using the MCP methods discussed above? Schedule a free demo with Mage to get started today.
