feat: Integrate WaterCrawl.dev as a new knowledge base provider

Add WaterCrawl.dev as an alternative provider for website crawling in the datasets/knowledge base, alongside Firecrawl and Jina Reader. This integration broadens the data source options for knowledge bases, letting users configure and use WaterCrawl for website content extraction.

Resolved #15950
This commit is contained in:
Amir Mohsen 2025-03-21 00:30:34 +01:00
parent 29a4dec387
commit e563a4ae20
2 changed files with 35 additions and 25 deletions
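For orientation, here is a rough sketch of how the new provider could be driven on the knowledge-base side. It is assembled only from the classes this commit touches; the WaterCrawlProvider module path and the option values are assumptions, not the actual Dify call site.

```python
# Hypothetical usage sketch (not the real Dify call site): WaterCrawl is driven
# like the existing Firecrawl / Jina Reader providers.
from core.rag.extractor.watercrawl.provider import WaterCrawlProvider  # assumed module path

provider = WaterCrawlProvider(api_key="wc-...", base_url="https://app.watercrawl.dev/")

# Option keys mirror the ones read in WaterCrawlProvider.crawl_url() below;
# the values here are placeholders.
job = provider.crawl_url(
    "https://example.com",
    options={
        "exclude_tags": "nav,footer",  # comma-separated, split by the provider
        "include_tags": "",
        "only_main_content": True,
    },
)
# The shape of the returned crawl job is not part of this diff.
```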

File 1 of 2: core/rag/extractor/watercrawl/client.py

@@ -1,5 +1,6 @@
 import json
-from typing import Union, Generator
+from collections.abc import Generator
+from typing import Union
 from urllib.parse import urljoin

 import requests
@@ -21,35 +22,35 @@ class BaseAPIClient:
         session.headers.update({'Accept-Language': 'en-US'})
         return session

-    def _get(self, endpoint: str, query_params: dict = None, **kwargs):
+    def _get(self, endpoint: str, query_params: dict | None = None, **kwargs):
         return self.session.get(
             urljoin(self.base_url, endpoint),
             params=query_params,
             **kwargs
         )

-    def _post(self, endpoint: str, query_params: dict = None, data: dict = None, **kwargs):
+    def _post(self, endpoint: str, query_params: dict | None = None, data: dict | None = None, **kwargs):
         return self.session.post(
             urljoin(self.base_url, endpoint),
             params=query_params,
             json=data, **kwargs
         )

-    def _put(self, endpoint: str, query_params: dict = None, data: dict = None, **kwargs):
+    def _put(self, endpoint: str, query_params: dict | None = None, data: dict | None = None, **kwargs):
         return self.session.put(
             urljoin(self.base_url, endpoint),
             params=query_params,
             json=data, **kwargs
         )

-    def _delete(self, endpoint: str, query_params: dict = None, **kwargs):
+    def _delete(self, endpoint: str, query_params: dict | None = None, **kwargs):
         return self.session.delete(
             urljoin(self.base_url, endpoint),
             params=query_params,
             **kwargs
         )

-    def _patch(self, endpoint: str, query_params: dict = None, data: dict = None, **kwargs):
+    def _patch(self, endpoint: str, query_params: dict | None = None, data: dict | None = None, **kwargs):
         return self.session.patch(
             urljoin(self.base_url, endpoint),
             params=query_params,
@@ -58,7 +59,7 @@ class BaseAPIClient:

 class WaterCrawlAPIClient(BaseAPIClient):
-    def __init__(self, api_key, base_url: str = 'https://app.watercrawl.dev/'):
+    def __init__(self, api_key, base_url: str | None = 'https://app.watercrawl.dev/'):
         super().__init__(api_key, base_url)

     def process_eventstream(self, response: Response, download: bool = False):
@@ -86,7 +87,7 @@ class WaterCrawlAPIClient(BaseAPIClient):
             raise Exception(f'Unknown response type: {response.headers.get("Content-Type")}')

-    def get_crawl_requests_list(self, page: int = None, page_size: int = None):
+    def get_crawl_requests_list(self, page: int | None = None, page_size: int | None = None):
         query_params = {
             'page': page or 1,
             'page_size': page_size or 10
@@ -107,10 +108,10 @@ class WaterCrawlAPIClient(BaseAPIClient):
     def create_crawl_request(
             self,
-            url: Union[list, str] = None,
-            spider_options: dict = None,
-            page_options: dict = None,
-            plugin_options: dict = None
+            url: Union[list, str] | None = None,
+            spider_options: dict | None = None,
+            page_options: dict | None = None,
+            plugin_options: dict | None = None
     ):
         data = {
             # 'urls': url if isinstance(url, list) else [url],
@@ -154,7 +155,13 @@ class WaterCrawlAPIClient(BaseAPIClient):
             ),
         )

-    def get_crawl_request_results(self, item_id: str, page: int = 1, page_size: int = 25, query_params: dict = None):
+    def get_crawl_request_results(
+            self,
+            item_id: str,
+            page: int = 1,
+            page_size: int = 25,
+            query_params: dict | None = None
+    ):
         query_params = query_params or {}
         query_params.update({
             'page': page or 1,
@@ -169,22 +176,22 @@ class WaterCrawlAPIClient(BaseAPIClient):
     def scrape_url(self,
                    url: str,
-                   page_options: dict = None,
-                   plugin_options: dict = None,
+                   page_options: dict | None = None,
+                   plugin_options: dict | None = None,
                    sync: bool = True,
                    prefetched: bool = True
                    ):
-        result = self.create_crawl_request(
+        response_result = self.create_crawl_request(
             url=url,
             page_options=page_options,
             plugin_options=plugin_options
         )
         if not sync:
-            return result
+            return response_result

-        for result in self.monitor_crawl_request(result['uuid'], prefetched):
-            if result['type'] == 'result':
-                return result['data']
+        for event_data in self.monitor_crawl_request(response_result['uuid'], prefetched):
+            if event_data['type'] == 'result':
+                return event_data['data']

     def download_result(self, result_object: dict):
         response = requests.get(result_object['result'])
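To make the client flow above concrete, here is a minimal usage sketch of the two call patterns this file exposes: a blocking scrape via scrape_url(sync=True), and a manual crawl that iterates monitor_crawl_request() until a 'result' event arrives. Method names, argument order, and the event shape come from the diff above; the API key, URL, and page_options values are placeholders.

```python
from core.rag.extractor.watercrawl.client import WaterCrawlAPIClient

client = WaterCrawlAPIClient(api_key="wc-...", base_url="https://app.watercrawl.dev/")

# Blocking scrape: scrape_url() creates a crawl request, consumes
# monitor_crawl_request() internally, and returns the first 'result' payload.
page_data = client.scrape_url(
    "https://example.com",
    page_options={"only_main_content": True},  # placeholder options
    sync=True,
)

# Manual flow: start the crawl and watch the event stream yourself.
crawl = client.create_crawl_request(url="https://example.com")
for event in client.monitor_crawl_request(crawl["uuid"], True):  # second arg: prefetched
    if event["type"] == "result":
        result_object = event["data"]
        break
```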

File 2 of 2: WaterCrawlProvider

@@ -1,10 +1,10 @@
-from datetime import datetime, timezone
+from datetime import datetime

 from core.rag.extractor.watercrawl.client import WaterCrawlAPIClient


 class WaterCrawlProvider:
-    def __init__(self, api_key, base_url: str = None):
+    def __init__(self, api_key, base_url: str | None = None):
         self.client = WaterCrawlAPIClient(api_key, base_url)

     def crawl_url(self, url, options):
@@ -25,7 +25,7 @@ class WaterCrawlProvider:
         page_options = {
             "exclude_tags": options.get("exclude_tags", '').split(",") if options.get("exclude_tags") else [],
             "include_tags": options.get("include_tags", '').split(",") if options.get("include_tags") else [],
-            "wait_time": wait_time if wait_time > 1000 else 1000,  # minimum wait time is 1 second
+            "wait_time": max(1000, wait_time),  # minimum wait time is 1 second
             "include_html": False,
             "only_main_content": options.get("only_main_content", True),
             "include_links": False,
@@ -55,7 +55,10 @@ class WaterCrawlProvider:
         time_consuming = 0
         if time_str:
             time_obj = datetime.strptime(time_str, "%H:%M:%S.%f")
-            time_consuming = time_obj.hour * 3600 + time_obj.minute * 60 + time_obj.second + time_obj.microsecond / 1_000_000
+            time_consuming = (time_obj.hour * 3600 +
+                              time_obj.minute * 60 +
+                              time_obj.second +
+                              time_obj.microsecond / 1_000_000)

         return {
             "status": status,
@@ -98,7 +101,7 @@ class WaterCrawlProvider:
             "markdown": result_object.get('result').get("markdown"),
         }

-    def _get_results(self, crawl_request_id: str, query_params: dict = None):
+    def _get_results(self, crawl_request_id: str, query_params: dict | None = None):
         page = 0
         page_size = 100
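Two small details from the provider diff above, checked in isolation: the crawl duration string is parsed with datetime.strptime and converted to seconds, and wait_time is clamped to a 1-second (1000 ms) floor. A standalone sketch; the sample values are made up.

```python
from datetime import datetime

def duration_seconds(time_str: str | None) -> float:
    """Convert an 'H:M:S.micro' duration string to seconds, mirroring WaterCrawlProvider."""
    if not time_str:
        return 0.0
    t = datetime.strptime(time_str, "%H:%M:%S.%f")
    return t.hour * 3600 + t.minute * 60 + t.second + t.microsecond / 1_000_000

print(duration_seconds("00:01:23.500000"))  # 83.5

# The page_options clamp keeps wait_time (milliseconds) at a 1-second floor:
wait_time = 250
print(max(1000, wait_time))  # 1000
```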