Overview › Forums › Discussions › index/reindex and search with API › Reply To: index/reindex and search with API
import asyncio
import aiohttp
import json
import logging
import uuid
logging.basicConfig(
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’,
datefmt=’%Y-%m-%d %H:%M:%S’
)
# 线程安全的计数器
counter_lock = asyncio.Lock()
counter = 0
def build_payload(method, params):
“””
构建 JSON-RPC 请求的有效载荷。
参数:
method (str): API 方法名。
params (dict): 请求参数。
返回:
dict: 包含 id、jsonrpc、method 和 params 的字典。
“””
return {
“id”: str(uuid.uuid4()),
“jsonrpc”: “2.0”,
“method”: method,
“params”: {“input”: params}
}
async def make_request(session, url, headers, payload):
“””
发送异步 HTTP POST 请求。
参数:
session (aiohttp.ClientSession): 异步 HTTP 会话。
url (str): 请求的目标 URL。
headers (dict): 请求头信息。
payload (dict): 请求的有效载荷。
返回:
dict or None: 请求成功时返回响应的 JSON 数据,失败时返回 None。
“””
try:
async with session.post(url, headers=headers, data=json.dumps(payload)) as response:
response.raise_for_status()
return await response.json()
except (aiohttp.ClientError, json.JSONDecodeError) as e:
logging.error(f”请求异常: {e}, Payload: {payload}”)
return None
except aiohttp.ClientResponseError as e:
logging.error(f”HTTP 响应错误: {e}, Payload: {payload}”)
return None
except aiohttp.ClientConnectionError as e:
logging.error(f”连接错误: {e}, Payload: {payload}”)
return None
except Exception as e:
logging.error(f”未知异常: {e}, Payload: {payload}”)
return None
async def get_result(url, headers, text, Dir, filterExt, max_concurrent_requests=1):
“””
获取搜索结果并处理文件片段。
参数:
url (str): 请求的目标 URL。
headers (dict): 请求头信息。
text (str): 搜索文本。
Dir (str): 目录过滤条件。
filterExt (str): 文件扩展名过滤条件。
max_concurrent_requests (int): 最大并发请求数,默认为 10。
“””
params = {
“pattern”: text,
“filterDir”: Dir,
“filterExt”: filterExt,
“lastModifyBegin”: 0,
“lastModifyEnd”: 2147483647,
“limit”: 300,
“offset”: 0,
“order”: 0
}
payload = build_payload(“ATRpcServer.Searcher.V1.GetResult”, params)
async with aiohttp.ClientSession() as session:
result = await make_request(session, url, headers, payload)
if not result:
return
output = result.get(‘result’, {}).get(‘data’, {}).get(‘output’, ‘无输出’)
if isinstance(output, dict):
logging.info(“找到 %s 个”, output.get(‘count’, 0))
files = output.get(‘files’, [])
fids = [file[0] for file in files]
if fids:
semaphore = asyncio.Semaphore(max_concurrent_requests)
tasks = [get_fragment(session, url, headers, fid, semaphore, text) for fid in fids]
await asyncio.gather(*tasks)
else:
logging.info(“没有找到文件信息”)
else:
logging.info(“无效的输出格式”)
async def get_fragment(session, url, headers, fid, semaphore, text):
“””
获取指定文件 ID 的片段内容。
参数:
session (aiohttp.ClientSession): 异步 HTTP 会话。
url (str): 请求的目标 URL。
headers (dict): 请求头信息。
fid (str): 文件 ID。
semaphore (asyncio.Semaphore): 控制并发数的信号量。
text (str): 搜索文本。
“””
async with semaphore:
params = {
“fid”: fid,
“pattern”: text
}
payload = build_payload(“ATRpcServer.Searcher.V1.GetFragment”, params)
result = await make_request(session, url, headers, payload)
if not result:
return
output = result.get(‘result’, {}).get(‘data’, {}).get(‘output’, ‘无输出’)
if isinstance(output, dict) and ‘text’ in output:
global counter
async with counter_lock:
counter += 1
logging.info(f”片段内容({counter}): {output[‘text’]}”)
else:
logging.info(“文件ID: %s, 输出: %s”, fid, output)
if __name__ == “__main__”:
# 配置请求 URL、请求头、搜索文本、目录和文件扩展名过滤条件
url = “http://localhost:9920”
headers = {
“Accept”: “application/json”,
“Content-Type”: “application/json”
}
text = “\”智能数据库\”” #”\”智能数据库\””
Dir = “E:\\学习资料\\教材\\”
filterExt = “*.epub”
# 运行异步主函数
asyncio.run(get_result(url, headers, text, Dir, filterExt))