# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _process_async_callback(
    self, callback_results: AsyncGeneratorType, response: Response = None
):
    """Consume a callback's async generator and dispatch each yielded value.

    Routing by type:
      * AsyncGeneratorType -> consumed recursively.
      * Request            -> scheduled on the request queue.
      * Coroutine          -> wrapped via handle_callback and scheduled.
      * Item               -> processed immediately via process_item.
      * anything else      -> delegated to process_callback_result.

    :param callback_results: async generator produced by a spider callback.
    :param response: the Response that triggered the callback; forwarded to
        coroutine callbacks (may be None).
    """
    try:
        async for callback_result in callback_results:
            if isinstance(callback_result, AsyncGeneratorType):
                # Bug fix: forward `response` so values yielded by nested
                # generators keep the originating response context.
                await self._process_async_callback(callback_result, response)
            elif isinstance(callback_result, Request):
                self.request_queue.put_nowait(
                    self.handle_request(request=callback_result)
                )
            elif isinstance(callback_result, typing.Coroutine):
                self.request_queue.put_nowait(
                    self.handle_callback(
                        aws_callback=callback_result, response=response
                    )
                )
            elif isinstance(callback_result, Item):
                # Process target item
                await self.process_item(callback_result)
            else:
                await self.process_callback_result(callback_result=callback_result)
    except Exception as e:
        # Boundary handler: one failing callback must not kill the crawl loop.
        self.logger.error(e)
async def _process_async_callback(
    self, callback_results: AsyncGeneratorType, response: Response = None
):
    """Consume a callback's async generator and dispatch each yielded value.

    Routing by type: nested async generators recurse; Requests and coroutine
    callbacks are queued; Items are processed; everything else goes to
    process_callback_result.

    :param callback_results: async generator produced by a spider callback.
    :param response: the Response that triggered the callback; forwarded to
        coroutine callbacks (may be None).
    """
    try:
        async for callback_result in callback_results:
            if isinstance(callback_result, AsyncGeneratorType):
                # Forward `response` so nested generators keep the
                # originating response context.
                await self._process_async_callback(callback_result, response)
            elif isinstance(callback_result, Request):
                self.request_queue.put_nowait(
                    self.handle_request(request=callback_result)
                )
            elif isinstance(callback_result, typing.Coroutine):
                self.request_queue.put_nowait(
                    self.handle_callback(
                        aws_callback=callback_result, response=response
                    )
                )
            elif isinstance(callback_result, Item):
                # Process target item
                await self.process_item(callback_result)
            else:
                await self.process_callback_result(callback_result=callback_result)
    except Exception as e:
        # Bug fix: removed leftover debug statement `print(333)`.
        self.logger.error(e)
headers: dict = None,
metadata: dict = None,
request_config: dict = None,
request_session=None,
**kwargs):
"""Init a Request class for crawling html"""
# NOTE(review): the `def` header of this signature is outside this chunk;
# presumably it is `def request(self, url, method, callback, encoding, ...)`
# given the names used below — confirm against the full file.
# None defaults are replaced with fresh containers so calls never share state.
headers = headers or {}
metadata = metadata or {}
request_config = request_config or {}
# Fall back to the spider-level session when the caller supplies none.
request_session = request_session or self.request_session
# NOTE(review): update() makes the spider-level values win over the
# per-call ones — confirm this precedence is intended.
headers.update(self.headers.copy())
request_config.update(self.request_config.copy())
kwargs.update(self.kwargs.copy())
return Request(url=url,
method=method,
callback=callback,
encoding=encoding,
headers=headers,
metadata=metadata,
request_config=request_config,
request_session=request_session,
**kwargs)
metadata: dict = None,
request_config: dict = None,
request_session=None,
**kwargs,
):
"""Init a Request class for crawling html"""
# NOTE(review): the `def` header (and the `headers` parameter referenced
# below) of this signature is outside this chunk — confirm against the
# full file; this appears to be a duplicate of the fragment above.
# None defaults are replaced with fresh containers so calls never share state.
headers = headers or {}
metadata = metadata or {}
request_config = request_config or {}
# Fall back to the spider-level session when the caller supplies none.
request_session = request_session or self.request_session
# NOTE(review): update() makes the spider-level values win over the
# per-call ones — confirm this precedence is intended.
headers.update(self.headers.copy())
request_config.update(self.request_config.copy())
kwargs.update(self.kwargs.copy())
return Request(
url=url,
method=method,
callback=callback,
encoding=encoding,
headers=headers,
metadata=metadata,
request_config=request_config,
request_session=request_session,
**kwargs,
)
async def _retry(self, error_msg):
    """Retry a failed request, or give up with an empty placeholder Response.

    While retries remain, logs the failure, decrements the budget, and
    re-fetches — optionally through a user-supplied async ``RETRY_FUNC``
    hook that may return a replacement Request. When the budget is
    exhausted, returns a blank Response carrying this request's url,
    method and metadata.

    :param error_msg: description of the failure being retried.
    :return: the Response from the retried fetch, or an empty Response.
    """
    if self.retry_times > 0:
        # 1-based count of how many retries have been consumed, for logging.
        retry_times = self.request_config.get("RETRIES", 3) - self.retry_times + 1
        # Bug fix: the message previously started with a dangling
        # ", Retry times:" fragment and a stray trailing ">" — restore
        # the "<Retry url: ...>" prefix it was truncated from.
        self.logger.error(
            f"<Retry url: {self.url}>, Retry times: {retry_times}, Retry message: {error_msg}>"
        )
        self.retry_times -= 1
        retry_func = self.request_config.get("RETRY_FUNC")
        if retry_func and iscoroutinefunction(retry_func):
            # weakref.proxy avoids a reference cycle between this request
            # and the user-supplied retry hook.
            request_ins = await retry_func(weakref.proxy(self))
            if isinstance(request_ins, Request):
                return await request_ins.fetch()
        return await self.fetch()
    else:
        # Retries exhausted: hand back an empty Response so callers
        # always receive a Response object rather than an exception.
        response = Response(
            url=self.url,
            method=self.method,
            metadata=self.metadata,
            cookies={},
            history=(),
            headers=None,
        )
        return response
async def _get_html(cls, html: str = "", url: str = "", **kwargs):
    """Return an lxml element tree parsed from ``html``, fetching ``url`` first if given.

    :param html: raw HTML to parse; ignored when ``url`` is provided.
    :param url: optional URL to fetch; its response body becomes the HTML.
    :param kwargs: forwarded to ``Request``; ``sem`` (a semaphore) is
        extracted and, when present, routes the fetch through
        ``fetch_callback``.
    :return: the parsed ``etree.HTML`` root element.
    :raises ValueError: if neither ``html`` nor ``url`` is provided.
    """
    if html or url:
        if url:
            sem = kwargs.pop("sem", None)
            request = Request(url, **kwargs)
            if sem:
                _, response = await request.fetch_callback(sem=sem)
            else:
                response = await request.fetch()
            html = response.html
        return etree.HTML(html)
    else:
        # Bug fix: the ValueError was constructed but never raised, so
        # calling with no arguments silently returned None.
        raise ValueError("html(url or html_etree) is expected")