|
| 1 | +import requests |
| 2 | +import json |
| 3 | +import os |
| 4 | +import csv |
| 5 | +import argparse |
| 6 | +from typing import List, Dict, Tuple, Any |
| 7 | + |
# Path of the JSON settings file; relative, so it is resolved against the
# current working directory at the time the file is opened.
SETTING_ROUTE: str = "setting.json"

# Value stored under the "CX" setting when a new settings file is created.
# It is sent as the `cx` query parameter of the Custom Search request
# (presumably the search-engine ID -- confirm against the Google console).
DEFAULT_CX: str = "b0264518c3d104eda"
| 10 | + |
| 11 | + |
def load_settings(api_key: str | None = None) -> Dict[str, str]:
    """
    Load API settings from setting.json, or create it if missing.

    Args:
        api_key: Key to store when the settings file is absent or holds no
            "API_KEY". Ignored when the file already has a non-empty key.

    Returns:
        The settings mapping. It always contains "API_KEY" and "CX"; a
        missing or empty "CX" is filled in with DEFAULT_CX.

    Raises:
        FileNotFoundError: The settings file does not exist and no
            ``api_key`` was supplied.
        ValueError: The file is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError`` subclass), is not a JSON object, or has no
            "API_KEY" while no ``api_key`` was supplied.
    """
    try:
        # EAFP: open directly instead of checking os.path.exists first, so
        # the file cannot disappear between the check and the read.
        with open(SETTING_ROUTE, 'r', encoding="utf-8") as f:
            settings = json.load(f)
    except FileNotFoundError:
        if not api_key:
            raise FileNotFoundError(
                "No setting.json found. Please run with --add_api_key to create one."
            ) from None
        settings = {"API_KEY": api_key, "CX": DEFAULT_CX}
        needs_write = True
    else:
        if not isinstance(settings, dict):
            raise ValueError("setting.json must contain a JSON object.")

        needs_write = False
        if not settings.get("API_KEY"):
            if not api_key:
                raise ValueError(
                    "API_KEY is missing in setting.json. Use --add_api_key to add one."
                )
            settings["API_KEY"] = api_key
            needs_write = True

        # Callers index settings["CX"]; guarantee it exists even for a
        # hand-edited file that only holds the key. The default is only
        # persisted when the file is being rewritten anyway.
        if not settings.get("CX"):
            settings["CX"] = DEFAULT_CX

    if needs_write:
        with open(SETTING_ROUTE, 'w', encoding="utf-8") as f:
            json.dump(settings, f, indent=4)

    return settings
| 35 | + |
| 36 | + |
def scrape(
    search_query: str,
    api_key: str,
    cx: str,
    pages: int = 1,
    timeout: float = 10.0,
) -> Tuple[List[Dict[str, Any]], float]:
    """
    Perform a Google Custom Search and return results.

    Args:
        search_query: Free-text query; it is URL-encoded before sending.
        api_key: Value sent as the ``key`` query parameter.
        cx: Value sent as the ``cx`` query parameter.
        pages: Number of result pages to request. Each page advances the
            ``start`` offset by 10. Zero or negative values request nothing.
            NOTE(review): the API is believed to reject offsets beyond the
            first 100 results -- confirm against the API reference.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A ``(results, search_time)`` tuple: the concatenated ``items`` of
        every fetched page, and the sum of the ``searchTime`` values the
        API reported for those pages.

    Raises:
        RuntimeError: A request returned a non-200 status code.
    """
    endpoint = "https://www.googleapis.com/customsearch/v1"
    results: List[Dict[str, Any]] = []
    search_time = 0.0

    for page in range(pages):
        # Pass the values through `params` so requests percent-encodes them;
        # interpolating the raw query into the URL let characters such as
        # '&' or '#' truncate the query or inject extra parameters.
        params = {
            "key": api_key,
            "q": search_query,
            "cx": cx,
            "start": page * 10 + 1,
        }

        # Without a timeout a stalled connection would block forever.
        response = requests.get(endpoint, params=params, timeout=timeout)
        if response.status_code != 200:
            raise RuntimeError(f"API request failed: {response.status_code} {response.text}")

        data = response.json()

        # A page without "items" means there is nothing (more) to collect.
        if "items" not in data:
            print("No results found or error:", data)
            break

        results.extend(data["items"])
        # Tolerate a response that omits the timing information.
        search_time += float(data.get("searchInformation", {}).get("searchTime", 0.0))

    return results, search_time
| 65 | + |
| 66 | + |
def export_to_csv(results: List[Dict[str, Any]], filename: str = "output.csv") -> None:
    """
    Write the search results to ``filename`` as CSV.

    The file gets a ``#, Title, Link`` header followed by one row per
    result: a 1-based counter, then the result's "title" and "link"
    (an empty string when a key is absent). A summary line is printed
    once the file has been written.
    """
    header = ["#", "Title", "Link"]

    with open(filename, "w", encoding="utf-8", newline="") as out_file:
        csv_out = csv.writer(out_file)
        csv_out.writerow(header)
        csv_out.writerows(
            [number, entry.get("title", ""), entry.get("link", "")]
            for number, entry in enumerate(results, start=1)
        )

    print(f"Exported {len(results)} results to {filename}")
| 79 | + |
| 80 | + |
def _mask_key(secret: str, visible: int = 4) -> str:
    """Return *secret* with all but its last *visible* characters starred out."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]


def main() -> None:
    """
    Command-line entry point: parse arguments, search, and export to CSV.

    Reads ``--search_query`` (required), ``--add_api_key`` and ``--pages``,
    loads the settings, runs the search and writes the results with
    ``export_to_csv``. Invalid arguments or settings end the program via
    ``parser.error``; a failed API request ends it with the error message.
    """
    parser = argparse.ArgumentParser(description="Google Custom Search scraper")
    parser.add_argument("-sq", "--search_query", required=True, help="Search query to search for")
    parser.add_argument("--add_api_key", type=str, help="Your Google API key")
    parser.add_argument("--pages", type=int, default=1, help="Number of pages of results to fetch")
    args = parser.parse_args()

    # A page count below 1 would silently produce an empty export.
    if args.pages < 1:
        parser.error("--pages must be at least 1")

    try:
        settings = load_settings(args.add_api_key)
    except (FileNotFoundError, ValueError) as err:
        # Show the actionable message instead of a traceback.
        parser.error(str(err))

    api_key = settings["API_KEY"]
    # Fall back to the default when the settings file carries no "CX".
    cx = settings.get("CX") or DEFAULT_CX

    # Never echo the full secret: stdout often ends up in logs or screenshots.
    print(f"Using API key: {_mask_key(str(api_key))}")

    try:
        results, elapsed_time = scrape(args.search_query, api_key, cx, args.pages)
    except RuntimeError as err:
        raise SystemExit(f"Error: {err}") from err

    export_to_csv(results)
    # elapsed_time is the search time reported by the API, not wall-clock time.
    print(f"Completed in {elapsed_time:.2f} seconds.")


if __name__ == "__main__":
    main()
0 commit comments