# 标准库导入 import os # 操作系统接口 import copy # 深浅拷贝功能 from functools import lru_cache # 最近最少使用缓存装饰器 import json # JSON数据处理 import base64 # Base64编解码 import struct # 处理二进制数据结构 # 第三方异步库 import aioboto3 # AWS SDK的异步版本 import aiohttp # 异步HTTP客户端/服务器 import ollama # Ollama API客户端 # 数值计算和机器学习库 import numpy as np # 数值计算库 import torch # PyTorch深度学习框架 from transformers import ( # Hugging Face转换器库 AutoTokenizer, # 自动分词器 AutoModelForCausalLM, # 自动因果语言模型 ) # OpenAI相关导入 from openai import ( AsyncOpenAI, # OpenAI异步客户端 APIConnectionError, # API连接错误 RateLimitError, # 速率限制错误 Timeout, # 超时错误 AsyncAzureOpenAI, # Azure OpenAI异步客户端 ) # 重试机制相关导入 from tenacity import ( retry, # 重试装饰器 stop_after_attempt, # 最大重试次数 wait_exponential, # 指数退避等待 retry_if_exception_type, # 基于异常类型的重试条件 ) # 数据验证和类型提示 from pydantic import BaseModel, Field # 数据验证模型 from typing import List, Dict, Callable, Any # 类型提示 # 本地模块导入 from .base import BaseKVStorage # 键值存储基类 from .utils import ( compute_args_hash, # 计算参数哈希值 wrap_embedding_func_with_attrs, # 包装嵌入函数的装饰器 ) # 禁用并行化以避免tokenizers的并行化导致的问题 os.environ["TOKENIZERS_PARALLELISM"] = "false" # 使用retry装饰器处理重试逻辑,处理OpenAI API的速率限制、连接和超时错误 @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)), ) async def openai_complete_if_cache( model, prompt, system_prompt=None, history_messages=[], base_url=None, api_key=None, **kwargs, ) -> str: """ 异步函数,通过OpenAI的API获取语言模型的补全结果,支持缓存机制。 参数: - model: 使用的模型名称 - prompt: 用户输入的提示 - system_prompt: 系统提示(可选) - history_messages: 历史消息(可选) - base_url: API的基础URL(可选) - api_key: API密钥(可选) - **kwargs: 其他参数 返回: - str: 模型生成的文本 """ # 设置环境变量中的API密钥 if api_key: os.environ["OPENAI_API_KEY"] = api_key # 初始化OpenAI异步客户端 openai_async_client = ( AsyncOpenAI() if base_url is None else AsyncOpenAI(base_url=base_url) ) # 初始化哈希存储和消息列表 hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None) messages = [] # 添加系统提示到消息列表 if system_prompt: messages.append({"role": "system", "content": system_prompt}) # 将历史消息和当前提示添加到消息列表 messages.extend(history_messages) messages.append({"role": "user", "content": prompt}) # 检查缓存中是否有结果 if hashing_kv is not None: args_hash = compute_args_hash(model, messages) if_cache_return = await hashing_kv.get_by_id(args_hash) if if_cache_return is not None: return if_cache_return["return"] # 调用OpenAI API获取补全结果 response = await openai_async_client.chat.completions.create( model=model, messages=messages, **kwargs ) # 将结果缓存 if hashing_kv is not None: await hashing_kv.upsert( {args_hash: {"return": response.choices[0].message.content, "model": model}} ) # 返回生成的文本 return response.choices[0].message.content # 与openai_complete_if_cache类似的函数,但用于Azure OpenAI服务 @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)), ) async def azure_openai_complete_if_cache( model, prompt, system_prompt=None, history_messages=[], base_url=None, api_key=None, **kwargs, ): """ 异步函数,通过Azure OpenAI的API获取语言模型的补全结果,支持缓存机制。 参数: - model: 使用的模型名称 - prompt: 用户输入的提示 - system_prompt: 系统提示(可选) - history_messages: 历史消息(可选) - base_url: API的基础URL(可选) - api_key: API密钥(可选) - **kwargs: 其他参数 返回: - str: 模型生成的文本 """ # 设置环境变量中的API密钥和端点 if api_key: os.environ["AZURE_OPENAI_API_KEY"] = api_key if base_url: os.environ["AZURE_OPENAI_ENDPOINT"] = base_url # 初始化Azure OpenAI异步客户端 openai_async_client = AsyncAzureOpenAI( azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), api_key=os.getenv("AZURE_OPENAI_API_KEY"), api_version=os.getenv("AZURE_OPENAI_API_VERSION"), ) # 初始化哈希存储和消息列表 hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None) messages = [] # 添加系统提示到消息列表 if system_prompt: messages.append({"role": "system", "content": system_prompt}) # 将历史消息和当前提示添加到消息列表 messages.extend(history_messages) if prompt is not None: messages.append({"role": "user", "content": prompt}) # 检查缓存中是否有结果 if hashing_kv is not None: args_hash = compute_args_hash(model, messages) if_cache_return = await hashing_kv.get_by_id(args_hash) if if_cache_return is not None: return if_cache_return["return"] # 调用Azure OpenAI API获取补全结果 response = await openai_async_client.chat.completions.create( model=model, messages=messages, **kwargs ) # 将结果缓存 if hashing_kv is not None: await hashing_kv.upsert( {args_hash: {"return": response.choices[0].message.content, "model": model}} ) # 返回生成的文本 return response.choices[0].message.content class BedrockError(Exception): """Amazon Bedrock 相关问题的通用错误""" @retry( stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=60), retry=retry_if_exception_type((BedrockError)), ) async def bedrock_complete_if_cache( model, prompt, system_prompt=None, history_messages=[], aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, **kwargs, ) -> str: """ 异步使用 Amazon Bedrock 完成文本生成,支持缓存。 如果缓存命中,则直接返回缓存结果。该函数在失败时支持重试。 参数: - model: 要使用的 Bedrock 模型的模型 ID。 - prompt: 用户输入的提示。 - system_prompt: 系统提示,如果有。 - history_messages: 会话历史消息列表,用于对话上下文。 - aws_access_key_id: AWS 访问密钥 ID。 - aws_secret_access_key: AWS 秘密访问密钥。 - aws_session_token: AWS 会话令牌。 - **kwargs: 其他参数,例如推理参数。 返回: - str: 生成的文本结果。 """ # 设置 AWS 凭证 os.environ["AWS_ACCESS_KEY_ID"] = os.environ.get( "AWS_ACCESS_KEY_ID", aws_access_key_id ) os.environ["AWS_SECRET_ACCESS_KEY"] = os.environ.get( "AWS_SECRET_ACCESS_KEY", aws_secret_access_key ) os.environ["AWS_SESSION_TOKEN"] = os.environ.get( "AWS_SESSION_TOKEN", aws_session_token ) # 修复消息历史记录格式 messages = [] for history_message in history_messages: message = copy.copy(history_message) message["content"] = [{"text": message["content"]}] messages.append(message) # 添加用户提示 messages.append({"role": "user", "content": [{"text": prompt}]}) # 初始化 Converse API 参数 args = {"modelId": model, "messages": messages} # 定义系统提示 if system_prompt: args["system"] = [{"text": system_prompt}] # 映射并设置推理参数 inference_params_map = { "max_tokens": "maxTokens", "top_p": "topP", "stop_sequences": "stopSequences", } if inference_params := list( set(kwargs) & set(["max_tokens", "temperature", "top_p", "stop_sequences"]) ): args["inferenceConfig"] = {} for param in inference_params: args["inferenceConfig"][inference_params_map.get(param, param)] = ( kwargs.pop(param) ) # 处理缓存逻辑 hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None) if hashing_kv is not None: args_hash = compute_args_hash(model, messages) if_cache_return = await hashing_kv.get_by_id(args_hash) if if_cache_return is not None: return if_cache_return["return"] # 通过 Converse API 调用模型 session = aioboto3.Session() async with session.client("bedrock-runtime") as bedrock_async_client: try: response = await bedrock_async_client.converse(**args, **kwargs) except Exception as e: raise BedrockError(e) # 更新缓存(如果启用) if hashing_kv is not None: await hashing_kv.upsert( { args_hash: { "return": response["output"]["message"]["content"][0]["text"], "model": model, } } ) return response["output"]["message"]["content"][0]["text"] @lru_cache(maxsize=1) def initialize_hf_model(model_name): """ 初始化Hugging Face模型和tokenizer。 使用指定的模型名称初始化模型和tokenizer,并根据需要设置padding token。 参数: - model_name: 模型的名称。 返回: - hf_model: 初始化的Hugging Face模型。 - hf_tokenizer: 初始化的Hugging Face tokenizer。 """ hf_tokenizer = AutoTokenizer.from_pretrained( model_name, device_map="auto", trust_remote_code=True ) hf_model = AutoModelForCausalLM.from_pretrained( model_name, device_map="auto", trust_remote_code=True ) if hf_tokenizer.pad_token is None: hf_tokenizer.pad_token = hf_tokenizer.eos_token return hf_model, hf_tokenizer async def hf_model_if_cache( model, prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 使用缓存的Hugging Face模型进行推理。 如果缓存中存在相同的输入,则直接返回结果,否则使用指定的模型进行推理并将结果缓存。 参数: - model: 模型的名称。 - prompt: 用户的输入提示。 - system_prompt: 系统的提示(可选)。 - history_messages: 历史消息列表(可选)。 - **kwargs: 其他关键字参数,例如hashing_kv用于缓存存储。 返回: - response_text: 模型的响应文本。 """ model_name = model hf_model, hf_tokenizer = initialize_hf_model(model_name) hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None) messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.extend(history_messages) messages.append({"role": "user", "content": prompt}) if hashing_kv is not None: args_hash = compute_args_hash(model, messages) if_cache_return = await hashing_kv.get_by_id(args_hash) if if_cache_return is not None: return if_cache_return["return"] input_prompt = "" try: input_prompt = hf_tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) except Exception: try: ori_message = copy.deepcopy(messages) if messages[0]["role"] == "system": messages[1]["content"] = ( "" + messages[0]["content"] + "\n" + messages[1]["content"] ) messages = messages[1:] input_prompt = hf_tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) except Exception: len_message = len(ori_message) for msgid in range(len_message): input_prompt = ( input_prompt + "<" + ori_message[msgid]["role"] + ">" + ori_message[msgid]["content"] + "\n" ) input_ids = hf_tokenizer( input_prompt, return_tensors="pt", padding=True, truncation=True ).to("cuda") inputs = {k: v.to(hf_model.device) for k, v in input_ids.items()} output = hf_model.generate( **input_ids, max_new_tokens=512, num_return_sequences=1, early_stopping=True ) response_text = hf_tokenizer.decode( output[0][len(inputs["input_ids"][0]) :], skip_special_tokens=True ) if hashing_kv is not None: await hashing_kv.upsert({args_hash: {"return": response_text, "model": model}}) return response_text async def ollama_model_if_cache( model, prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 异步函数,通过Olama模型生成回答,支持缓存机制以优化性能。 参数: model: 使用的模型名称。 prompt: 用户的提问。 system_prompt: 系统的提示,用于设定对话背景。 history_messages: 历史对话消息,用于维持对话上下文。 **kwargs: 其他参数,包括max_tokens, response_format, host, timeout等。 返回: 生成的模型回答。 """ # 移除不需要的参数,以符合Olama客户端的期望 kwargs.pop("max_tokens", None) kwargs.pop("response_format", None) host = kwargs.pop("host", None) timeout = kwargs.pop("timeout", None) # 初始化Olama异步客户端 ollama_client = ollama.AsyncClient(host=host, timeout=timeout) # 构建消息列表,首先添加系统提示(如果有) messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) # 获取哈希存储实例,用于缓存 hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None) # 将历史消息和当前用户提问添加到消息列表 messages.extend(history_messages) messages.append({"role": "user", "content": prompt}) # 如果提供了哈希存储,尝试从缓存中获取回答 if hashing_kv is not None: args_hash = compute_args_hash(model, messages) if_cache_return = await hashing_kv.get_by_id(args_hash) if if_cache_return is not None: return if_cache_return["return"] # 如果缓存中没有回答,调用Olama模型生成回答 response = await ollama_client.chat(model=model, messages=messages, **kwargs) # 提取生成的回答内容 result = response["message"]["content"] # 如果使用了哈希存储,将新生成的回答存入缓存 if hashing_kv is not None: await hashing_kv.upsert({args_hash: {"return": result, "model": model}}) # 返回生成的回答 return result @lru_cache(maxsize=1) def initialize_lmdeploy_pipeline( model, tp=1, chat_template=None, log_level="WARNING", model_format="hf", quant_policy=0, ): """ 初始化lmdeploy管道,用于模型推理,带有缓存机制。 参数: model: 模型路径。 tp: 张量并行度。 chat_template: 聊天模板配置。 log_level: 日志级别。 model_format: 模型格式。 quant_policy: 量化策略。 返回: 初始化的lmdeploy管道实例。 """ # 导入必要的模块和类 from lmdeploy import pipeline, ChatTemplateConfig, TurbomindEngineConfig # 创建并配置lmdeploy管道 lmdeploy_pipe = pipeline( model_path=model, backend_config=TurbomindEngineConfig( tp=tp, model_format=model_format, quant_policy=quant_policy ), chat_template_config=ChatTemplateConfig(model_name=chat_template) if chat_template else None, log_level="WARNING", ) # 返回配置好的管道实例 return lmdeploy_pipe async def lmdeploy_model_if_cache( model, prompt, system_prompt=None, history_messages=[], chat_template=None, model_format="hf", quant_policy=0, **kwargs, ) -> str: """ 异步执行语言模型推理,支持缓存。 该函数初始化 lmdeploy 管道进行模型推理,支持多种模型格式和量化策略。它处理输入的提示文本、系统提示和历史消息, 并尝试从缓存中检索响应。如果未命中缓存,则生成响应并缓存结果以供将来使用。 参数: model (str): 模型路径。 可以是以下选项之一: - i) 通过 `lmdeploy convert` 命令转换或从 ii) 和 iii) 下载的本地 turbomind 模型目录路径。 - ii) 在 huggingface.co 上托管的 lmdeploy 量化模型的 model_id,例如 "InternLM/internlm-chat-20b-4bit", "lmdeploy/llama2-chat-70b-4bit" 等。 - iii) 在 huggingface.co 上托管的模型的 model_id,例如 "internlm/internlm-chat-7b", "Qwen/Qwen-7B-Chat ", "baichuan-inc/Baichuan2-7B-Chat" 等。 chat_template (str): 当模型是 huggingface.co 上的 PyTorch 模型时需要,例如 "internlm-chat-7b", "Qwen-7B-Chat ", "Baichuan2-7B-Chat" 等,以及当本地路径的模型名称与 HF 中的原始模型名称不匹配时。 tp (int): 张量并行度 prompt (Union[str, List[str]]): 要完成的输入文本。 do_preprocess (bool): 是否预处理消息。默认为 True,表示将应用 chat_template。 skip_special_tokens (bool): 解码时是否移除特殊标记。默认为 True。 do_sample (bool): 是否使用采样,否则使用贪心解码。默认为 False,表示使用贪心解码。 """ # 导入 lmdeploy 及相关模块,如果未安装则抛出错误 try: import lmdeploy from lmdeploy import version_info, GenerationConfig except Exception: raise ImportError("请在初始化 lmdeploy 后端之前安装 lmdeploy。") # 提取并处理关键字参数 kwargs.pop("response_format", None) max_new_tokens = kwargs.pop("max_tokens", 512) tp = kwargs.pop("tp", 1) skip_special_tokens = kwargs.pop("skip_special_tokens", True) do_preprocess = kwargs.pop("do_preprocess", True) do_sample = kwargs.pop("do_sample", False) gen_params = kwargs # 检查 lmdeploy 版本兼容性,确保支持 do_sample 参数 version = version_info if do_sample is not None and version < (0, 6, 0): raise RuntimeError( "`do_sample` 参数在 lmdeploy v0.6.0 之前不受支持,当前使用的 lmdeploy 版本为 {}" .format(lmdeploy.__version__) ) else: do_sample = True gen_params.update(do_sample=do_sample) # 初始化 lmdeploy 管道 lmdeploy_pipe = initialize_lmdeploy_pipeline( model=model, tp=tp, chat_template=chat_template, model_format=model_format, quant_policy=quant_policy, log_level="WARNING", ) # 构建消息列表 messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) # 获取哈希存储对象 hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None) messages.extend(history_messages) messages.append({"role": "user", "content": prompt}) # 尝试从缓存中获取响应 if hashing_kv is not None: args_hash = compute_args_hash(model, messages) if_cache_return = await hashing_kv.get_by_id(args_hash) if if_cache_return is not None: return if_cache_return["return"] # 配置生成参数 gen_config = GenerationConfig( skip_special_tokens=skip_special_tokens, max_new_tokens=max_new_tokens, **gen_params, ) # 生成响应 response = "" async for res in lmdeploy_pipe.generate( messages, gen_config=gen_config, do_preprocess=do_preprocess, stream_response=False, session_id=1, ): response += res.response # 缓存生成的响应 if hashing_kv is not None: await hashing_kv.upsert({args_hash: {"return": response, "model": model}}) return response async def gpt_4o_complete( prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 使用GPT-4o模型完成文本生成任务。 参数: - prompt: 用户输入的提示文本。 - system_prompt: 系统级别的提示文本,用于指导模型生成。 - history_messages: 历史对话消息,用于上下文理解。 - **kwargs: 其他可变关键字参数。 返回: - 生成的文本结果。 """ return await openai_complete_if_cache( "gpt-4o", prompt, system_prompt=system_prompt, history_messages=history_messages, **kwargs, ) async def gpt_4o_mini_complete( prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 使用较小的GPT-4o模型完成文本生成任务。 参数: - prompt: 用户输入的提示文本。 - system_prompt: 系统级别的提示文本,用于指导模型生成。 - history_messages: 历史对话消息,用于上下文理解。 - **kwargs: 其他可变关键字参数。 返回: - 生成的文本结果。 """ return await openai_complete_if_cache( "gpt-4o-mini", prompt, system_prompt=system_prompt, history_messages=history_messages, **kwargs, ) async def azure_openai_complete( prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 使用Azure上的OpenAI模型完成文本生成任务。 参数: - prompt: 用户输入的提示文本。 - system_prompt: 系统级别的提示文本,用于指导模型生成。 - history_messages: 历史对话消息,用于上下文理解。 - **kwargs: 其他可变关键字参数。 返回: - 生成的文本结果。 """ return await azure_openai_complete_if_cache( "conversation-4o-mini", prompt, system_prompt=system_prompt, history_messages=history_messages, **kwargs, ) async def bedrock_complete( prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 使用Bedrock平台的特定模型完成文本生成任务。 参数: - prompt: 用户输入的提示文本。 - system_prompt: 系统级别的提示文本,用于指导模型生成。 - history_messages: 历史对话消息,用于上下文理解。 - **kwargs: 其他可变关键字参数。 返回: - 生成的文本结果。 """ return await bedrock_complete_if_cache( "anthropic.claude-3-haiku-20240307-v1:0", prompt, system_prompt=system_prompt, history_messages=history_messages, **kwargs, ) async def hf_model_complete( prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 使用Hugging Face模型完成文本生成任务。 参数: - prompt: 用户输入的提示文本。 - system_prompt: 系统级别的提示文本,用于指导模型生成。 - history_messages: 历史对话消息,用于上下文理解。 - **kwargs: 其他可变关键字参数,包括模型名称。 返回: - 生成的文本结果。 """ model_name = kwargs["hashing_kv"].global_config["llm_model_name"] return await hf_model_if_cache( model_name, prompt, system_prompt=system_prompt, history_messages=history_messages, **kwargs, ) async def ollama_model_complete( prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: """ 使用Ollama模型完成文本生成任务。 参数: - prompt: 用户输入的提示文本。 - system_prompt: 系统级别的提示文本,用于指导模型生成。 - history_messages: 历史对话消息,用于上下文理解。 - **kwargs: 其他可变关键字参数,包括模型名称。 返回: - 生成的文本结果。 """ model_name = kwargs["hashing_kv"].global_config["llm_model_name"] return await ollama_model_if_cache( model_name, prompt, system_prompt=system_prompt, history_messages=history_messages, **kwargs, ) # 使用装饰器添加属性,如嵌入维度和最大令牌大小 @wrap_embedding_func_with_attrs(embedding_dim=1536, max_token_size=8192) # 使用重试机制处理可能的速率限制、API连接和超时错误 @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)), ) async def openai_embedding( texts: list[str], model: str = "text-embedding-3-small", base_url: str = None, api_key: str = None, ) -> np.ndarray: """ 使用OpenAI模型生成文本嵌入。 参数: - texts: 需要生成嵌入的文本列表 - model: 使用的模型名称 - base_url: API的基础URL - api_key: API密钥 返回: - 嵌入的NumPy数组 """ if api_key: os.environ["OPENAI_API_KEY"] = api_key openai_async_client = ( AsyncOpenAI() if base_url is None else AsyncOpenAI(base_url=base_url) ) response = await openai_async_client.embeddings.create( model=model, input=texts, encoding_format="float" ) return np.array([dp.embedding for dp in response.data]) # 使用装饰器添加属性,如嵌入维度和最大令牌大小 @wrap_embedding_func_with_attrs(embedding_dim=1536, max_token_size=8192) # 使用重试机制处理可能的速率限制、API连接和超时错误 @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)), ) async def azure_openai_embedding( texts: list[str], model: str = "text-embedding-3-small", base_url: str = None, api_key: str = None, ) -> np.ndarray: """ 使用Azure OpenAI模型生成文本嵌入。 参数: - texts: 需要生成嵌入的文本列表 - model: 使用的模型名称 - base_url: API的基础URL - api_key: API密钥 返回: - 嵌入的NumPy数组 """ if api_key: os.environ["AZURE_OPENAI_API_KEY"] = api_key if base_url: os.environ["AZURE_OPENAI_ENDPOINT"] = base_url openai_async_client = AsyncAzureOpenAI( azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), api_key=os.getenv("AZURE_OPENAI_API_KEY"), api_version=os.getenv("AZURE_OPENAI_API_VERSION"), ) response = await openai_async_client.embeddings.create( model=model, input=texts, encoding_format="float" ) return np.array([dp.embedding for dp in response.data]) # 使用重试机制处理可能的速率限制、API连接和超时错误 @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)), ) async def siliconcloud_embedding( texts: list[str], model: str = "netease-youdao/bce-embedding-base_v1", base_url: str = "https://api.siliconflow.cn/v1/embeddings", max_token_size: int = 512, api_key: str = None, ) -> np.ndarray: """ 使用SiliconCloud模型生成文本嵌入。 参数: - texts: 需要生成嵌入的文本列表 - model: 使用的模型名称 - base_url: API的基础URL - max_token_size: 最大令牌大小 - api_key: API密钥 返回: - 嵌入的NumPy数组 """ if api_key and not api_key.startswith("Bearer "): api_key = "Bearer " + api_key headers = {"Authorization": api_key, "Content-Type": "application/json"} truncate_texts = [text[0:max_token_size] for text in texts] payload = {"model": model, "input": truncate_texts, "encoding_format": "base64"} base64_strings = [] async with aiohttp.ClientSession() as session: async with session.post(base_url, headers=headers, json=payload) as response: content = await response.json() if "code" in content: raise ValueError(content) base64_strings = [item["embedding"] for item in content["data"]] embeddings = [] for string in base64_strings: decode_bytes = base64.b64decode(string) n = len(decode_bytes) // 4 float_array = struct.unpack("<" + "f" * n, decode_bytes) embeddings.append(float_array) return np.array(embeddings) # @wrap_embedding_func_with_attrs(embedding_dim=1024, max_token_size=8192) # @retry( # stop=stop_after_attempt(3), # wait=wait_exponential(multiplier=1, min=4, max=10), # retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)), # TODO: fix exceptions # ) async def bedrock_embedding( texts: list[str], model: str = "amazon.titan-embed-text-v2:0", aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, ) -> np.ndarray: """ 生成给定文本的嵌入向量。 使用指定的模型对文本列表进行嵌入处理,支持Amazon Bedrock和Cohere模型。 参数: - texts: 需要嵌入的文本列表。 - model: 使用的模型标识符,默认为"amazon.titan-embed-text-v2:0"。 - aws_access_key_id: AWS访问密钥ID。 - aws_secret_access_key: AWS秘密访问密钥。 - aws_session_token: AWS会话令牌。 返回: - 嵌入向量的NumPy数组。 """ # 设置AWS环境变量 os.environ["AWS_ACCESS_KEY_ID"] = os.environ.get( "AWS_ACCESS_KEY_ID", aws_access_key_id ) os.environ["AWS_SECRET_ACCESS_KEY"] = os.environ.get( "AWS_SECRET_ACCESS_KEY", aws_secret_access_key ) os.environ["AWS_SESSION_TOKEN"] = os.environ.get( "AWS_SESSION_TOKEN", aws_session_token ) # 创建aioboto3会话 session = aioboto3.Session() async with session.client("bedrock-runtime") as bedrock_async_client: # 根据模型提供者进行不同的处理 if (model_provider := model.split(".")[0]) == "amazon": embed_texts = [] for text in texts: # 根据模型版本构建请求体 if "v2" in model: body = json.dumps( { "inputText": text, # 'dimensions': embedding_dim, "embeddingTypes": ["float"], } ) elif "v1" in model: body = json.dumps({"inputText": text}) else: raise ValueError(f"Model {model} is not supported!") # 调用Bedrock模型 response = await bedrock_async_client.invoke_model( modelId=model, body=body, accept="application/json", contentType="application/json", ) response_body = await response.get("body").json() embed_texts.append(response_body["embedding"]) elif model_provider == "cohere": body = json.dumps( {"texts": texts, "input_type": "search_document", "truncate": "NONE"} ) response = await bedrock_async_client.invoke_model( model=model, body=body, accept="application/json", contentType="application/json", ) response_body = json.loads(response.get("body").read()) embed_texts = response_body["embeddings"] else: raise ValueError(f"Model provider '{model_provider}' is not supported!") return np.array(embed_texts) async def hf_embedding(texts: list[str], tokenizer, embed_model) -> np.ndarray: """ 使用Hugging Face模型生成给定文本的嵌入向量。 参数: - texts: 需要嵌入的文本列表。 - tokenizer: Hugging Face的标记器实例。 - embed_model: Hugging Face的嵌入模型实例。 返回: - 嵌入向量的NumPy数组。 """ # 对文本进行标记化处理 input_ids = tokenizer( texts, return_tensors="pt", padding=True, truncation=True ).input_ids # 使用模型生成嵌入向量 with torch.no_grad(): outputs = embed_model(input_ids) embeddings = outputs.last_hidden_state.mean(dim=1) return embeddings.detach().numpy() async def ollama_embedding(texts: list[str], embed_model, **kwargs) -> np.ndarray: """ 使用Ollama模型生成给定文本的嵌入向量。 参数: - texts: 需要嵌入的文本列表。 - embed_model: 使用的嵌入模型标识符。 - **kwargs: 传递给Ollama客户端的其他参数。 返回: - 嵌入向量的列表。 """ embed_text = [] # 创建Ollama客户端实例 ollama_client = ollama.Client(**kwargs) for text in texts: # 调用模型生成嵌入向量 data = ollama_client.embeddings(model=embed_model, prompt=text) embed_text.append(data["embedding"]) return embed_text class Model(BaseModel): """ This is a Pydantic model class named 'Model' that is used to define a custom language model. Attributes: gen_func (Callable[[Any], str]): A callable function that generates the response from the language model. The function should take any argument and return a string. kwargs (Dict[str, Any]): A dictionary that contains the arguments to pass to the callable function. This could include parameters such as the model name, API key, etc. Example usage: Model(gen_func=openai_complete_if_cache, kwargs={"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY_1"]}) In this example, 'openai_complete_if_cache' is the callable function that generates the response from the OpenAI model. The 'kwargs' dictionary contains the model name and API key to be passed to the function. """ gen_func: Callable[[Any], str] = Field( ..., description="A function that generates the response from the llm. The response must be a string", ) kwargs: Dict[str, Any] = Field( ..., description="The arguments to pass to the callable function. Eg. the api key, model name, etc", ) class Config: arbitrary_types_allowed = True class MultiModel: """ Distributes the load across multiple language models. Useful for circumventing low rate limits with certain api providers especially if you are on the free tier. Could also be used for spliting across diffrent models or providers. Attributes: models (List[Model]): A list of language models to be used. Usage example: ```python models = [ Model(gen_func=openai_complete_if_cache, kwargs={"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY_1"]}), Model(gen_func=openai_complete_if_cache, kwargs={"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY_2"]}), Model(gen_func=openai_complete_if_cache, kwargs={"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY_3"]}), Model(gen_func=openai_complete_if_cache, kwargs={"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY_4"]}), Model(gen_func=openai_complete_if_cache, kwargs={"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY_5"]}), ] multi_model = MultiModel(models) rag = LightRAG( llm_model_func=multi_model.llm_model_func / ..other args ) ``` """ def __init__(self, models: List[Model]): self._models = models self._current_model = 0 def _next_model(self): self._current_model = (self._current_model + 1) % len(self._models) return self._models[self._current_model] async def llm_model_func( self, prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: kwargs.pop("model", None) # stop from overwriting the custom model name next_model = self._next_model() args = dict( prompt=prompt, system_prompt=system_prompt, history_messages=history_messages, **kwargs, **next_model.kwargs, ) return await next_model.gen_func(**args) if __name__ == "__main__": import asyncio async def main(): result = await gpt_4o_mini_complete("How are you?") print(result) asyncio.run(main())