Skip to content

Commit 3e6ff5a

Browse files
committed
feat: Enhance file watcher and index service with improved logging and concurrency control
1 parent 604b4b8 commit 3e6ff5a

File tree

3 files changed

+95 additions, -77 deletions

src/code_index_mcp/server.py

Lines changed: 17 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -270,12 +270,12 @@ def get_file_watcher_status(ctx: Context) -> Dict[str, Any]:
270270
file_watcher_error = None
271271
if hasattr(ctx.request_context.lifespan_context, 'file_watcher_error'):
272272
file_watcher_error = ctx.request_context.lifespan_context.file_watcher_error
273-
273+
274274
# Get file watcher service from context
275275
file_watcher_service = None
276276
if hasattr(ctx.request_context.lifespan_context, 'file_watcher_service'):
277277
file_watcher_service = ctx.request_context.lifespan_context.file_watcher_service
278-
278+
279279
# If there's an error, return error status with recommendation
280280
if file_watcher_error:
281281
status = {
@@ -285,39 +285,39 @@ def get_file_watcher_status(ctx: Context) -> Dict[str, Any]:
285285
"recommendation": "Use refresh_index tool for manual updates",
286286
"manual_refresh_required": True
287287
}
288-
288+
289289
# Add basic configuration if available
290290
if hasattr(ctx.request_context.lifespan_context, 'settings') and ctx.request_context.lifespan_context.settings:
291291
file_watcher_config = ctx.request_context.lifespan_context.settings.get_file_watcher_config()
292292
status["configuration"] = file_watcher_config
293-
293+
294294
return status
295-
295+
296296
# If no service and no error, it's not initialized
297297
if not file_watcher_service:
298298
return {
299299
"available": True,
300300
"active": False,
301-
"status": "not_initialized",
301+
"status": "not_initialized",
302302
"message": "File watcher service not initialized. Set project path to enable auto-refresh.",
303303
"recommendation": "Use set_project_path tool to initialize file watcher"
304304
}
305-
305+
306306
# Get status from file watcher service
307307
status = file_watcher_service.get_status()
308-
308+
309309
# Add index service status
310310
index_service = IndexService(ctx)
311311
rebuild_status = index_service.get_rebuild_status()
312312
status["rebuild_status"] = rebuild_status
313-
313+
314314
# Add configuration
315315
if hasattr(ctx.request_context.lifespan_context, 'settings') and ctx.request_context.lifespan_context.settings:
316316
file_watcher_config = ctx.request_context.lifespan_context.settings.get_file_watcher_config()
317317
status["configuration"] = file_watcher_config
318-
318+
319319
return status
320-
320+
321321
except Exception as e:
322322
return {"status": "error", "message": f"Failed to get file watcher status: {e}"}
323323

@@ -334,9 +334,9 @@ def configure_file_watcher(
334334
# Get settings from context
335335
if not hasattr(ctx.request_context.lifespan_context, 'settings') or not ctx.request_context.lifespan_context.settings:
336336
return "Settings not available - project path not set"
337-
337+
338338
settings = ctx.request_context.lifespan_context.settings
339-
339+
340340
# Build updates dictionary
341341
updates = {}
342342
if enabled is not None:
@@ -345,17 +345,17 @@ def configure_file_watcher(
345345
updates["debounce_seconds"] = debounce_seconds
346346
if additional_exclude_patterns is not None:
347347
updates["additional_exclude_patterns"] = additional_exclude_patterns
348-
348+
349349
if not updates:
350350
return "No configuration changes specified"
351-
351+
352352
# Update configuration
353353
settings.update_file_watcher_config(updates)
354-
354+
355355
# If file watcher is running, we would need to restart it for changes to take effect
356356
# For now, just return success message with note about restart
357357
return f"File watcher configuration updated: {updates}. Restart may be required for changes to take effect."
358-
358+
359359
except Exception as e:
360360
return f"Failed to update file watcher configuration: {e}"
361361

@@ -419,22 +419,6 @@ def set_project() -> list[types.PromptMessage]:
419419

420420
def main():
421421
"""Main function to run the MCP server."""
422-
# Configure logging for debugging
423-
import logging
424-
logging.basicConfig(
425-
level=logging.DEBUG,
426-
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
427-
handlers=[
428-
logging.StreamHandler(),
429-
logging.FileHandler('mcp_server_debug.log', mode='w')
430-
]
431-
)
432-
433-
# Enable debug logging for file watcher and index services
434-
logging.getLogger('code_index_mcp.services.file_watcher_service').setLevel(logging.DEBUG)
435-
logging.getLogger('code_index_mcp.services.index_service').setLevel(logging.DEBUG)
436-
437-
# Run the server. Tools are discovered automatically via decorators.
438422
mcp.run()
439423

440424
if __name__ == '__main__':

src/code_index_mcp/services/file_watcher_service.py

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ def start_monitoring(self, rebuild_callback: Callable) -> bool:
111111

112112
# Log detailed Observer setup
113113
watch_path = str(self.base_path)
114-
self.logger.info(f"Scheduling Observer for path: {watch_path}")
114+
self.logger.debug(f"Scheduling Observer for path: {watch_path}")
115115

116116
self.observer.schedule(
117117
self.event_handler,
@@ -120,20 +120,20 @@ def start_monitoring(self, rebuild_callback: Callable) -> bool:
120120
)
121121

122122
# Log Observer start
123-
self.logger.info("Starting Observer...")
123+
self.logger.debug("Starting Observer...")
124124
self.observer.start()
125125
self.is_monitoring = True
126126
self.restart_attempts = 0
127127

128128
# Log Observer thread info
129129
if hasattr(self.observer, '_thread'):
130130
thread_info = f"Observer thread: {self.observer._thread}"
131-
self.logger.info(thread_info)
131+
self.logger.debug(thread_info)
132132

133133
# Verify observer is actually running
134134
if self.observer.is_alive():
135135
self.logger.info(
136-
"File watcher started successfully - Observer is alive",
136+
"File watcher started successfully",
137137
extra={
138138
"debounce_seconds": debounce_seconds,
139139
"monitored_path": str(self.base_path),
@@ -143,14 +143,14 @@ def start_monitoring(self, rebuild_callback: Callable) -> bool:
143143

144144
# Add diagnostic test - create a test event to verify Observer works
145145
import os
146-
self.logger.info(f"Observer thread is alive: {self.observer.is_alive()}")
147-
self.logger.info(f"Monitored path exists: {os.path.exists(str(self.base_path))}")
148-
self.logger.info(f"Event handler is set: {self.event_handler is not None}")
146+
self.logger.debug(f"Observer thread is alive: {self.observer.is_alive()}")
147+
self.logger.debug(f"Monitored path exists: {os.path.exists(str(self.base_path))}")
148+
self.logger.debug(f"Event handler is set: {self.event_handler is not None}")
149149

150150
# Log current directory for comparison
151151
current_dir = os.getcwd()
152-
self.logger.info(f"Current working directory: {current_dir}")
153-
self.logger.info(f"Are paths same: {os.path.normpath(current_dir) == os.path.normpath(str(self.base_path))}")
152+
self.logger.debug(f"Current working directory: {current_dir}")
153+
self.logger.debug(f"Are paths same: {os.path.normpath(current_dir) == os.path.normpath(str(self.base_path))}")
154154

155155
return True
156156
else:
@@ -350,7 +350,7 @@ def on_any_event(self, event: FileSystemEvent) -> None:
350350

351351
if should_process:
352352
process_msg = f"Processing file system event: {event.event_type} - {event.src_path}"
353-
self.logger.info(process_msg)
353+
self.logger.debug(process_msg)
354354
self.reset_debounce_timer()
355355
else:
356356
filter_msg = f"Event filtered out: {event.event_type} - {event.src_path}"
@@ -399,7 +399,7 @@ def should_process_event(self, event: FileSystemEvent) -> bool:
399399
if is_temp:
400400
return False
401401

402-
self.logger.info(f"Event will be processed: {event.src_path}")
402+
self.logger.debug(f"Event will be processed: {event.src_path}")
403403
return True
404404

405405
def is_excluded_path(self, path: Path) -> bool:
@@ -473,7 +473,7 @@ def reset_debounce_timer(self) -> None:
473473
self.logger.debug("Previous debounce timer cancelled")
474474

475475
timer_msg = f"Starting debounce timer for {self.debounce_seconds} seconds"
476-
self.logger.info(timer_msg)
476+
self.logger.debug(timer_msg)
477477

478478
self.debounce_timer = Timer(
479479
self.debounce_seconds,
@@ -491,12 +491,12 @@ def trigger_rebuild(self) -> None:
491491
if self.rebuild_callback:
492492
try:
493493
callback_msg = "Calling rebuild callback..."
494-
self.logger.info(callback_msg)
494+
self.logger.debug(callback_msg)
495495

496496
result = self.rebuild_callback()
497497

498498
result_msg = f"Rebuild callback completed with result: {result}"
499-
self.logger.info(result_msg)
499+
self.logger.debug(result_msg)
500500
except Exception as e:
501501
error_msg = f"Rebuild callback failed: {e}"
502502
self.logger.error(error_msg)

src/code_index_mcp/services/index_service.py

Lines changed: 64 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
import fnmatch
99
import json
1010
import logging
11+
import threading
1112
import time
1213
from typing import Dict, Any, Optional, Callable
1314
from mcp.server.fastmcp import Context
@@ -37,6 +38,8 @@ def __init__(self, ctx: Context):
3738
super().__init__(ctx)
3839
self.logger = logging.getLogger(__name__)
3940
self.is_rebuilding = False
41+
self._rebuild_lock = threading.Lock()
42+
self._executor = None
4043

4144
def rebuild_index(self) -> str:
4245
"""
@@ -178,30 +181,40 @@ def start_background_rebuild(self) -> bool:
178181
Start index rebuild in background without blocking searches.
179182
180183
This method can be called from any thread (including file watcher threads).
181-
Uses ThreadPoolExecutor to run rebuild in a separate thread.
184+
Uses a persistent ThreadPoolExecutor with proper concurrency control.
182185
183186
Returns:
184187
True if rebuild started, False if already in progress
185188
"""
186-
self.logger.info("start_background_rebuild called")
189+
self.logger.debug("start_background_rebuild called")
187190

188-
if self.is_rebuilding:
189-
self.logger.info("Rebuild already in progress, skipping")
190-
return False
191+
# Atomic check-and-set with proper locking
192+
with self._rebuild_lock:
193+
if self.is_rebuilding:
194+
self.logger.debug("Rebuild already in progress, skipping")
195+
return False
196+
197+
# Atomically set rebuilding flag
198+
self.is_rebuilding = True
191199

192200
try:
193201
import concurrent.futures
194-
import threading
195202

196-
self.logger.info("Starting background rebuild in thread pool")
203+
# Initialize executor if needed
204+
if self._executor is None:
205+
self._executor = concurrent.futures.ThreadPoolExecutor(
206+
max_workers=1,
207+
thread_name_prefix="index_rebuild"
208+
)
209+
self.logger.debug("Created persistent ThreadPoolExecutor for rebuilds")
210+
211+
self.logger.debug("Starting background rebuild in thread pool")
197212

198213
def run_sync_rebuild():
199214
"""Run the rebuild synchronously in a background thread."""
200215
try:
201-
self.is_rebuilding = True
202216
start_time = time.time()
203-
204-
self.logger.info("Starting sync index rebuild...")
217+
self.logger.debug("Starting sync index rebuild...")
205218

206219
# Build new index using existing sync logic
207220
file_count = self._index_project(self.base_path)
@@ -219,43 +232,64 @@ def run_sync_rebuild():
219232
self.logger.error("Traceback: %s", traceback.format_exc())
220233
return False
221234
finally:
222-
self.is_rebuilding = False
223-
self.logger.info("Background rebuild finished, is_rebuilding set to False")
235+
# Thread-safe flag reset
236+
with self._rebuild_lock:
237+
self.is_rebuilding = False
238+
self.logger.debug("Background rebuild finished, is_rebuilding set to False")
239+
240+
# Submit to persistent executor
241+
future = self._executor.submit(run_sync_rebuild)
242+
243+
# Add completion callback
244+
def on_complete(fut):
245+
try:
246+
result = fut.result()
247+
if result:
248+
self.logger.debug("Background rebuild completed successfully")
249+
else:
250+
self.logger.error("Background rebuild failed")
251+
except Exception as e:
252+
self.logger.error(f"Background rebuild thread failed: {e}")
224253

225-
# Submit to thread pool and don't wait for completion
226-
with concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="rebuild") as executor:
227-
future = executor.submit(run_sync_rebuild)
228-
229-
# Add completion callback
230-
def on_complete(fut):
231-
try:
232-
result = fut.result()
233-
if result:
234-
self.logger.info("Background rebuild completed successfully")
235-
else:
236-
self.logger.error("Background rebuild failed")
237-
except Exception as e:
238-
self.logger.error(f"Background rebuild thread failed: {e}")
239-
240-
future.add_done_callback(on_complete)
254+
future.add_done_callback(on_complete)
241255

242-
self.logger.info("Background rebuild scheduled successfully")
256+
self.logger.debug("Background rebuild scheduled successfully")
243257
return True
244258

245259
except Exception as e:
260+
# Reset flag on error
261+
with self._rebuild_lock:
262+
self.is_rebuilding = False
263+
246264
self.logger.error(f"Failed to start background rebuild: {e}")
247265
import traceback
248266
self.logger.error("Traceback: %s", traceback.format_exc())
249267
return False
250268

269+
def shutdown(self) -> None:
270+
"""
271+
Shutdown the index service and cleanup resources.
272+
273+
This should be called when the service is no longer needed to ensure
274+
proper cleanup of the ThreadPoolExecutor.
275+
"""
276+
if self._executor is not None:
277+
self.logger.debug("Shutting down ThreadPoolExecutor...")
278+
self._executor.shutdown(wait=True) # Wait for current rebuilds to complete
279+
self._executor = None
280+
self.logger.debug("ThreadPoolExecutor shutdown completed")
281+
251282
def get_rebuild_status(self) -> dict:
252283
"""
253284
Get current rebuild status information.
254285
255286
Returns:
256287
Dictionary with rebuild status
257288
"""
289+
with self._rebuild_lock:
290+
is_rebuilding = self.is_rebuilding
291+
258292
return {
259-
"is_rebuilding": self.is_rebuilding,
293+
"is_rebuilding": is_rebuilding,
260294
"index_cache_size": len(self.index_cache.get('files', [])) if self.index_cache else 0
261295
}

0 commit comments

Comments
 (0)