feat: add mongo pipeline

ConlinH committed Nov 22, 2022
1 parent 32d6a37 commit 10e2819
Showing 10 changed files with 341 additions and 12 deletions.
10 changes: 10 additions & 0 deletions aioscrapy/db/__init__.py
@@ -25,6 +25,16 @@
except ImportError:
    pass


try:
    import motor
    from aioscrapy.db.aiomongo import mongo_manager

    db_manager_map['mongo'] = mongo_manager
except ImportError:
    pass


logger = logging.getLogger(__name__)

__all__ = ['db_manager', 'get_pool', 'get_manager']
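Because the new block mirrors the existing optional-dependency pattern, the 'mongo' entry only lands in db_manager_map when motor is importable; otherwise the ImportError is silently swallowed. A quick sanity check, as a hypothetical snippet that is not part of this commit:

# Hypothetical snippet, not part of the commit: 'mongo' registers only when motor is
# installed (pip install motor); otherwise the ImportError above is swallowed silently.
from aioscrapy.db import db_manager_map

if 'mongo' not in db_manager_map:
    print('motor is not installed; mongo_manager and MongoPipeline will be unavailable')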
88 changes: 88 additions & 0 deletions aioscrapy/db/aiomongo.py
@@ -0,0 +1,88 @@
import logging

from motor.motor_asyncio import AsyncIOMotorClient

import aioscrapy
from aioscrapy.db.absmanager import AbsDBPoolManager

logger = logging.getLogger(__name__)


class MongoExecutor:
    def __init__(self, alias: str, pool_manager: "AioMongoManager"):
        self.alias = alias
        self.pool_manager = pool_manager

    async def insert(self, table_name, values, db_name=None):
        client, db_name_default = self.pool_manager.get_pool(self.alias)
        db_name = db_name or db_name_default
        return await client[f'{db_name}'][f'{table_name}_collection'].insert_many(values)

    def __getattr__(self, table_name: str):
        client, db_name_default = self.pool_manager.get_pool(self.alias)
        return client[f'{db_name_default}'][f'{table_name}_collection']


class AioMongoManager(AbsDBPoolManager):
    _clients = {}

    async def create(self, alias: str, params: dict):
        if alias in self._clients:
            return self._clients[alias]

        params = params.copy()
        db_name = params.pop('db')
        params.setdefault('connecttimeoutms', 30)
        client = AsyncIOMotorClient(**params)
        return self._clients.setdefault(alias, (client, db_name))

    def get_pool(self, alias: str):
        return self._clients.get(alias)

    def executor(self, alias: str) -> MongoExecutor:
        """Get MongoExecutor"""
        return MongoExecutor(alias, self)

    async def close(self, alias: str):
        """Close mongo pool named `alias`"""
        client_and_db = self._clients.pop(alias, None)
        if client_and_db:
            client, _ = client_and_db
            client.close()

    async def close_all(self):
        """Close all mongo clients"""
        for alias in list(self._clients.keys()):
            await self.close(alias)

    async def from_dict(self, db_args: dict):
        """Create mongo clients from a dict"""
        for alias, args in db_args.items():
            await self.create(alias, args)

    async def from_settings(self, settings: aioscrapy.Settings):
        """Create mongo clients from settings"""
        for alias, args in settings.getdict('MONGO_ARGS').items():
            await self.create(alias, args)


mongo_manager = AioMongoManager()

if __name__ == '__main__':
    import asyncio


    async def test():
        await mongo_manager.create('default', {
            'host': 'mongodb://root:root@192.168.234.128:27017',
            'db': 'test',
        })
        mongo = mongo_manager.executor('default')
        # result = await mongo.insert('user', [{'name': 'zhang', 'age': 18}, {'name': 'li', 'age': 20}])
        # print('inserted %d docs' % (len(result.inserted_ids),))

        document = await mongo.user.find_one({'age': {'$gt': 19}})
        print(document)
        await mongo_manager.close_all()


    asyncio.get_event_loop().run_until_complete(test())
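For context, MongoExecutor.__getattr__ proxies attribute access to the motor collection named '<attr>_collection' in the client's configured database, so any AsyncIOMotorCollection coroutine can be awaited on it. A minimal sketch, assuming a 'default' client has already been registered via mongo_manager.create or MONGO_ARGS (not part of the commit):

# Sketch only: assumes mongo_manager.create('default', {...}) has already run.
async def usage_sketch():
    mongo = mongo_manager.executor('default')
    # mongo.user resolves to client['test']['user_collection'], a motor collection,
    # so the usual pymongo-style coroutines are available on it.
    total = await mongo.user.count_documents({})
    docs = await mongo.user.find({'age': {'$gt': 19}}).to_list(length=10)
    print(total, docs)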
3 changes: 3 additions & 0 deletions aioscrapy/libs/pipelines/sink/__init__.py
@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from .mysql import MysqlPipeline
from .mongo import MongoPipeline
84 changes: 84 additions & 0 deletions aioscrapy/libs/pipelines/sink/mongo.py
@@ -0,0 +1,84 @@
import asyncio
import logging
from aioscrapy.db import db_manager

logger = logging.getLogger(__name__)


class MongoPipeline:
    def __init__(self, settings):
        self.cache_num = settings.getint('SAVE_CACHE_NUM', 500)
        self.save_cache_interval = settings.getint('SAVE_CACHE_INTERVAL', 10)
        self.lock = asyncio.Lock()
        self.running: bool = True
        self.db_alias_cache = {}
        self.table_cache = {}
        self.item_cache = {}
        self.db_cache = {}

    @classmethod
    def from_settings(cls, settings):
        return cls(settings)

    async def open_spider(self, spider):
        asyncio.create_task(self.save_heartbeat())

    async def save_heartbeat(self):
        while self.running:
            await asyncio.sleep(self.save_cache_interval)
            asyncio.create_task(self.save_all())

    async def process_item(self, item, spider):
        await self.save_item(item)
        return item

    async def close_spider(self, spider):
        self.running = False
        await self.save_all()

    def parse_item_to_cache(self, item: dict):
        item.pop('save_insert_type', None)
        db_name = item.pop('save_db_name', None)
        table_name = item.pop('save_table_name', None)
        assert table_name is not None, Exception('please set save_table_name')
        save_db_alias = item.pop('save_db_alias', ['default'])
        if isinstance(save_db_alias, str):
            save_db_alias = [save_db_alias]

        cache_key = ''.join(save_db_alias) + (db_name or '') + table_name

        if self.table_cache.get(cache_key) is None:
            self.db_alias_cache[cache_key] = save_db_alias
            self.table_cache[cache_key] = table_name
            self.db_cache[cache_key] = db_name
            self.item_cache[cache_key] = []

        self.item_cache[cache_key].append(item)
        return cache_key, len(self.item_cache[cache_key])

    async def save_all(self):
        async with self.lock:
            for cache_key, items in self.item_cache.items():
                items and await self._save(cache_key)

    async def save_item(self, item: dict):
        async with self.lock:
            cache_key, cache_count = self.parse_item_to_cache(item)
            if cache_count >= self.cache_num:
                await self._save(cache_key)

    async def _save(self, cache_key):
        table_name = self.table_cache[cache_key]
        try:
            for alias in self.db_alias_cache[cache_key]:
                try:
                    executor = db_manager.mongo.executor(alias)
                    result = await executor.insert(
                        table_name, self.item_cache[cache_key], db_name=self.db_cache[cache_key]
                    )
                    logger.info(
                        f'table:{alias}->{table_name} sum:{len(self.item_cache[cache_key])} ok:{len(result.inserted_ids)}')
                except Exception as e:
                    logger.exception(f'save data error, table:{alias}->{table_name}, err_msg:{e}')
        finally:
            self.item_cache[cache_key] = []
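For reference, the pipeline batches items per (alias, db, table) cache key and flushes a batch when it reaches SAVE_CACHE_NUM (default 500) or on the SAVE_CACHE_INTERVAL heartbeat (default 10s). Routing is driven by keys on the item itself, which parse_item_to_cache pops before the insert (save_insert_type is popped but unused here). A hedged sketch of such an item; the non-save_* field names are illustrative and not part of the commit:

# Illustrative item, not part of the commit. The save_* keys are popped by
# parse_item_to_cache and used only for routing; everything else is written to MongoDB.
item = {
    'save_table_name': 'quotes',  # required: base collection name (stored as 'quotes_collection' by MongoExecutor)
    'save_db_name': 'test',       # optional: overrides the 'db' configured for the client
    'save_db_alias': 'default',   # optional: str or list of MONGO_ARGS aliases; defaults to ['default']
    'author': 'someone',
    'text': 'some quote',
}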
File renamed without changes.
21 changes: 15 additions & 6 deletions example/projectspider/redisdemo/settings.py
@@ -9,18 +9,19 @@
# SCHEDULER_SERIALIZER = 'aioscrapy.serializer.JsonSerializer'
# SCHEDULER_SERIALIZER = 'aioscrapy.serializer.PickleSerializer'

# ITEM_PIPELINES = {
# 'aioscrapy.libs.pipelines.sink.MysqlPipeline': 100,
# }
ITEM_PIPELINES = {
    # 'aioscrapy.libs.pipelines.sink.MysqlPipeline': 100,
    'aioscrapy.libs.pipelines.sink.MongoPipeline': 100,
}

BOT_NAME = 'redisdemo'

SPIDER_MODULES = ['redisdemo.spiders']
NEWSPIDER_MODULE = 'redisdemo.spiders'

DOWNLOAD_DELAY = 3
RANDOMIZE_DOWNLOAD_DELAY = True
CONCURRENT_REQUESTS = 1
# DOWNLOAD_DELAY = 3
# RANDOMIZE_DOWNLOAD_DELAY = True
# CONCURRENT_REQUESTS = 1

# SCHEDULER_FLUSH_ON_START = True

@@ -83,4 +84,12 @@
# }
}

# mongo parameters
MONGO_ARGS = {
    'default': {
        'host': 'mongodb://root:root@192.168.234.128:27017',
        'db': 'test',
    },
}

# LOG_FILE = 'test.log'
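MONGO_ARGS is consumed by AioMongoManager.from_settings, which creates one named client per alias; the same structure can hold several clients that items select via save_db_alias. A hypothetical multi-client variant (only 'default' exists in this commit):

# Hypothetical variant, not part of the commit; the 'backup' alias is illustrative.
MONGO_ARGS = {
    'default': {
        'host': 'mongodb://root:root@192.168.234.128:27017',
        'db': 'test',
    },
    'backup': {  # reachable from items via save_db_alias=['default', 'backup']
        'host': 'mongodb://127.0.0.1:27017',
        'db': 'backup',
    },
}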
64 changes: 64 additions & 0 deletions example/singlespider/demo_http2.py
@@ -0,0 +1,64 @@
import re
from urllib.parse import unquote
import logging

from aioscrapy import Request
from aioscrapy.spiders import Spider

logger = logging.getLogger(__name__)


class DemoMemorySpider(Spider):
    name = 'DemoMemorySpider'
    custom_settings = {
        "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36",
        # 'DOWNLOAD_DELAY': 3,
        # 'RANDOMIZE_DOWNLOAD_DELAY': True,
        # 'CONCURRENT_REQUESTS': 1,
        # 'LOG_LEVEL': 'INFO'
        # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter',
        "CLOSE_SPIDER_ON_IDLE": True,
        'DOWNLOAD_HANDLERS': {
            'http': 'aioscrapy.core.downloader.handlers.httpx.HttpxDownloadHandler',
            'https': 'aioscrapy.core.downloader.handlers.httpx.HttpxDownloadHandler',
        },
        'HTTPX_CLIENT_SESSION_ARGS': {
            'http2': True
        }
    }

    start_urls = ['https://quotes.toscrape.com']

    @staticmethod
    async def process_request(request, spider):
        """ request middleware """
        pass

    @staticmethod
    async def process_response(request, response, spider):
        """ response middleware """
        return response

    @staticmethod
    async def process_exception(request, exception, spider):
        """ exception middleware """
        pass

    async def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                'author': quote.xpath('span/small/text()').get(),
                'text': quote.css('span.text::text').get(),
            }

        next_page = response.css('li.next a::attr("href")').get()
        if next_page is not None:
            # yield response.follow(next_page, self.parse)
            yield Request(f"https://quotes.toscrape.com/{next_page}", callback=self.parse)

    async def process_item(self, item):
        print(item)


if __name__ == '__main__':
    DemoMemorySpider.start()
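Note: setting 'http2': True in HTTPX_CLIENT_SESSION_ARGS requires httpx to be installed with its HTTP/2 extra (pip install "httpx[http2]"); without the h2 package, httpx refuses to create an HTTP/2 client.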
4 changes: 2 additions & 2 deletions example/singlespider/demo_memory.py
@@ -16,7 +16,7 @@ class DemoMemorySpider(Spider):
        # 'RANDOMIZE_DOWNLOAD_DELAY': True,
        # 'CONCURRENT_REQUESTS': 1,
        # 'LOG_LEVEL': 'INFO'
        'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter',
        # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter',
        "CLOSE_SPIDER_ON_IDLE": True,
    }

@@ -25,7 +25,7 @@ class DemoMemorySpider(Spider):
    @staticmethod
    async def process_request(request, spider):
        """ request middleware """
        return request
        pass

    @staticmethod
    async def process_response(request, response, spider):
8 changes: 4 additions & 4 deletions example/singlespider/demo_redis.py
@@ -26,7 +26,7 @@ class DemoRedisSpider(Spider):
        # SCHEDULER_SERIALIZER = 'aioscrapy.serializer.PickleSerializer'

        'SCHEDULER_QUEUE_CLASS': 'aioscrapy.queue.redis.SpiderPriorityQueue',
        'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.redis.RFPDupeFilter',
        # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.redis.RFPDupeFilter',
        'SCHEDULER_SERIALIZER': 'aioscrapy.serializer.JsonSerializer',
        'REDIS_ARGS': {
            'queue': {
@@ -60,9 +60,9 @@ async def parse(self, response):
                'text': quote.css('span.text::text').get(),
            }

        # next_page = response.css('li.next a::attr("href")').get()
        # if next_page is not None:
        #     yield response.follow(next_page, self.parse, dont_filter=False)
        next_page = response.css('li.next a::attr("href")').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse, dont_filter=False)

    async def process_item(self, item):
        print(item)