diff --git a/.gitignore b/.gitignore index 646a869..64849ec 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ /.idea/ -**/__pycache__/ \ No newline at end of file +**/__pycache__/ +/webcollector.egg-info/ +/dist/ +**/*.p \ No newline at end of file diff --git a/README.md b/README.md index cb86568..69c0707 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,8 @@ class NewsCrawler(wc.RamCrawler): super().__init__(auto_detect=True) self.num_threads = 10 self.add_seed("https://github.blog/") - self.add_regex("https://github.blog/[0-9]+.*") + self.add_regex("+https://github.blog/[0-9]+.*") + self.add_regex("-.*#.*") # do not detect urls that contain "#" def visit(self, page, detected): if page.match_url("https://github.blog/[0-9]+.*"): @@ -57,7 +58,6 @@ class NewsCrawler(wc.RamCrawler): crawler = NewsCrawler() crawler.start(10) - ``` ### Manually Detecting URLs @@ -87,6 +87,150 @@ class NewsCrawler(wc.RamCrawler): print("CONTENT: ", content[:50], "...") +crawler = NewsCrawler() +crawler.start(10) +``` + +### Filter Detected URLs by detected_filter Plugin + +[demo_detected_filter.py](examples/demo_detected_filter.py): + +```python +# coding=utf-8 +import webcollector as wc +from webcollector.filter import Filter +import re + + +class RegexDetectedFilter(Filter): + def filter(self, crawl_datum): + if re.fullmatch("https://github.blog/2019-02.*", crawl_datum.url): + return crawl_datum + else: + print("filtered by detected_filter: {}".format(crawl_datum.brief_info())) + return None + + +class NewsCrawler(wc.RamCrawler): + def __init__(self): + super().__init__(auto_detect=True, detected_filter=RegexDetectedFilter()) + self.num_threads = 10 + self.add_seed("https://github.blog/") + + def visit(self, page, detected): + + detected.extend(page.links("https://github.blog/[0-9]+.*")) + + if page.match_url("https://github.blog/[0-9]+.*"): + title = page.select("h1.lh-condensed")[0].text.strip() + content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip() + print("\nURL: ", page.url) + print("TITLE: ", title) + print("CONTENT: ", content[:50], "...") + + +crawler = NewsCrawler() +crawler.start(10) +``` + + +### Resume Crawling by RedisCrawler + +[demo_redis_crawler.py](examples/demo_redis_crawler.py): + + +```python +# coding=utf-8 +from redis import StrictRedis +import webcollector as wc + + +class NewsCrawler(wc.RedisCrawler): + + def __init__(self): + super().__init__(redis_client=StrictRedis("127.0.0.1"), + db_prefix="news", + auto_detect=True) + self.num_threads = 10 + self.resumable = True # you can resume crawling after shutdown + self.add_seed("https://github.blog/") + self.add_regex("+https://github.blog/[0-9]+.*") + self.add_regex("-.*#.*") # do not detect urls that contain "#" + + def visit(self, page, detected): + if page.match_url("https://github.blog/[0-9]+.*"): + title = page.select("h1.lh-condensed")[0].text.strip() + content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip() + print("\nURL: ", page.url) + print("TITLE: ", title) + print("CONTENT: ", content[:50], "...") + + +crawler = NewsCrawler() +crawler.start(10) + +``` + +### Custom Http Request with Requests + +[demo_custom_http_request.py](examples/demo_custom_http_request.py): + + +```python +# coding=utf-8 + +import webcollector as wc +from webcollector.model import Page +from webcollector.plugin.net import HttpRequester + +import requests + + +class MyRequester(HttpRequester): + def get_response(self, crawl_datum): + # custom http request + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36" + } + + print("sending request with MyRequester") + + # send request and get response + response = requests.get(crawl_datum.url, headers=headers) + + # update code + crawl_datum.code = response.status_code + + # wrap http response as a Page object + page = Page(crawl_datum, + response.content, + content_type=response.headers["Content-Type"], + http_charset=response.encoding) + + return page + + +class NewsCrawler(wc.RamCrawler): + def __init__(self): + super().__init__(auto_detect=True) + self.num_threads = 10 + + # set requester to enable MyRequester + self.requester = MyRequester() + + self.add_seed("https://github.blog/") + self.add_regex("+https://github.blog/[0-9]+.*") + self.add_regex("-.*#.*") # do not detect urls that contain "#" + + def visit(self, page, detected): + if page.match_url("https://github.blog/[0-9]+.*"): + title = page.select("h1.lh-condensed")[0].text.strip() + content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip() + print("\nURL: ", page.url) + print("TITLE: ", title) + print("CONTENT: ", content[:50], "...") + + crawler = NewsCrawler() crawler.start(10) ``` \ No newline at end of file diff --git a/examples/demo_auto_news_crawler.py b/examples/demo_auto_news_crawler.py index f428f07..3af08f2 100644 --- a/examples/demo_auto_news_crawler.py +++ b/examples/demo_auto_news_crawler.py @@ -7,7 +7,8 @@ def __init__(self): super().__init__(auto_detect=True) self.num_threads = 10 self.add_seed("https://github.blog/") - self.add_regex("https://github.blog/[0-9]+.*") + self.add_regex("+https://github.blog/[0-9]+.*") + self.add_regex("-.*#.*") # do not detect urls that contain "#" def visit(self, page, detected): if page.match_url("https://github.blog/[0-9]+.*"): diff --git a/examples/demo_custom_http_request.py b/examples/demo_custom_http_request.py new file mode 100644 index 0000000..d9068e4 --- /dev/null +++ b/examples/demo_custom_http_request.py @@ -0,0 +1,56 @@ +# coding=utf-8 + +import webcollector as wc +from webcollector.model import Page +from webcollector.plugin.net import HttpRequester + +import requests + + +class MyRequester(HttpRequester): + def get_response(self, crawl_datum): + # custom http request + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36" + } + + print("sending request with MyRequester") + + # send request and get response + response = requests.get(crawl_datum.url, headers=headers) + + # update code + crawl_datum.code = response.status_code + + # wrap http response as a Page object + page = Page(crawl_datum, + response.content, + content_type=response.headers["Content-Type"], + http_charset=response.encoding) + + return page + + +class NewsCrawler(wc.RamCrawler): + def __init__(self): + super().__init__(auto_detect=True) + self.num_threads = 10 + + # set requester to enable MyRequester + self.requester = MyRequester() + + self.add_seed("https://github.blog/") + self.add_regex("+https://github.blog/[0-9]+.*") + self.add_regex("-.*#.*") # do not detect urls that contain "#" + + def visit(self, page, detected): + if page.match_url("https://github.blog/[0-9]+.*"): + title = page.select("h1.lh-condensed")[0].text.strip() + content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip() + print("\nURL: ", page.url) + print("TITLE: ", title) + print("CONTENT: ", content[:50], "...") + + +crawler = NewsCrawler() +crawler.start(10) \ No newline at end of file diff --git a/examples/demo_detected_filter.py b/examples/demo_detected_filter.py new file mode 100644 index 0000000..2d44ff4 --- /dev/null +++ b/examples/demo_detected_filter.py @@ -0,0 +1,35 @@ +# coding=utf-8 +import webcollector as wc +from webcollector.filter import Filter +import re + + +class RegexDetectedFilter(Filter): + def filter(self, crawl_datum): + if re.fullmatch("https://github.blog/2019-02.*", crawl_datum.url): + return crawl_datum + else: + print("filtered by detected_filter: {}".format(crawl_datum.brief_info())) + return None + + +class NewsCrawler(wc.RamCrawler): + def __init__(self): + super().__init__(auto_detect=True, detected_filter=RegexDetectedFilter()) + self.num_threads = 10 + self.add_seed("https://github.blog/") + + def visit(self, page, detected): + + detected.extend(page.links("https://github.blog/[0-9]+.*")) + + if page.match_url("https://github.blog/[0-9]+.*"): + title = page.select("h1.lh-condensed")[0].text.strip() + content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip() + print("\nURL: ", page.url) + print("TITLE: ", title) + print("CONTENT: ", content[:50], "...") + + +crawler = NewsCrawler() +crawler.start(10) diff --git a/examples/demo_redis_crawler.py b/examples/demo_redis_crawler.py new file mode 100644 index 0000000..08d689a --- /dev/null +++ b/examples/demo_redis_crawler.py @@ -0,0 +1,28 @@ +# coding=utf-8 +from redis import StrictRedis +import webcollector as wc + + +class NewsCrawler(wc.RedisCrawler): + + def __init__(self): + super().__init__(redis_client=StrictRedis("127.0.0.1"), + db_prefix="news", + auto_detect=True) + self.num_threads = 10 + self.resumable = True # you can resume crawling after shutdown + self.add_seed("https://github.blog/") + self.add_regex("+https://github.blog/[0-9]+.*") + self.add_regex("-.*#.*") # do not detect urls that contain "#" + + def visit(self, page, detected): + if page.match_url("https://github.blog/[0-9]+.*"): + title = page.select("h1.lh-condensed")[0].text.strip() + content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip() + print("\nURL: ", page.url) + print("TITLE: ", title) + print("CONTENT: ", content[:50], "...") + + +crawler = NewsCrawler() +crawler.start(10) diff --git a/examples/demo_server.py b/examples/demo_server.py new file mode 100644 index 0000000..dcde4c9 --- /dev/null +++ b/examples/demo_server.py @@ -0,0 +1,12 @@ +# coding=utf-8 +from flask import Flask +import time +import random +app = Flask(__name__) + +@app.route("/") +def index(): + time.sleep(2) + return "ok" + +app.run() diff --git a/examples/demo_speed.py b/examples/demo_speed.py new file mode 100644 index 0000000..9e894e5 --- /dev/null +++ b/examples/demo_speed.py @@ -0,0 +1,21 @@ +# coding=utf-8 +import webcollector as wc +import time + + +class RubyChinaCrawler(wc.RamCrawler): + def __init__(self): + super().__init__(auto_detect=False) + self.num_threads = 10 + self.add_seeds(["https://ruby-china.org/topics?page={}".format(i) for i in range(1, 40)]) + + def visit(self, page, detected): + print("start_visit", page.url) + # time.sleep(4) + print("end_visit", page.url) + + +crawler = RubyChinaCrawler() +start = time.time() +crawler.start(10) +print(time.time() - start) diff --git a/setup.py b/setup.py index f200231..ecf1e73 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="webcollector", - version="0.0.1-alpha", + version="0.0.5-alpha", author="Jun Hu", packages=find_packages( exclude=[ @@ -12,11 +12,11 @@ install_requires=[ "html5lib", "aiohttp", - "BeautifulSoup4" + "BeautifulSoup4", + "redis", + "requests" ], - description=""" - An open source web crawler framework. - """, + description="WebCollector-Python is an open source web crawler framework based on Python.It provides some simple interfaces for crawling the Web,you can setup a multi-threaded web crawler in less than 5 minutes.", license="GNU General Public License v3.0 (See LICENSE)", url="https://github.com/CrawlScript/WebCollector-Python" ) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..7f22256 --- /dev/null +++ b/test.py @@ -0,0 +1,95 @@ +# coding=utf-8 + +import asyncio +import requests +import random +import threading + + + +import asyncio +import requests +from concurrent.futures import ThreadPoolExecutor +import time + +url = "http://127.0.0.1:5000" + +loop = asyncio.get_event_loop() + + +async def cor(): + print("c-start") + await asyncio.sleep(4) + print("c-end") + + +async def main(): + tasks = [loop.create_task(cor()) for _ in range(10)] + print("finish tasks======") + for i, task in enumerate(tasks): + print("start", i) + time.sleep(5) + await task + print("end", i) + +loop.run_until_complete(main()) + +adfads + +pool = ThreadPoolExecutor(20) + + + +def request(i): + print("start", i) + time.sleep(5) + text = requests.get(url).text + print("content:", i, text) + print(threading.get_ident()) + return text + + +# f0 = loop.run_in_executor(pool, request) +# f1 = loop.run_in_executor(pool, request) +# futures = [loop.run_in_executor(pool, request, i) for i in range(20)] +# futures = [loop.run_in_executor(None, requests.get, "http://127.0.0.1:5000") for _ in range(10)] +# print("======") + + +async def cor(i): + for j in range(20): + future = loop.run_in_executor(pool, request, "{}_{}".format(i, j)) + await future + print("cor", i) + print("end-cor", i) + +loop.run_until_complete(asyncio.gather(*[cor(i) for i in range(10)])) + + +# async def main(): +# # for future in futures: +# for i in range(10): +# # future = loop.run_in_executor(pool, request) +# await futures[i] +# print("end", i) +# +# loop.run_until_complete(main()) + +# async def test(): +# print("start") +# # await asyncio.sleep(2) +# request_future = loop.run_in_executor(pool, request) +# result = await request_future +# print("end") +# +# +# +# +# async def main(): +# tasks = [loop.create_task(test()) for _ in range(10)] +# for i in range(10): +# await tasks[i] +# print("task end", i) +# +# +# loop.run_until_complete(main()) \ No newline at end of file diff --git a/webcollector/__init__.py b/webcollector/__init__.py index 38184e2..d25ed3f 100644 --- a/webcollector/__init__.py +++ b/webcollector/__init__.py @@ -2,8 +2,10 @@ import logging import sys +from webcollector.plugin.redis import RedisCrawler from webcollector.plugin.ram import RamCrawler + logging.basicConfig( stream=sys.stdout, level=logging.INFO, diff --git a/webcollector/config.py b/webcollector/config.py new file mode 100644 index 0000000..35e24c5 --- /dev/null +++ b/webcollector/config.py @@ -0,0 +1,2 @@ +# coding=utf-8 +DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" diff --git a/webcollector/crawler.py b/webcollector/crawler.py index e6e27e9..3544d0c 100644 --- a/webcollector/crawler.py +++ b/webcollector/crawler.py @@ -4,6 +4,7 @@ from webcollector.fetch import Fetcher from webcollector.generate import StatusGeneratorFilter from webcollector.model import Page, CrawlDatums +from webcollector.plugin.net import HttpRequester from webcollector.utils import RegexRule import logging @@ -13,23 +14,39 @@ class Crawler(object): - def __init__(self, db_manager, generator_filter=StatusGeneratorFilter()): + def __init__(self, + db_manager, + requester=HttpRequester(), + generator_filter=StatusGeneratorFilter(), + detected_filter=None): self.db_manager = db_manager + self.requester = requester self.generator_filter = generator_filter + self.detected_filter = detected_filter self.fetcher = None self.num_threads = 10 + self.resumable = None + self.seeds = CrawlDatums() + self.forced_seeds = CrawlDatums() - def add_seed(self, url_or_datum, type=None): - return self.seeds.append(url_or_datum).set_type(type) + def add_seed(self, url_or_datum, type=None, forced=False): + if forced: + return self.forced_seeds.append(url_or_datum).set_type(type) + else: + return self.seeds.append(url_or_datum).set_type(type) - def add_seeds(self, urls_or_datums, type=None): + def add_seeds(self, urls_or_datums, type=None, forced=False): crawl_datums = [] for url_or_datum in urls_or_datums: - crawl_datum = self.add_seed(url_or_datum, type=type) + crawl_datum = self.add_seed(url_or_datum, type=type, forced=forced) crawl_datums.append(crawl_datum) return crawl_datums + def inject(self): + self.db_manager.inject(self.seeds, forced=False) + self.db_manager.inject(self.forced_seeds, forced=True) + # def add_seed_and_return(self, url_or_datum): # crawl_datum = CrawlDatum.convert_from_item(url_or_datum) # self.seeds.append(crawl_datum) @@ -47,16 +64,20 @@ def start_once(self, depth_index): self.db_manager.merge() self.fetcher = Fetcher( self.db_manager, + self.requester, execute_func=self.execute, generator_filter=self.generator_filter, + detected_filter=self.detected_filter, num_threads=self.num_threads ) return self.fetcher.start() def start(self, depth): - if len(self.seeds) == 0: + if not self.resumable: + self.db_manager.clear() + if len(self.seeds) == 0 and len(self.forced_seeds) == 0: raise Exception("Please add at least one seed") - self.db_manager.inject(self.seeds) + self.inject() for depth_index in range(depth): print("start depth {}".format(depth_index)) start_time = time.time() @@ -70,8 +91,8 @@ def start(self, depth): class AutoDetectCrawler(Crawler): - def __init__(self, db_manager, auto_detect): - super().__init__(db_manager) + def __init__(self, db_manager, auto_detect, **kwargs): + super().__init__(db_manager, **kwargs) self.auto_detect = auto_detect self.regex_rule = RegexRule() diff --git a/webcollector/db_manager.py b/webcollector/db_manager.py index 7fec29e..f3990fb 100644 --- a/webcollector/db_manager.py +++ b/webcollector/db_manager.py @@ -3,10 +3,7 @@ class DBManager(object): - def inject(self, seeds): - pass - - def write_crawl(self, crawl_datum): + def inject(self, seeds, forced=False): pass def init_fetch_and_detect(self): @@ -23,3 +20,12 @@ def merge(self): def create_generator(self): return None + + def clear(self): + pass + + def open(self): + pass + + def close(self): + pass diff --git a/webcollector/fetch.py b/webcollector/fetch.py index dcfca79..aa88016 100644 --- a/webcollector/fetch.py +++ b/webcollector/fetch.py @@ -1,7 +1,6 @@ # coding=utf-8 import queue import asyncio -import aiohttp import logging from webcollector.model import Page, CrawlDatums, CrawlDatum @@ -9,31 +8,48 @@ class Fetcher(object): - def __init__(self, db_manager, execute_func, generator_filter=None, num_threads=10): + def __init__(self, + db_manager, + requester, + execute_func, + generator_filter=None, + detected_filter=None, + num_threads=10): self.fetch_queue = None self.feed_stopped = None self.generator = None self.generator_filter = generator_filter + self.detected_filter = detected_filter self.feeder = None self.buffer_size = 1000 self.db_manager = db_manager + self.requester = requester self.execute_func = execute_func self.num_threads = num_threads + self.loop = None + + async def async_start(self): self.fetch_queue = queue.Queue() self.feed_stopped = False + self.db_manager.open() self.db_manager.init_fetch_and_detect() self.generator = self.db_manager.create_generator() self.generator.generator_filter = self.generator_filter - async with aiohttp.ClientSession() as session: - coroutines = [self.fetch_coroutine(session, self.execute_func) for _ in range(self.num_threads)] + + # async with self.requester.create_async_context_manager(): + # coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)] + # await asyncio.gather(*coroutines) + with self.requester: + coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)] await asyncio.gather(*coroutines) + self.db_manager.close() def start(self): - loop = asyncio.get_event_loop() - loop.run_until_complete(self.async_start()) + self.loop = asyncio.get_event_loop() + self.loop.run_until_complete(self.async_start()) return self.generator.num_generated def feed(self): @@ -45,7 +61,7 @@ def feed(self): else: self.fetch_queue.put(crawl_datum) - async def fetch_coroutine(self, session, execute_func): + async def fetch_coroutine(self, execute_func): while True: if self.fetch_queue.empty(): if self.feed_stopped: @@ -54,24 +70,30 @@ async def fetch_coroutine(self, session, execute_func): else: crawl_datum = self.fetch_queue.get(block=False) try: - async with session.get(crawl_datum.url) as response: - code = response.status - content = await response.content.read() - encoding = response.get_encoding() - content_type = response.content_type - crawl_datum.code = code - page = Page(crawl_datum, content, content_type=content_type, http_charset=encoding) + # loop = asyncio.get_event_loop() + request_future = self.loop.run_in_executor(None, self.requester.get_response, crawl_datum) + page = await request_future + # page = await self.requester.get_response(crawl_datum) detected = CrawlDatums() execute_func(page, detected) crawl_datum.status = CrawlDatum.STATUS_DB_SUCCESS - self.db_manager.write_fetch(crawl_datum) - for detected_crawl_datum in detected: + if self.detected_filter is not None: + filtered_detected = CrawlDatums() + for detected_crawl_datum in detected: + detected_crawl_datum = self.detected_filter.filter(detected_crawl_datum) + if detected_crawl_datum is not None: + filtered_detected.append(detected_crawl_datum) + else: + filtered_detected = detected + + for detected_crawl_datum in filtered_detected: self.db_manager.write_detect(detected_crawl_datum) logger.info("done: {}".format(crawl_datum.brief_info())) except Exception as e: logger.error("failed: {}".format(crawl_datum.brief_info()), exc_info=True) + crawl_datum.status = CrawlDatum.STATUS_DB_FAILED - - + crawl_datum.num_fetched += 1 + self.db_manager.write_fetch(crawl_datum) diff --git a/webcollector/filter.py b/webcollector/filter.py new file mode 100644 index 0000000..372dfe2 --- /dev/null +++ b/webcollector/filter.py @@ -0,0 +1,15 @@ +# coding=utf-8 +class Filter(object): + def filter(self, crawl_datum): + return None + + +class HistoryFilter(Filter): + def __init__(self, history): + self.history = history + + def filter(self, crawl_datum): + if crawl_datum.key in self.history: + return crawl_datum + else: + return None diff --git a/webcollector/generate.py b/webcollector/generate.py index 9d83012..510772a 100644 --- a/webcollector/generate.py +++ b/webcollector/generate.py @@ -1,11 +1,13 @@ # coding=utf-8 from webcollector.model import CrawlDatum +from webcollector.filter import Filter class Generator(object): def __init__(self): self.num_generated = 0 + self.generator_filter = None def next(self): while True: @@ -27,12 +29,7 @@ def _next(self): return None -class GeneratorFilter(object): - def filter(self, crawl_datum): - pass - - -class StatusGeneratorFilter(GeneratorFilter): +class StatusGeneratorFilter(Filter): def filter(self, crawl_datum): if crawl_datum.status != CrawlDatum.STATUS_DB_SUCCESS: return crawl_datum diff --git a/webcollector/model.py b/webcollector/model.py index cd70182..ec68667 100644 --- a/webcollector/model.py +++ b/webcollector/model.py @@ -2,6 +2,7 @@ from urllib.parse import urljoin import chardet from bs4 import BeautifulSoup +import json # A CrawlDatum corresponds to a task description (usually for a webpage) @@ -16,13 +17,20 @@ class CrawlDatum(object): META_KEY_SYS_TYPE = "sys_type" - def __init__(self, url, key=None, type=None, meta_dict=None, code=CODE_NOT_SET, status=STATUS_DB_UNEXECUTED): + def __init__(self, url, + key=None, + type=None, + meta_dict=None, + code=CODE_NOT_SET, + status=STATUS_DB_UNEXECUTED, + num_fetched=0): self.url = url self.key = key if key is not None else url self.type = type self.meta_dict = meta_dict self.code = code self.status = status + self.num_fetched = num_fetched def set_key(self, key): self.key = key @@ -66,6 +74,37 @@ def brief_info(self): infos.append("Key: {} (URL: {})".format(self.key, self.url)) return " ".join(infos) + def to_dict(self): + dict_data = { + "url": self.url, + "key": self.key, + "type": self.type, + "meta_dict": self.meta_dict, + "code": self.code, + "status": self.status, + "num_fetched": self.num_fetched + } + return dict_data + + @classmethod + def from_dict(cls, dict_data): + return CrawlDatum( + url=dict_data["url"], + key=dict_data["key"], + type=dict_data["type"], + meta_dict=dict_data["meta_dict"], + code=dict_data["code"], + status=dict_data["status"], + num_fetched=dict_data["num_fetched"] + ) + + def to_json(self): + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str): + return CrawlDatum.from_dict(json.loads(json_str)) + class CrawlDatums(list): diff --git a/webcollector/net.py b/webcollector/net.py index 87062c3..684aa7b 100644 --- a/webcollector/net.py +++ b/webcollector/net.py @@ -1,3 +1,13 @@ # coding=utf-8 +class Requester(object): + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def get_response(self, crawl_datum): + raise NotImplementedError() \ No newline at end of file diff --git a/webcollector/plugin/net.py b/webcollector/plugin/net.py new file mode 100644 index 0000000..eb9c3a0 --- /dev/null +++ b/webcollector/plugin/net.py @@ -0,0 +1,52 @@ +# coding=utf-8 +from webcollector.config import DEFAULT_USER_AGENT +from webcollector.model import Page +from webcollector.net import Requester +import requests + + +class HttpRequester(Requester): + + def get_response(self, crawl_datum): + headers = {"User-Agent": DEFAULT_USER_AGENT} + response = requests.get(crawl_datum.url, headers=headers) + + code = response.status_code + content = response.content + encoding = response.encoding + content_type = response.headers["Content-Type"] + + crawl_datum.code = code + page = Page(crawl_datum, content, content_type=content_type, http_charset=encoding) + + return page + + +# class AioHttpRequester(Requester): +# +# def __init__(self): +# self.session = None +# +# def create_async_context_manager(self): +# self.session = aiohttp.ClientSession() +# return self.session +# +# def request(self, crawl_datum): +# return self.session.get( +# crawl_datum.url, +# headers={"User-Agent": DEFAULT_USER_AGENT} +# ) +# +# async def get_response(self, crawl_datum): +# # async with self.session.get(crawl_datum.url) as response: +# async with self.request(crawl_datum) as response: +# code = response.status +# content = await response.content.read() +# encoding = response.get_encoding() +# content_type = response.content_type +# crawl_datum.code = code +# page = Page(crawl_datum, content, content_type=content_type, http_charset=encoding) +# return page + + + diff --git a/webcollector/plugin/ram.py b/webcollector/plugin/ram.py index 10bd7ec..26e25eb 100644 --- a/webcollector/plugin/ram.py +++ b/webcollector/plugin/ram.py @@ -17,11 +17,9 @@ class RamDBGenerator(Generator): def __init__(self, ram_db): super().__init__() self.ram_db = ram_db - self.iter = None + self.iter = iter(self.ram_db.crawl_db.values()) def _next(self) -> CrawlDatum: - if self.iter is None: - self.iter = iter(self.ram_db.crawl_db.values()) try: return next(self.iter) except StopIteration: @@ -32,19 +30,17 @@ class RamDBManager(DBManager): def __init__(self, ram_db): self.ram_db = ram_db - def inject(self, seeds): + def inject(self, seeds, forced=False): for seed in seeds: if isinstance(seed, str): seed = CrawlDatum(seed) + if not forced and seed.key in self.ram_db.crawl_db: + continue self.ram_db.crawl_db[seed.key] = seed def create_generator(self): return RamDBGenerator(self.ram_db) - def write_crawl(self, crawl_datum): - if crawl_datum.key not in self.ram_db.crawl_db: - self.ram_db.crawl_db[crawl_datum.key] = crawl_datum - def init_fetch_and_detect(self): self.ram_db.fetch_db = {} self.ram_db.detect_db = {} @@ -70,7 +66,7 @@ def merge(self): class RamCrawler(AutoDetectCrawler): - def __init__(self, auto_detect): + def __init__(self, auto_detect, **kwargs): self.ram_db = RamDB() - super().__init__(RamDBManager(self.ram_db), auto_detect) + super().__init__(RamDBManager(self.ram_db), auto_detect, **kwargs) diff --git a/webcollector/plugin/redis.py b/webcollector/plugin/redis.py new file mode 100644 index 0000000..03de39e --- /dev/null +++ b/webcollector/plugin/redis.py @@ -0,0 +1,90 @@ +# coding=utf-8 +from redis import StrictRedis + +from webcollector.crawler import AutoDetectCrawler +from webcollector.db_manager import DBManager +from webcollector.generate import Generator +from webcollector.model import CrawlDatum + + +class RedisDBGenerator(Generator): + + def __init__(self, redis_db_manager): + super().__init__() + self.history_keys = set() + self.iter = redis_db_manager.redis_client.hscan_iter( + redis_db_manager.crawl_db + ) + + def _next(self) -> CrawlDatum: + try: + while True: + key, crawl_datum_json = next(self.iter) + if key in self.history_keys: + continue + else: + self.history_keys.add(key) + return CrawlDatum.from_json(crawl_datum_json) + except StopIteration: + return None + + +class RedisDBManager(DBManager): + def __init__(self, redis_client: StrictRedis, db_prefix): + self.redis_client = redis_client + self.db_prefix = db_prefix + self.crawl_db = "{}_crawl".format(db_prefix) + self.fetch_db = "{}_fetch".format(db_prefix) + self.detect_db = "{}_detect".format(db_prefix) + + def open(self): + pass + + def close(self): + pass + + def clear(self): + self.redis_client.delete(self.crawl_db) + self.redis_client.delete(self.fetch_db) + self.redis_client.delete(self.detect_db) + + def inject(self, seeds, forced=False): + for seed in seeds: + if isinstance(seed, str): + seed = CrawlDatum(seed) + if not forced and self.redis_client.hexists(self.crawl_db, seed.key): + continue + self.redis_client.hset(self.crawl_db, seed.key, seed.to_json()) + + def create_generator(self): + return RedisDBGenerator(self) + + def init_fetch_and_detect(self): + pass + + def write_fetch(self, crawl_datum): + self.redis_client.hset(self.fetch_db, crawl_datum.key, crawl_datum.to_json()) + + def write_detect(self, crawl_datum): + self.redis_client.hset(self.detect_db, crawl_datum.key, crawl_datum.to_json()) + + def merge(self): + print("merging......") + if self.redis_client.exists(self.fetch_db): + for _, crawl_datum_json in self.redis_client.hscan_iter(self.fetch_db): + crawl_datum = CrawlDatum.from_json(crawl_datum_json) + self.redis_client.hset(self.crawl_db, crawl_datum.key, crawl_datum.to_json()) + self.redis_client.delete(self.fetch_db) + + if self.redis_client.exists(self.detect_db): + for key, crawl_datum_json in self.redis_client.hscan_iter(self.detect_db): + if not self.redis_client.hexists(self.crawl_db, key): + crawl_datum = CrawlDatum.from_json(crawl_datum_json) + self.redis_client.hset(self.crawl_db, crawl_datum.key, crawl_datum.to_json()) + self.redis_client.delete(self.detect_db) + + +class RedisCrawler(AutoDetectCrawler): + def __init__(self, redis_client, db_prefix, auto_detect, **kwargs): + super().__init__(RedisDBManager(redis_client, db_prefix), auto_detect, **kwargs) +