diff --git a/README.md b/README.md index 51f18c3..e737580 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ A Model Context Protocol server for code indexing, searching, and analysis. + + code-index-mcp MCP server + + ## What is Code Index MCP? Code Index MCP is a specialized MCP server that provides intelligent code indexing and analysis capabilities. It enables Large Language Models to interact with your code repositories, offering real-time insights and navigation through complex codebases. @@ -19,8 +23,8 @@ This server integrates with the [Model Context Protocol](https://modelcontextpro ## Key Features - **Project Indexing**: Recursively scans directories to build a searchable index of code files -- **Advanced Search**: Intelligent search with automatic detection of ripgrep, ag, or grep for enhanced performance -- **Fuzzy Search**: Safe fuzzy matching with word boundaries for flexible code discovery +- **Advanced Search**: Intelligent search with automatic detection of ugrep, ripgrep, ag, or grep for enhanced performance +- **Fuzzy Search**: Native fuzzy matching with ugrep, or safe fuzzy patterns for other tools - **File Analysis**: Get detailed insights about file structure, imports, and complexity - **Smart Filtering**: Automatically ignores build directories, dependencies, and non-code files - **Persistent Storage**: Caches indexes for improved performance across sessions @@ -109,7 +113,7 @@ After adding the configuration, restart Claude Desktop and the Code Index MCP to - **set_project_path**: Sets the base project path for indexing. - **search_code**: Basic search for code matches within the indexed files. -- **search_code_advanced**: Enhanced search using external tools (ripgrep/ag/grep) with fuzzy matching support. +- **search_code_advanced**: Enhanced search using external tools (ugrep/ripgrep/ag/grep) with fuzzy matching support. - **find_files**: Finds files in the project matching a given pattern. 
- **get_file_summary**: Gets a summary of a specific file, including line count, functions, imports, etc. - **refresh_index**: Refreshes the project index. diff --git a/README_zh.md b/README_zh.md index 09ddb41..98b93ab 100644 --- a/README_zh.md +++ b/README_zh.md @@ -19,8 +19,8 @@ ## 主要特性 - **專案索引**:遞迴掃描目錄以建構可搜尋的程式碼檔案索引 -- **進階搜尋**:智慧搜尋,自動偵測 ripgrep、ag 或 grep 以提升效能 -- **模糊搜尋**:使用詞邊界的安全模糊匹配,提供靈活的程式碼發現 +- **進階搜尋**:智慧搜尋,自動偵測 ugrep、ripgrep、ag 或 grep 以提升效能 +- **模糊搜尋**:ugrep 的原生模糊匹配功能提供卓越的搜尋結果,或其他工具的安全模糊模式 - **檔案分析**:取得有關檔案結構、匯入和複雜性的詳細資訊 - **智慧篩選**:自動忽略建構目錄、相依套件和非程式碼檔案 - **持久儲存**:快取索引以提高跨工作階段的效能 @@ -109,7 +109,7 @@ python -m code_index_mcp - **set_project_path**:設定索引的基本專案路徑。 - **search_code**:在已索引檔案中進行基本程式碼搜尋。 -- **search_code_advanced**:使用外部工具 (ripgrep/ag/grep) 的增強搜尋,支援模糊匹配。 +- **search_code_advanced**:使用外部工具 (ugrep/ripgrep/ag/grep) 的增強搜尋,支援模糊匹配。 - **find_files**:尋找專案中符合給定模式的檔案。 - **get_file_summary**:取得特定檔案的摘要,包括行數、函式、匯入等。 - **refresh_index**:重新整理專案索引。 diff --git a/pyproject.toml b/pyproject.toml index 81a9302..94889d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "code-index-mcp" -version = "0.2.1" +version = "0.3.0" description = "Code indexing and analysis tools for LLMs using MCP" readme = "README.md" requires-python = ">=3.10" diff --git a/src/code_index_mcp/__init__.py b/src/code_index_mcp/__init__.py index d7b7929..7651704 100644 --- a/src/code_index_mcp/__init__.py +++ b/src/code_index_mcp/__init__.py @@ -3,4 +3,4 @@ A Model Context Protocol server for code indexing, searching, and analysis. 
""" -__version__ = "0.2.1" +__version__ = "0.3.0" diff --git a/src/code_index_mcp/project_settings.py b/src/code_index_mcp/project_settings.py index 3459d6d..94b9da3 100644 --- a/src/code_index_mcp/project_settings.py +++ b/src/code_index_mcp/project_settings.py @@ -16,6 +16,39 @@ from .constants import ( SETTINGS_DIR, CONFIG_FILE, INDEX_FILE, CACHE_FILE ) +from .search.base import SearchStrategy +from .search.ugrep import UgrepStrategy +from .search.ripgrep import RipgrepStrategy +from .search.ag import AgStrategy +from .search.grep import GrepStrategy +from .search.basic import BasicSearchStrategy + + +# Prioritized list of search strategies +SEARCH_STRATEGY_CLASSES = [ + UgrepStrategy, + RipgrepStrategy, + AgStrategy, + GrepStrategy, + BasicSearchStrategy, +] + + +def _get_available_strategies() -> list[SearchStrategy]: + """ + Detect and return a list of available search strategy instances, + ordered by preference. + """ + available = [] + for strategy_class in SEARCH_STRATEGY_CLASSES: + try: + strategy = strategy_class() + if strategy.is_available(): + available.append(strategy) + except Exception as e: + print(f"Error initializing strategy {strategy_class.__name__}: {e}") + return available + class ProjectSettings: """Class for managing project settings and index data""" @@ -29,7 +62,8 @@ def __init__(self, base_path, skip_load=False): """ self.base_path = base_path self.skip_load = skip_load - self._search_tools_cache = None # Lazy-loaded search tools configuration + self.available_strategies: list[SearchStrategy] = [] + self.refresh_available_strategies() # Ensure the base path of the temporary directory exists try: @@ -478,83 +512,31 @@ def get_stats(self): } def get_search_tools_config(self): - """Get search tools configuration with lazy loading. 
- - Returns: - dict: Search tools configuration with preferred tool and available tools - """ - if self._search_tools_cache is None: - print("Detecting available search tools...") - self._search_tools_cache = self._detect_search_tools() - print(f"Search tools detected. Preferred: {self._search_tools_cache.get('preferred_tool', 'basic')}") - - return self._search_tools_cache + """Get the configuration of available search tools. - def get_preferred_search_tool(self): - """Get the preferred search tool name. - Returns: - str: Name of preferred search tool ('ripgrep', 'ag', 'grep', or 'basic') + dict: A dictionary containing the list of available tool names. """ - config = self.get_search_tools_config() - return config.get('preferred_tool', 'basic') + return { + "available_tools": [s.name for s in self.available_strategies], + "preferred_tool": self.get_preferred_search_tool().name if self.available_strategies else None + } + + def get_preferred_search_tool(self) -> SearchStrategy | None: + """Get the preferred search tool based on availability and priority. - def _detect_search_tools(self): - """Detect available search tools on the system. - Returns: - dict: Configuration with available tools and preferred tool + SearchStrategy: An instance of the preferred search strategy, or None. 
""" - tools_info = { - 'detected_at': self._get_timestamp(), - 'available_tools': {}, - 'preferred_tool': 'basic' - } - - # Check tools in priority order: ripgrep > ag > grep - search_tools = [ - ('ripgrep', 'rg'), - ('ag', 'ag'), - ('grep', 'grep') - ] + if not self.available_strategies: + self.refresh_available_strategies() - for tool_name, command in search_tools: - is_available = self._is_tool_available(command) - tools_info['available_tools'][tool_name] = is_available - - # Set the first available tool as preferred - if is_available and tools_info['preferred_tool'] == 'basic': - tools_info['preferred_tool'] = tool_name - - return tools_info + return self.available_strategies[0] if self.available_strategies else None - def _is_tool_available(self, command): - """Check if a search tool is available on the system. - - Args: - command (str): Command to check (e.g., 'rg', 'ag', 'grep') - - Returns: - bool: True if tool is available, False otherwise + def refresh_available_strategies(self): """ - try: - result = subprocess.run( - [command, '--version'], - capture_output=True, - timeout=3, - check=False - ) - return result.returncode == 0 - except (FileNotFoundError, subprocess.TimeoutExpired, OSError): - return False - - def refresh_search_tools(self): - """Manually refresh search tools detection. - - Returns: - dict: Updated search tools configuration + Force a refresh of the available search tools list. """ - print("Refreshing search tools detection...") - self._search_tools_cache = self._detect_search_tools() - print(f"Search tools refreshed. 
Preferred: {self._search_tools_cache.get('preferred_tool', 'basic')}") - return self._search_tools_cache + print("Refreshing available search strategies...") + self.available_strategies = _get_available_strategies() + print(f"Available strategies found: {[s.name for s in self.available_strategies]}") diff --git a/src/code_index_mcp/search/__init__.py b/src/code_index_mcp/search/__init__.py new file mode 100644 index 0000000..f230a11 --- /dev/null +++ b/src/code_index_mcp/search/__init__.py @@ -0,0 +1 @@ +"""Search strategies package.""" diff --git a/src/code_index_mcp/search/ag.py b/src/code_index_mcp/search/ag.py new file mode 100644 index 0000000..f809fa2 --- /dev/null +++ b/src/code_index_mcp/search/ag.py @@ -0,0 +1,89 @@ +""" +Search Strategy for The Silver Searcher (ag) +""" +import shutil +import subprocess +from typing import Dict, List, Optional, Tuple + +from .base import SearchStrategy, parse_search_output, create_safe_fuzzy_pattern + +class AgStrategy(SearchStrategy): + """Search strategy using 'The Silver Searcher' (ag) command-line tool.""" + + @property + def name(self) -> str: + """The name of the search tool.""" + return 'ag' + + def is_available(self) -> bool: + """Check if 'ag' command is available on the system.""" + return shutil.which('ag') is not None + + def search( + self, + pattern: str, + base_path: str, + case_sensitive: bool = True, + context_lines: int = 0, + file_pattern: Optional[str] = None, + fuzzy: bool = False + ) -> Dict[str, List[Tuple[int, str]]]: + """ + Execute a search using The Silver Searcher (ag). + + Note: ag does not support native fuzzy searching. When fuzzy=True, a + safe fuzzy pattern with word boundaries is used for regex search. + When fuzzy=False, a literal string search is performed. + """ + # ag prints line numbers and groups by file by default, which is good. + # --noheading is used to be consistent with other tools' output format. 
+ cmd = ['ag', '--noheading'] + + if not case_sensitive: + cmd.append('--ignore-case') + + # Prepare search pattern + search_pattern = pattern + if fuzzy: + # Use safe fuzzy pattern for regex search + search_pattern = create_safe_fuzzy_pattern(pattern) + else: + cmd.append('--literal') # or -Q + + if context_lines > 0: + cmd.extend(['--before', str(context_lines)]) + cmd.extend(['--after', str(context_lines)]) + + if file_pattern: + # Use -G to filter files by regex pattern + cmd.extend(['-G', file_pattern]) + + # Add -- to treat pattern as a literal argument, preventing injection + cmd.append('--') + cmd.append(search_pattern) + cmd.append(base_path) + + try: + # ag exits with 1 if no matches are found, which is not an error. + # It exits with 0 on success (match found). Other codes are errors. + process = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding='utf-8', + errors='replace', + check=False # Do not raise CalledProcessError on non-zero exit + ) + # Treat exit codes above 1 as failures. ag's exit code behavior + # is less standardized than rg/ug: 0 for match, 1 for no match. + # On failure, stderr is included in the raised error message. + if process.returncode > 1: + raise RuntimeError(f"ag failed with exit code {process.returncode}: {process.stderr}") + + return parse_search_output(process.stdout, base_path) + + except FileNotFoundError: + raise RuntimeError("'ag' (The Silver Searcher) not found. 
Please install it and ensure it's in your PATH.") + except Exception as e: + # Re-raise other potential exceptions like permission errors + raise RuntimeError(f"An error occurred while running ag: {e}") \ No newline at end of file diff --git a/src/code_index_mcp/search/base.py b/src/code_index_mcp/search/base.py new file mode 100644 index 0000000..50d97f8 --- /dev/null +++ b/src/code_index_mcp/search/base.py @@ -0,0 +1,141 @@ +""" +Search Strategies for Code Indexer + +This module defines the abstract base class for search strategies and the helpers +shared by the concrete implementations for tools like ugrep, ripgrep, etc. +""" +import os +import re +import shutil +import subprocess +import sys +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Tuple, Any + +def parse_search_output(output: str, base_path: str) -> Dict[str, List[Tuple[int, str]]]: + """ + Parse the output of command-line search tools (ugrep, ripgrep, ag, grep). + + Args: + output: The raw output from the command-line tool. + base_path: The base path of the project to make file paths relative. + + Returns: + A dictionary where keys are file paths and values are lists of (line_number, line_content) tuples. 
+ """ + results = {} + # Normalize base_path to ensure consistent path separation + normalized_base_path = os.path.normpath(base_path) + + for line in output.strip().split('\n'): + if not line.strip(): + continue + try: + # Handle Windows paths which might have a drive letter, e.g., C: + parts = line.split(':', 2) + if sys.platform == "win32" and len(parts[0]) == 1 and parts[1].startswith('\\'): + # Re-join drive letter with the rest of the path + file_path_abs = f"{parts[0]}:{parts[1]}" + line_number_str = parts[2].split(':', 1)[0] + content = parts[2].split(':', 1)[1] + else: + file_path_abs = parts[0] + line_number_str = parts[1] + content = parts[2] + + line_number = int(line_number_str) + + # Make the file path relative to the base_path + relative_path = os.path.relpath(file_path_abs, normalized_base_path) + + # Normalize path separators for consistency + relative_path = relative_path.replace('\\', '/') + + if relative_path not in results: + results[relative_path] = [] + results[relative_path].append((line_number, content)) + except (ValueError, IndexError): + # Silently ignore lines that don't match the expected format + # This can happen with summary lines or other tool-specific output + pass + + return results + + +def create_safe_fuzzy_pattern(pattern: str) -> str: + """ + Create safe fuzzy search patterns that are more permissive than exact match + but still safe from regex injection attacks. + + Args: + pattern: Original search pattern + + Returns: + Safe fuzzy pattern for extended regex + """ + # Escape any regex special characters to make them literal + escaped = re.escape(pattern) + + # Create fuzzy pattern that matches: + # 1. Word at start of word boundary (e.g., "test" in "testing") + # 2. Word at end of word boundary (e.g., "test" in "mytest") + # 3. 
Whole word (e.g., "test" as standalone word) + if len(pattern) >= 3: # Only for patterns of reasonable length + # This pattern allows partial matches at word boundaries + fuzzy_pattern = f"\\b{escaped}|{escaped}\\b" + else: + # For short patterns, require full word boundaries to avoid too many matches + fuzzy_pattern = f"\\b{escaped}\\b" + + return fuzzy_pattern + + +class SearchStrategy(ABC): + """ + Abstract base class for a search strategy. + + Each strategy is responsible for searching code using a specific tool or method. + """ + + @property + @abstractmethod + def name(self) -> str: + """The name of the search tool (e.g., 'ugrep', 'ripgrep').""" + pass + + @abstractmethod + def is_available(self) -> bool: + """ + Check if the search tool for this strategy is available on the system. + + Returns: + True if the tool is available, False otherwise. + """ + pass + + @abstractmethod + def search( + self, + pattern: str, + base_path: str, + case_sensitive: bool = True, + context_lines: int = 0, + file_pattern: Optional[str] = None, + fuzzy: bool = False + ) -> Dict[str, List[Tuple[int, str]]]: + """ + Execute a search using the specific strategy. + + Args: + pattern: The search pattern (string or regex). + base_path: The root directory to search in. + case_sensitive: Whether the search is case-sensitive. + context_lines: Number of context lines to show around each match. + file_pattern: Glob pattern to filter files (e.g., "*.py"). + fuzzy: Whether to enable fuzzy search. + + Returns: + A dictionary mapping filenames to lists of (line_number, line_content) tuples. + """ + pass + diff --git a/src/code_index_mcp/search/basic.py b/src/code_index_mcp/search/basic.py new file mode 100644 index 0000000..c247c2f --- /dev/null +++ b/src/code_index_mcp/search/basic.py @@ -0,0 +1,75 @@ +""" +Basic, pure-Python search strategy. 
+""" +import os +import re +from typing import Dict, List, Optional, Tuple + +from .base import SearchStrategy, create_safe_fuzzy_pattern + +class BasicSearchStrategy(SearchStrategy): + """ + A basic, pure-Python search strategy. + + This strategy iterates through files and lines manually. It's a fallback + for when no advanced command-line search tools are available. + It does not support context lines. + """ + + @property + def name(self) -> str: + """The name of the search tool.""" + return 'basic' + + def is_available(self) -> bool: + """This basic strategy is always available.""" + return True + + def search( + self, + pattern: str, + base_path: str, + case_sensitive: bool = True, + context_lines: int = 0, + file_pattern: Optional[str] = None, + fuzzy: bool = False + ) -> Dict[str, List[Tuple[int, str]]]: + """ + Execute a basic, line-by-line search. + + Note: This implementation does not support context_lines. + Fuzzy searching uses the shared create_safe_fuzzy_pattern function. + """ + results: Dict[str, List[Tuple[int, str]]] = {} + + flags = 0 if case_sensitive else re.IGNORECASE + + if fuzzy: + # Use the shared safe fuzzy pattern function + search_pattern = create_safe_fuzzy_pattern(pattern) + search_regex = re.compile(search_pattern, flags) + else: + search_regex = re.compile(pattern, flags) + + for root, _, files in os.walk(base_path): + for file in files: + # Basic file pattern matching (not full glob support) + if file_pattern and not file.endswith(file_pattern.replace('*', '')): + continue + + file_path = os.path.join(root, file) + rel_path = os.path.relpath(file_path, base_path) + + try: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + for line_num, line in enumerate(f, 1): + if search_regex.search(line): + if rel_path not in results: + results[rel_path] = [] + # Strip newline for consistent output + results[rel_path].append((line_num, line.rstrip('\\n'))) + except Exception: + # Ignore files that can't be opened or read + 
continue + + return results \ No newline at end of file diff --git a/src/code_index_mcp/search/grep.py b/src/code_index_mcp/search/grep.py new file mode 100644 index 0000000..e8c9609 --- /dev/null +++ b/src/code_index_mcp/search/grep.py @@ -0,0 +1,90 @@ +""" +Search Strategy for standard grep +""" +import shutil +import subprocess +from typing import Dict, List, Optional, Tuple + +from .base import SearchStrategy, parse_search_output, create_safe_fuzzy_pattern + +class GrepStrategy(SearchStrategy): + """ + Search strategy using the standard 'grep' command-line tool. + + This is intended as a fallback for when more advanced tools like + ugrep, ripgrep, or ag are not available. + """ + + @property + def name(self) -> str: + """The name of the search tool.""" + return 'grep' + + def is_available(self) -> bool: + """Check if 'grep' command is available on the system.""" + return shutil.which('grep') is not None + + def search( + self, + pattern: str, + base_path: str, + case_sensitive: bool = True, + context_lines: int = 0, + file_pattern: Optional[str] = None, + fuzzy: bool = False + ) -> Dict[str, List[Tuple[int, str]]]: + """ + Execute a search using standard grep. + + Note: grep does not support native fuzzy searching. When fuzzy=True, an + Extended Regular Expression (ERE) search is performed with safe fuzzy pattern. + When fuzzy=False, a literal string search is performed (-F). 
+ """ + # -r: recursive, -n: line number + cmd = ['grep', '-r', '-n'] + + # Prepare search pattern + search_pattern = pattern + if not fuzzy: + cmd.append('-F') # Fixed strings, literal search + else: + cmd.append('-E') # Extended Regular Expressions + search_pattern = create_safe_fuzzy_pattern(pattern) + + if not case_sensitive: + cmd.append('-i') + + if context_lines > 0: + cmd.extend(['-A', str(context_lines)]) + cmd.extend(['-B', str(context_lines)]) + + if file_pattern: + # Note: grep's --include uses glob patterns, not regex + cmd.append(f'--include={file_pattern}') + + # Add -- to treat pattern as a literal argument, preventing injection + cmd.append('--') + cmd.append(search_pattern) + cmd.append(base_path) + + try: + # grep exits with 1 if no matches are found, which is not an error. + # It exits with 0 on success (match found). >1 for errors. + process = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding='utf-8', + errors='replace', + check=False + ) + + if process.returncode > 1: + raise RuntimeError(f"grep failed with exit code {process.returncode}: {process.stderr}") + + return parse_search_output(process.stdout, base_path) + + except FileNotFoundError: + raise RuntimeError("'grep' not found. 
Please install it and ensure it's in your PATH.") + except Exception as e: + raise RuntimeError(f"An error occurred while running grep: {e}") \ No newline at end of file diff --git a/src/code_index_mcp/search/ripgrep.py b/src/code_index_mcp/search/ripgrep.py new file mode 100644 index 0000000..8cc6946 --- /dev/null +++ b/src/code_index_mcp/search/ripgrep.py @@ -0,0 +1,82 @@ +""" +Search Strategy for ripgrep +""" +import shutil +import subprocess +from typing import Dict, List, Optional, Tuple + +from .base import SearchStrategy, parse_search_output, create_safe_fuzzy_pattern + +class RipgrepStrategy(SearchStrategy): + """Search strategy using the 'ripgrep' (rg) command-line tool.""" + + @property + def name(self) -> str: + """The name of the search tool.""" + return 'ripgrep' + + def is_available(self) -> bool: + """Check if 'rg' command is available on the system.""" + return shutil.which('rg') is not None + + def search( + self, + pattern: str, + base_path: str, + case_sensitive: bool = True, + context_lines: int = 0, + file_pattern: Optional[str] = None, + fuzzy: bool = False + ) -> Dict[str, List[Tuple[int, str]]]: + """ + Execute a search using ripgrep. + + Note: ripgrep does not support native fuzzy searching. When fuzzy=True, a + safe fuzzy pattern with word boundaries is used for regex search. + When fuzzy=False, a literal string search is performed with --fixed-strings. 
+ """ + cmd = ['rg', '--line-number', '--no-heading', '--color=never'] + + if not case_sensitive: + cmd.append('--ignore-case') + + # Prepare search pattern + search_pattern = pattern + if fuzzy: + # Use safe fuzzy pattern for regex search + search_pattern = create_safe_fuzzy_pattern(pattern) + else: + cmd.append('--fixed-strings') + + if context_lines > 0: + cmd.extend(['--context', str(context_lines)]) + + if file_pattern: + cmd.extend(['--glob', file_pattern]) + + # Add -- to treat pattern as a literal argument, preventing injection + cmd.append('--') + cmd.append(search_pattern) + cmd.append(base_path) + + try: + # ripgrep exits with 1 if no matches are found, which is not an error. + # It exits with 2 for actual errors. + process = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding='utf-8', + errors='replace', + check=False # Do not raise CalledProcessError on non-zero exit + ) + if process.returncode > 1: + raise RuntimeError(f"ripgrep failed with exit code {process.returncode}: {process.stderr}") + + return parse_search_output(process.stdout, base_path) + + except FileNotFoundError: + raise RuntimeError("ripgrep (rg) not found. 
Please install it and ensure it's in your PATH.") + except Exception as e: + # Re-raise other potential exceptions like permission errors + raise RuntimeError(f"An error occurred while running ripgrep: {e}") \ No newline at end of file diff --git a/src/code_index_mcp/search/ugrep.py b/src/code_index_mcp/search/ugrep.py new file mode 100644 index 0000000..4222602 --- /dev/null +++ b/src/code_index_mcp/search/ugrep.py @@ -0,0 +1,79 @@ +""" +Search Strategy for ugrep +""" +import shutil +import subprocess +from typing import Dict, List, Optional, Tuple + +from .base import SearchStrategy, parse_search_output + +class UgrepStrategy(SearchStrategy): + """Search strategy using the 'ugrep' (ug) command-line tool.""" + + @property + def name(self) -> str: + """The name of the search tool.""" + return 'ugrep' + + def is_available(self) -> bool: + """Check if 'ug' command is available on the system.""" + return shutil.which('ug') is not None + + def search( + self, + pattern: str, + base_path: str, + case_sensitive: bool = True, + context_lines: int = 0, + file_pattern: Optional[str] = None, + fuzzy: bool = False + ) -> Dict[str, List[Tuple[int, str]]]: + """ + Execute a search using the 'ug' command-line tool. 
+ """ + if not self.is_available(): + return {"error": "ugrep (ug) command not found."} + + cmd = ['ug', '--line-number', '--no-heading'] + + if fuzzy: + cmd.append('--fuzzy') # Enable fuzzy search (long form for clarity) + else: + cmd.append('--fixed-strings') # Use fixed-strings for non-fuzzy search + + if not case_sensitive: + cmd.append('--ignore-case') + + if context_lines > 0: + cmd.extend(['-A', str(context_lines), '-B', str(context_lines)]) + + if file_pattern: + cmd.extend(['--include', file_pattern]) # Correct parameter for file patterns + + # Add '--' to treat pattern as a literal argument, preventing injection + cmd.append('--') + cmd.append(pattern) + cmd.append(base_path) + + try: + process = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding='utf-8', + errors='ignore', # Ignore decoding errors for binary-like content + check=False # Do not raise exception on non-zero exit codes + ) + + # ugrep exits with 1 if no matches are found, which is not an error for us. + # It exits with 2 for actual errors. + if process.returncode > 1: + error_output = process.stderr.strip() + return {"error": f"ugrep execution failed with code {process.returncode}", "details": error_output} + + return parse_search_output(process.stdout, base_path) + + except FileNotFoundError: + return {"error": "ugrep (ug) command not found. Please ensure it's installed and in your PATH."} + except Exception as e: + return {"error": f"An unexpected error occurred during search: {str(e)}"} diff --git a/src/code_index_mcp/server.py b/src/code_index_mcp/server.py index d2ec5b6..56c9e1c 100644 --- a/src/code_index_mcp/server.py +++ b/src/code_index_mcp/server.py @@ -266,10 +266,10 @@ def set_project_path(path: str, ctx: Context) -> str: # Get search capabilities info search_tool = ctx.request_context.lifespan_context.settings.get_preferred_search_tool() - if search_tool == 'basic': + if search_tool is None: search_info = " Basic search available." 
else: - search_info = f" Advanced search enabled ({search_tool})." + search_info = f" Advanced search enabled ({search_tool.name})." return f"Project path set to: {abs_path}. Loaded existing index with {file_count} files.{search_info}" else: @@ -293,73 +293,15 @@ def set_project_path(path: str, ctx: Context) -> str: # Get search capabilities info (this will trigger lazy detection) search_tool = ctx.request_context.lifespan_context.settings.get_preferred_search_tool() - if search_tool == 'basic': + if search_tool is None: search_info = " Basic search available." else: - search_info = f" Advanced search enabled ({search_tool})." + search_info = f" Advanced search enabled ({search_tool.name})." return f"Project path set to: {abs_path}. Indexed {file_count} files.{search_info}" except Exception as e: return f"Error setting project path: {e}" -@mcp.tool() -def search_code(query: str, ctx: Context, extensions: Optional[List[str]] = None, case_sensitive: bool = False) -> Dict[str, List[Tuple[int, str]]]: - """ - Search for code matches within the indexed files. - Returns a dictionary mapping filenames to lists of (line_number, line_content) tuples. - """ - base_path = ctx.request_context.lifespan_context.base_path - - # Check if base_path is set - if not base_path: - return {"error": "Project path not set. 
Please use set_project_path to set a project directory first."} - - # Check if we need to index the project - if not file_index: - _index_project(base_path) - ctx.request_context.lifespan_context.file_count = _count_files(file_index) - ctx.request_context.lifespan_context.settings.save_index(file_index) - - results = {} - - # Filter by extensions if provided - if extensions: - valid_extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in extensions] - else: - valid_extensions = supported_extensions - - # Process the search - for file_path, _info in _get_all_files(file_index): - # Check if the file has a supported extension - if not any(file_path.endswith(ext) for ext in valid_extensions): - continue - - try: - # Get file content (from cache if available) - if file_path in code_content_cache: - content = code_content_cache[file_path] - else: - full_path = os.path.join(base_path, file_path) - with open(full_path, 'r', encoding='utf-8') as f: - content = f.read() - code_content_cache[file_path] = content - - # Search for matches - matches = [] - for i, line in enumerate(content.splitlines(), 1): - if (case_sensitive and query in line) or (not case_sensitive and query.lower() in line.lower()): - matches.append((i, line.strip())) - - if matches: - results[file_path] = matches - except Exception as e: - ctx.info(f"Error searching file {file_path}: {e}") - - # Save the updated cache - ctx.request_context.lifespan_context.settings.save_cache(code_content_cache) - - return results - @mcp.tool() def search_code_advanced( pattern: str, @@ -370,67 +312,51 @@ def search_code_advanced( fuzzy: bool = False ) -> Dict[str, Any]: """ - Advanced search using external tools (ripgrep, ag, grep) for better performance. - Falls back to basic search if no advanced tools are available. + Search for a code pattern in the project using an advanced, fast tool. 
- Args: - pattern: Search pattern - case_sensitive: Case sensitive search - context_lines: Number of context lines to show around matches - file_pattern: File pattern to include (e.g., "*.py", "*.js") - fuzzy: Enable safe fuzzy search patterns: - - Adds word boundary matching for better results - - Allows partial word matching at word boundaries - - Safe alternative to full regex support + This tool automatically selects the best available command-line search tool + (like ugrep, ripgrep, ag, or grep) for maximum performance. + Args: + pattern: The text to search for. + case_sensitive: Whether the search should be case-sensitive. + context_lines: Number of lines to show before and after the match. + file_pattern: A glob pattern to filter files to search in (e.g., "*.py"). + fuzzy: If True, ugrep uses its native fuzzy matching; the other strategies escape + the pattern and match it at word boundaries. + If False, the command-line tools perform a literal/fixed-string search. + Returns: - Dict containing search results and metadata + A dictionary containing the search results or an error message. """ base_path = ctx.request_context.lifespan_context.base_path - - # Check if base_path is set if not base_path: - return {"error": "Project path not set. Please use set_project_path to set a project directory first."} - - # Get search tool configuration + return {"error": "Project path not set. Please use set_project_path first."} + settings = ctx.request_context.lifespan_context.settings - preferred_tool = settings.get_preferred_search_tool() - - if preferred_tool == 'basic': - # Fallback to existing search_code function - ctx.info("Using basic search (no advanced tools available)") - return { - "tool_used": "basic", - "results": search_code(pattern, ctx, case_sensitive=case_sensitive) - } - + strategy = settings.get_preferred_search_tool() + + if not strategy: + return {"error": "No search strategies available. 
This is unexpected."} + + print(f"Using search strategy: {strategy.name}") + try: - # Use advanced search tool - results = _execute_advanced_search( - pattern, base_path, preferred_tool, case_sensitive, context_lines, file_pattern, fuzzy + results = strategy.search( + pattern=pattern, + base_path=base_path, + case_sensitive=case_sensitive, + context_lines=context_lines, + file_pattern=file_pattern, + fuzzy=fuzzy ) - - return { - "tool_used": preferred_tool, - "results": results, - "total_matches": sum(len(matches) for matches in results.values()) - } - + return {"results": results} except Exception as e: - ctx.info(f"Advanced search failed: {e}, falling back to basic search") - # Fallback to basic search - return { - "tool_used": "basic_fallback", - "results": search_code(pattern, ctx, case_sensitive=case_sensitive), - "fallback_reason": str(e) - } + return {"error": f"Search failed using '{strategy.name}': {e}"} @mcp.tool() def find_files(pattern: str, ctx: Context) -> List[str]: - """ - Find files in the project that match the given pattern. - Supports glob patterns like *.py or **/*.js. - """ + """Find files in the project matching a specific glob pattern.""" base_path = ctx.request_context.lifespan_context.base_path # Check if base_path is set @@ -702,55 +628,22 @@ def check_temp_directory() -> Dict[str, Any]: @mcp.tool() def clear_settings(ctx: Context) -> str: """Clear all settings and cached data.""" - base_path = ctx.request_context.lifespan_context.base_path - - # Check if base_path is set - if not base_path: - return "Error: Project path not set. Please use set_project_path to set a project directory first." 
- settings = ctx.request_context.lifespan_context.settings - - # Clear all settings files settings.clear() - - # Clear in-memory cache and index - global file_index, code_content_cache - file_index.clear() - code_content_cache.clear() - - return f"All settings and cache cleared from {settings.settings_path}" + return "Project settings, index, and cache have been cleared." @mcp.tool() def refresh_search_tools(ctx: Context) -> str: - """Refresh search tools detection and show available capabilities.""" - base_path = ctx.request_context.lifespan_context.base_path - - # Check if base_path is set - if not base_path: - return "Error: Project path not set. Please use set_project_path to set a project directory first." - + """ + Manually re-detect the available command-line search tools on the system. + This is useful if you have installed a new tool (like ripgrep) after starting the server. + """ settings = ctx.request_context.lifespan_context.settings + settings.refresh_available_strategies() - # Refresh search tools - config = settings.refresh_search_tools() - - # Build response message - preferred = config.get('preferred_tool', 'basic') - available_tools = config.get('available_tools', {}) - - message = f"Search tools refreshed. Preferred tool: {preferred}\n" - message += "Available tools:\n" - - for tool_name, is_available in available_tools.items(): - status = "✓" if is_available else "✗" - message += f" {status} {tool_name}\n" + config = settings.get_search_tools_config() - if preferred != 'basic': - message += f"\nAdvanced search capabilities enabled with {preferred}." - else: - message += "\nOnly basic search available. Consider installing ripgrep for better performance." - - return message + return f"Search tools refreshed. Available: {config['available_tools']}. Preferred: {config['preferred_tool']}." 
# ----- PROMPTS ----- @@ -864,177 +757,22 @@ def _count_files(directory: Dict) -> int: return count def _get_all_files(directory: Dict, prefix: str = "") -> List[Tuple[str, Dict]]: - """ - Recursively get all files from the directory structure. - Returns a list of (file_path, file_info) tuples. - """ - result = [] - - for name, value in directory.items(): - if isinstance(value, dict): - if "type" in value and value["type"] == "file": - result.append((value["path"], value)) - else: - new_prefix = f"{prefix}/{name}" if prefix else name - result.extend(_get_all_files(value, new_prefix)) - - return result - -def _create_safe_fuzzy_pattern(pattern: str) -> str: - """Create safe fuzzy search patterns that are more permissive than exact match - but still safe from regex injection attacks. - - Args: - pattern: Original search pattern - - Returns: - Safe fuzzy pattern for grep -E (extended regex) - """ - import re - - # Escape any regex special characters to make them literal - escaped = re.escape(pattern) - - # Create fuzzy pattern that matches: - # 1. Word at start of word boundary (e.g., "test" in "testing") - # 2. Word at end of word boundary (e.g., "test" in "mytest") - # 3. Whole word (e.g., "test" as standalone word) - if len(pattern) >= 3: # Only for patterns of reasonable length - # This pattern allows partial matches at word boundaries - fuzzy_pattern = f"\\b{escaped}|{escaped}\\b" - else: - # For short patterns, require full word boundaries to avoid too many matches - fuzzy_pattern = f"\\b{escaped}\\b" - - return fuzzy_pattern - -def _execute_advanced_search( - pattern: str, - base_path: str, - tool: str, - case_sensitive: bool, - context_lines: int, - file_pattern: Optional[str], - fuzzy: bool = False -) -> Dict[str, List[Tuple[int, str]]]: - """Execute advanced search using external tools. 
- - Returns: - Dict mapping file paths to lists of (line_number, line_content) tuples - """ - # Prepare search pattern - search_pattern = pattern - if fuzzy: - search_pattern = _create_safe_fuzzy_pattern(pattern) - - # Build command based on tool - if tool == 'ripgrep': - if fuzzy: - cmd = ['rg', '--line-number', '--no-heading'] # Use regex mode for fuzzy - else: - cmd = ['rg', '--line-number', '--no-heading', '--fixed-strings'] - if not case_sensitive: - cmd.append('--ignore-case') - if context_lines > 0: - cmd.extend(['-A', str(context_lines), '-B', str(context_lines)]) - if file_pattern: - cmd.extend(['--glob', file_pattern]) - - elif tool == 'ag': - if fuzzy: - cmd = ['ag', '--line-numbers', '--noheading'] # Use regex mode for fuzzy - else: - cmd = ['ag', '--line-numbers', '--noheading', '--literal'] - if not case_sensitive: - cmd.append('--ignore-case') - if context_lines > 0: - cmd.extend(['-A', str(context_lines), '-B', str(context_lines)]) - if file_pattern: - cmd.extend(['--', file_pattern]) - - elif tool == 'grep': - if fuzzy: - cmd = ['grep', '-rn', '-E'] # -E for extended regex (safe fuzzy patterns) - else: - cmd = ['grep', '-rn', '-F'] # -F for fixed strings (exact match) - if not case_sensitive: - cmd.append('-i') - if context_lines > 0: - cmd.extend(['-A', str(context_lines), '-B', str(context_lines)]) - if file_pattern: - cmd.extend(['--include=' + file_pattern]) - else: - raise ValueError(f"Unknown search tool: {tool}") - - # Add pattern and base path - cmd.extend([search_pattern, base_path]) - - # Execute command - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=30, - cwd=base_path - ) - - if result.returncode not in [0, 1]: # 0 = found, 1 = not found, others = error - raise Exception(f"Search command failed: {result.stderr}") - - # Parse output - return _parse_search_output(result.stdout, base_path) - -def _parse_search_output(output: str, base_path: str) -> Dict[str, List[Tuple[int, str]]]: - """Parse search tool 
output into structured format. - - Returns: - Dict mapping file paths to lists of (line_number, line_content) tuples - """ - results = {} - - for line in output.splitlines(): - if not line.strip(): - continue - - # Parse format: filename:line_number:content - parts = line.split(':', 2) - if len(parts) >= 3: - file_path = parts[0] - try: - line_number = int(parts[1]) - content = parts[2] - - # Make file path relative to base_path - if file_path.startswith(base_path): - file_path = os.path.relpath(file_path, base_path) - - if file_path not in results: - results[file_path] = [] - - results[file_path].append((line_number, content.strip())) - - except ValueError: - # Skip lines that don't have valid line numbers - continue - - return results + """Recursively get all files from the index.""" + all_files = [] + for name, item in directory.items(): + current_path = os.path.join(prefix, name) + if item['type'] == 'file': + all_files.append((current_path, item)) + elif item['type'] == 'directory': + all_files.extend(_get_all_files(item['children'], current_path)) + return all_files def main(): - """Entry point for the code indexer.""" - print("Starting Code Index MCP Server...", file=sys.stderr) - - # Ensure temporary directory exists using ProjectSettings - temp_dir = os.path.join(tempfile.gettempdir(), SETTINGS_DIR) - print(f"Temporary directory: {temp_dir}") - - try: - # Use ProjectSettings to handle directory creation consistently - temp_settings = ProjectSettings("", skip_load=True) - print(f"Temporary directory setup completed") - except Exception as e: - print(f"Error setting up temporary directory: {e}", file=sys.stderr) - + """Main function to run the MCP server.""" + # Run the server. Tools are discovered automatically via decorators. mcp.run() -if __name__ == "__main__": +if __name__ == '__main__': + # Set path to project root + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) main()