Commit
feat: postgresql pipeline & excel pipeline & csv pipeline
Showing 30 changed files with 1,925 additions and 1,075 deletions.
Version file:

@@ -1 +1 @@
-1.2.17
+1.3.1
pyhttpx download handler (PyhttpxDownloadHandler):

@@ -1,61 +1,62 @@
 import asyncio
 import logging

 import pyhttpx

 from aioscrapy import Request
 from aioscrapy.core.downloader.handlers import BaseDownloadHandler
 from aioscrapy.http import HtmlResponse
 from aioscrapy.settings import Settings

 logger = logging.getLogger(__name__)


 class PyhttpxDownloadHandler(BaseDownloadHandler):

     def __init__(self, settings):
         self.settings: Settings = settings
         self.pyhttpx_client_args: dict = self.settings.get('PYHTTPX_CLIENT_ARGS', {})
-        self.verify_ssl: bool = self.settings.get("VERIFY_SSL")
+        self.verify_ssl = self.settings.get("VERIFY_SSL", True)
         self.loop = asyncio.get_running_loop()

     @classmethod
     def from_settings(cls, settings: Settings):
         return cls(settings)

     async def download_request(self, request: Request, _) -> HtmlResponse:
         kwargs = {
             'timeout': self.settings.get('DOWNLOAD_TIMEOUT'),
             'cookies': dict(request.cookies),
             'verify': self.verify_ssl,
             'allow_redirects': self.settings.getbool('REDIRECT_ENABLED', True) if request.meta.get(
                 'dont_redirect') is None else request.meta.get('dont_redirect')
         }
         post_data = request.body or None
         if isinstance(post_data, dict):
             kwargs['json'] = post_data
         else:
             kwargs['data'] = post_data

         headers = request.headers or self.settings.get('DEFAULT_REQUEST_HEADERS')
         kwargs['headers'] = headers

         proxy = request.meta.get("proxy")
         if proxy:
             kwargs["proxies"] = {'https': proxy}
             logger.debug(f"use proxy {proxy}: {request.url}")

         session_args = self.pyhttpx_client_args.copy()
+        session_args.setdefault('http2', True)
         with pyhttpx.HttpSession(**session_args) as session:
             response = await asyncio.to_thread(session.request, request.method, request.url, **kwargs)
             return HtmlResponse(
                 request.url,
                 status=response.status_code,
                 headers=response.headers,
                 body=response.content,
                 cookies=dict(response.cookies),
                 encoding=response.encoding
             )

     async def close(self):
         pass
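The two behavioral changes in this handler: VERIFY_SSL now falls back to True when unset, and the pyhttpx session defaults to HTTP/2. A minimal settings sketch for driving this handler follows; the DOWNLOAD_HANDLERS key and class path are assumptions for illustration and are not shown in this commit, while the other keys are read directly by the code above:

# Sketch of the settings this handler consumes; values are placeholders.
SETTINGS = {
    # Assumed registration key and class path, not taken from this commit:
    'DOWNLOAD_HANDLERS': {
        'https': 'aioscrapy.core.downloader.handlers.pyhttpx.PyhttpxDownloadHandler',
    },
    'VERIFY_SSL': True,                        # new default when the setting is absent
    'PYHTTPX_CLIENT_ARGS': {'http2': True},    # merged via setdefault('http2', True)
    'DOWNLOAD_TIMEOUT': 30,
    'REDIRECT_ENABLED': True,
    'DEFAULT_REQUEST_HEADERS': {'User-Agent': 'aioscrapy'},
}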
aioscrapy.db manager module:

@@ -1,73 +1,74 @@
 import logging
 from importlib import import_module
 from typing import Any

 import aioscrapy
 from aioscrapy.db.absmanager import AbsDBPoolManager
 from aioscrapy.db.aioredis import redis_manager
 from aioscrapy.utils.misc import load_object

 logger = logging.getLogger(__name__)

 __all__ = ['db_manager', 'get_pool', 'get_manager']

 DB_MODULE_MAP = {
     'redis': ('redis', 'aioscrapy.db.aioredis.redis_manager'),
     'aiomysql': ('mysql', 'aioscrapy.db.aiomysql.mysql_manager'),
     'aio_pika': ('rabbitmq', 'aioscrapy.db.aiorabbitmq.rabbitmq_manager'),
     'motor': ('mongo', 'aioscrapy.db.aiomongo.mongo_manager'),
+    'asyncpg': ('pg', 'aioscrapy.db.aiopg.pg_manager'),
 }

 db_manager_map = {}

 for module_name, (manager_key, class_path) in DB_MODULE_MAP.items():
     try:
         import_module(module_name)
     except ImportError:
         pass
     else:
         db_manager_map[manager_key] = load_object(class_path)


 class DBManager:

     @staticmethod
     def get_manager(db_type: str) -> AbsDBPoolManager:
         manager = db_manager_map.get(db_type)
         assert manager is not None, f"Not support db type:{db_type}"
         return manager

     def get_pool(self, db_type: str, alias='default') -> Any:
         manager = self.get_manager(db_type)
         return manager.get_pool(alias)

     @staticmethod
     async def close_all() -> None:
         for manager in db_manager_map.values():
             await manager.close_all()

     @staticmethod
     async def from_dict(db_args: dict) -> None:
         for db_type, args in db_args.items():
             manager = db_manager_map.get(db_type)
             if manager is None:
                 logger.warning(f'Not support db type: {db_type}; Only {", ".join(db_manager_map.keys())} supported')
                 continue  # skip unsupported types; manager is None here
             await manager.from_dict(args)

     @staticmethod
     async def from_settings(settings: aioscrapy.Settings) -> None:
         for manager in db_manager_map.values():
             await manager.from_settings(settings)

     async def from_crawler(self, crawler: "aioscrapy.Crawler") -> None:
         return await self.from_settings(crawler.settings)

     def __getattr__(self, db_type: str) -> Any:
         if db_type not in db_manager_map:
             raise AttributeError(f'Not support db type: {db_type}')
         return db_manager_map[db_type]


 db_manager = DBManager()
 get_manager = db_manager.get_manager
 get_pool = db_manager.get_pool
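With the new 'asyncpg' entry in DB_MODULE_MAP, PostgreSQL pools become reachable through the same DBManager facade as redis, mysql, rabbitmq and mongo. A minimal usage sketch, assuming asyncpg is installed and using placeholder credentials:

import asyncio

from aioscrapy.db import db_manager


async def main():
    # from_dict routes the 'pg' block to pg_manager via db_manager_map
    await db_manager.from_dict({
        'pg': {
            'default': dict(user='username', password='pwd',
                            database='dbname', host='127.0.0.1'),
        }
    })
    # __getattr__ resolves db_manager.pg to pg_manager
    async with db_manager.pg.get('default') as conn:
        print(await conn.fetch('select 1'))
    await db_manager.close_all()


asyncio.run(main())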
New file, aioscrapy/db/aiopg.py (+112 lines; path follows from the 'asyncpg' entry in DB_MODULE_MAP):

import logging
from contextlib import asynccontextmanager

from asyncpg.pool import create_pool

import aioscrapy
from aioscrapy.db.absmanager import AbsDBPoolManager

logger = logging.getLogger(__name__)


class PGExecutor:
    def __init__(self, alias: str, pool_manager: "AioPGPoolManager"):
        self.alias = alias
        self.pool_manager = pool_manager

    async def insert(self, sql, value):
        async with self.pool_manager.get(self.alias) as connect:
            # asyncpg connections have no rollback() method; run the batch
            # inside an explicit transaction so a failure rolls back cleanly
            # before the exception propagates.
            async with connect.transaction():
                return await connect.executemany(sql, value)

    async def fetch(self, sql: str):
        async with self.pool_manager.get(self.alias) as connect:
            return await connect.fetch(sql)

    async def query(self, sql: str):
        return await self.fetch(sql)


class AioPGPoolManager(AbsDBPoolManager):
    _clients = {}

    async def create(self, alias: str, params: dict):
        if alias in self._clients:
            return self._clients[alias]

        params = params.copy()
        params.setdefault('timeout', 30)
        pg_pool = await create_pool(**params)
        return self._clients.setdefault(alias, pg_pool)

    def get_pool(self, alias: str):
        pg_pool = self._clients.get(alias)
        assert pg_pool is not None, f"No PG pool was created for alias {alias}"
        return pg_pool

    @asynccontextmanager
    async def get(self, alias: str):
        """Acquire a connection from the PG pool named `alias`."""
        pg_pool = self.get_pool(alias)
        conn = await pg_pool.acquire()
        try:
            yield conn
        finally:
            await pg_pool.release(conn)

    def executor(self, alias: str) -> PGExecutor:
        return PGExecutor(alias, self)

    async def close(self, alias: str):
        pg_pool = self._clients.pop(alias, None)
        if pg_pool:
            await pg_pool.close()

    async def close_all(self):
        for alias in list(self._clients.keys()):
            await self.close(alias)

    async def from_dict(self, db_args: dict):
        for alias, pg_args in db_args.items():
            await self.create(alias, pg_args)

    async def from_settings(self, settings: aioscrapy.Settings):
        for alias, pg_args in settings.getdict('PG_ARGS').items():
            await self.create(alias, pg_args)


pg_manager = AioPGPoolManager()

if __name__ == '__main__':
    import asyncio


    async def test():
        pg_pool = await pg_manager.create(
            'default',
            dict(
                user='username',
                password='pwd',
                database='dbname',
                host='127.0.0.1'
            )
        )

        # Option 1: acquire and release the connection manually
        conn = await pg_pool.acquire()
        try:
            result = await conn.fetch('select 1')
            print(tuple(result[0]))
        finally:
            await pg_pool.release(conn)

        # Option 2: use the pool manager's context manager
        async with pg_manager.get('default') as conn:
            result = await conn.fetch('select 1')
            print(tuple(result[0]))

    asyncio.run(test())
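Beyond raw connections, PGExecutor is the thin insert/fetch wrapper a pipeline would hold. A hypothetical usage sketch follows; the demo_items table and its columns are invented for illustration, and note that asyncpg expects $1-style positional placeholders:

from aioscrapy.db.aiopg import pg_manager


async def demo():
    # Placeholder credentials, as in the test() above
    await pg_manager.create('default', dict(
        user='username', password='pwd', database='dbname', host='127.0.0.1'))
    executor = pg_manager.executor('default')
    # Batch insert via executemany; demo_items is a made-up table
    await executor.insert(
        'INSERT INTO demo_items (name, price) VALUES ($1, $2)',
        [('foo', 9.9), ('bar', 19.5)],
    )
    for row in await executor.fetch('SELECT name, price FROM demo_items'):
        print(dict(row))  # asyncpg Records convert to dicts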