龙空技术网

LangChain Handbook (Python Edition) 28, Module: Callbacks

AI让生活更美好 881


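LangChain provides a callback system that lets you hook into the various stages of your LLM application. This is useful for logging, monitoring, streaming, and other tasks.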

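You can subscribe to these events through the callbacks argument, which is available throughout the API. The argument is a list of handler objects, each of which is expected to implement one or more of the methods described in detail below. There are two main callback mechanisms: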

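Constructor callbacks are used for all calls made on that object and are scoped to that object only. For example, if you pass a handler to the LLMChain constructor, the model attached to that chain will not use it.
Request callbacks are used for that specific request only, and for all sub-requests it contains (for example, a call to an LLMChain triggers a call to a model, which uses the same handler that was passed in). These are passed explicitly.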

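Advanced: when you create a custom chain, you can easily set it up to use the same callback system as all the built-in chains. The _call, _generate, _run, and equivalent async methods on Chains / LLMs / Chat Models / Agents / Tools now receive a second argument called run_manager, which is bound to that run and contains the logging methods that the object can use (for example, on_llm_new_token). This is useful when constructing a custom chain. For more information on how to create custom chains and use callbacks inside them, see the guide on custom chains.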

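CallbackHandlers are objects that implement the CallbackHandler interface, which has one method for each event that can be subscribed to. When an event is triggered, the CallbackManager calls the appropriate method on each handler.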

class BaseCallbackHandler:
    """Base callback handler that can be used to handle callbacks from langchain."""

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Run when LLM starts running."""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Run on new LLM token. Only available when streaming is enabled."""

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Run when LLM ends running."""

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        """Run when chain starts running."""

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        """Run when chain ends running."""

    def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when chain errors."""

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        """Run when tool starts running."""

    def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        """Run when tool ends running."""

    def on_tool_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when tool errors."""

    def on_text(self, text: str, **kwargs: Any) -> Any:
        """Run on arbitrary text."""

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run on agent action."""

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run on agent end."""
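The dispatch described above can be pictured with a small standalone sketch. It does not import LangChain: ToyHandler, RecordingHandler, ToyManager and the fire method are hypothetical names invented for illustration, and the real CallbackManager does considerably more than this.

```python
from typing import Any, List


class ToyHandler:
    """Hypothetical handler base: every event method is a no-op by default."""

    def on_llm_start(self, prompts: List[str], **kwargs: Any) -> None:
        pass

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        pass

    def on_llm_end(self, response: str, **kwargs: Any) -> None:
        pass


class RecordingHandler(ToyHandler):
    """Overrides only the events it cares about."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_llm_start(self, prompts: List[str], **kwargs: Any) -> None:
        self.events.append(f"start:{len(prompts)}")

    def on_llm_end(self, response: str, **kwargs: Any) -> None:
        self.events.append(f"end:{response}")


class ToyManager:
    """Hypothetical manager: forwards one event to every registered handler."""

    def __init__(self, handlers: List[ToyHandler]) -> None:
        self.handlers = handlers

    def fire(self, event_name: str, *args: Any, **kwargs: Any) -> None:
        for handler in self.handlers:
            getattr(handler, event_name)(*args, **kwargs)


recorder = RecordingHandler()
manager = ToyManager([ToyHandler(), recorder])
manager.fire("on_llm_start", ["1 + 2 = "])
manager.fire("on_llm_new_token", "3")  # recorder ignores this event
manager.fire("on_llm_end", "3")
print(recorder.events)
```

Because the base class supplies a no-op for every event, a handler only needs to override the methods it is interested in.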
How to use callbacks

The callbacks argument is available on most objects throughout the API (Chains, Models, Tools, Agents, etc.) in two different places:

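Constructor callbacks: defined in the constructor, e.g. LLMChain(callbacks=[handler]). They are used for all calls made on that object and are scoped to that object only, e.g. if you pass a handler to the LLMChain constructor, the model attached to that chain will not use it.
Request callbacks: defined in the call() / run() / apply() methods used for issuing a request, e.g. chain.call(inputs, callbacks=[handler]). They are used for that specific request only, and for all sub-requests it contains (e.g. a call to an LLMChain triggers a call to a model, which uses the same handler that was passed to the call() method).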
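The difference in scope between the two places can be illustrated with a toy model. This is only a sketch under stated assumptions: LogHandler, ToyModel and ToyChain are hypothetical classes that imitate the scoping rule described above, not LangChain code.

```python
from typing import List, Optional


class LogHandler:
    """Hypothetical handler that records which objects notified it."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def notify(self, source: str) -> None:
        self.seen.append(source)


class ToyModel:
    def __init__(self, callbacks: Optional[List[LogHandler]] = None) -> None:
        self.callbacks = callbacks or []  # constructor callbacks: this object only

    def generate(self, callbacks: Optional[List[LogHandler]] = None) -> str:
        # Constructor handlers plus whatever the caller passed for this request.
        for handler in self.callbacks + (callbacks or []):
            handler.notify("model")
        return "3"


class ToyChain:
    def __init__(
        self, model: ToyModel, callbacks: Optional[List[LogHandler]] = None
    ) -> None:
        self.model = model
        self.callbacks = callbacks or []  # not shared with self.model

    def run(self, callbacks: Optional[List[LogHandler]] = None) -> str:
        request_callbacks = callbacks or []
        for handler in self.callbacks + request_callbacks:
            handler.notify("chain")
        # Only the request callbacks travel down to the sub-request.
        return self.model.generate(callbacks=request_callbacks)


constructor_handler = LogHandler()
request_handler = LogHandler()

chain = ToyChain(ToyModel(), callbacks=[constructor_handler])
chain.run(callbacks=[request_handler])

print(constructor_handler.seen)  # notified by the chain only
print(request_handler.seen)      # notified by the chain and by the model
```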

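The verbose argument is available on most objects throughout the API (Chains, Models, Tools, Agents, etc.) as a constructor argument, e.g. LLMChain(verbose=True). It is equivalent to passing a StdOutCallbackHandler to the callbacks argument of that object and of all its child objects. This is useful for debugging, because it logs all events to the console.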

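When do you want to use each of these?

Constructor callbacks are most useful for use cases such as logging and monitoring, which are not specific to a single request but apply to the entire chain. For example, if you want to log all the requests made to an LLMChain, you would pass a handler to the constructor.
Request callbacks are most useful for use cases such as streaming, where you want to stream the output of a single request to a specific websocket connection, or other similar cases. For example, if you want to stream the output of a single request to a websocket, you would pass a handler to the call() method.

Using an existing handler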

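LangChain provides a few built-in handlers that you can use to get started. They are available in the langchain.callbacks module. The most basic handler is StdOutCallbackHandler, which simply logs all events to stdout. More default handlers will be added to the library in the future.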

Note that when the verbose flag on an object is set to true, the StdOutCallbackHandler is invoked even if it was not passed in explicitly.

from langchain.callbacks import StdOutCallbackHandler
from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

handler = StdOutCallbackHandler()
llm = OpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")

# First, let's explicitly set the StdOutCallbackHandler in `callbacks`
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler])
chain.run(number=2)

# Then, let's use the `verbose` flag to achieve the same result
chain = LLMChain(llm=llm, prompt=prompt, verbose=True)
chain.run(number=2)

# Finally, let's use the request `callbacks` to achieve the same result
chain = LLMChain(llm=llm, prompt=prompt)
chain.run(number=2, callbacks=[handler])

> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 = 

> Finished chain.

> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 = 

> Finished chain.

> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 = 

> Finished chain.
'\n\n3'
Creating a custom handler

You can also create a custom handler to set on an object. In the example below, we implement streaming with a custom handler.

from langchain.callbacks.base import BaseCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage


class MyCustomHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"My custom handler, token: {token}")


# To enable streaming, we pass in `streaming=True` to the ChatModel constructor
# Additionally, we pass in a list with our custom handler
chat = ChatOpenAI(max_tokens=25, streaming=True, callbacks=[MyCustomHandler()])

chat([HumanMessage(content="Tell me a joke")])
My custom handler, token: 
My custom handler, token: Why
My custom handler, token:  did
My custom handler, token:  the
My custom handler, token:  tomato
My custom handler, token:  turn
My custom handler, token:  red
My custom handler, token: ?
My custom handler, token:  Because
My custom handler, token:  it
My custom handler, token:  saw
My custom handler, token:  the
My custom handler, token:  salad
My custom handler, token:  dressing
My custom handler, token: !
My custom handler, token: 
AIMessage(content='Why did the tomato turn red? Because it saw the salad dressing!', additional_kwargs={})
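A handler does not have to print each token; it can also keep state. The following is a minimal sketch of that idea, assuming a hypothetical TokenCollector class. It is written without LangChain so that it runs on its own, and the tokens are replayed by hand instead of coming from a model; in real use the class would subclass BaseCallbackHandler as in the example above.

```python
from typing import Any, List


class TokenCollector:
    """Hypothetical handler that buffers streamed tokens."""

    def __init__(self) -> None:
        self.tokens: List[str] = []

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.tokens.append(token)

    @property
    def text(self) -> str:
        # Streamed tokens already carry their own leading spaces.
        return "".join(self.tokens)


collector = TokenCollector()
# Replay a few tokens by hand instead of calling a model.
for token in ["", "Why", " did", " the", " tomato", " turn", " red", "?"]:
    collector.on_llm_new_token(token)

print(collector.text)
```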
Async callbacks

If you plan to use the async API, it is recommended to use AsyncCallbackHandler to avoid blocking the runloop.

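Advanced: if you use a sync CallbackHandler while running your llm/chain/tool/agent with an async method, it will still work. Under the hood, however, it is called with run_in_executor, which can cause problems if your CallbackHandler is not thread-safe.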
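Why thread safety matters here can be seen in a standard-library sketch. The dispatch function below is a hypothetical illustration of the general pattern (await coroutine handlers, push plain functions to the default executor); it is not LangChain's implementation, and all names in it are invented for the example.

```python
import asyncio
import inspect
import threading
from typing import Callable, Dict

calls: Dict[str, int] = {}  # handler kind -> id of the thread it ran on


def sync_on_token(token: str) -> None:
    calls["sync"] = threading.get_ident()


async def async_on_token(token: str) -> None:
    calls["async"] = threading.get_ident()


async def dispatch(handler: Callable, token: str) -> None:
    """Hypothetical dispatcher: await coroutines, run plain functions in a thread."""
    if inspect.iscoroutinefunction(handler):
        await handler(token)
    else:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, handler, token)


async def main() -> int:
    for handler in (sync_on_token, async_on_token):
        await dispatch(handler, "Why")
    return threading.get_ident()  # the thread that runs the event loop


loop_thread = asyncio.run(main())
print("async handler ran on the loop thread:", calls["async"] == loop_thread)
print("sync handler ran on the loop thread:", calls["sync"] == loop_thread)
```

The sync handler runs on a worker thread, so any state it shares with the rest of the program needs to be safe to touch from another thread.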

import asyncio
from typing import Any, Dict, List

from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage, LLMResult


class MyCustomSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")


class MyCustomAsyncHandler(AsyncCallbackHandler):
    """Async callback handler that can be used to handle callbacks from langchain."""

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        class_name = serialized["name"]
        print("Hi! I just woke up. Your llm is starting")

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        print("Hi! I just woke up. Your llm is ending")


# To enable streaming, we pass in `streaming=True` to the ChatModel constructor
# Additionally, we pass in a list with our custom handlers
chat = ChatOpenAI(
    max_tokens=25,
    streaming=True,
    callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
)

# Top-level await works in a notebook; in a script, wrap the call in asyncio.run(...)
await chat.agenerate([[HumanMessage(content="Tell me a joke")]])
zzzz....
Hi! I just woke up. Your llm is starting
Sync handler being called in a `thread_pool_executor`: token: 
Sync handler being called in a `thread_pool_executor`: token: Why
Sync handler being called in a `thread_pool_executor`: token:  don
Sync handler being called in a `thread_pool_executor`: token: 't
Sync handler being called in a `thread_pool_executor`: token:  scientists
Sync handler being called in a `thread_pool_executor`: token:  trust
Sync handler being called in a `thread_pool_executor`: token:  atoms
Sync handler being called in a `thread_pool_executor`: token: ?
Sync handler being called in a `thread_pool_executor`: token: Because
Sync handler being called in a `thread_pool_executor`: token:  they
Sync handler being called in a `thread_pool_executor`: token:  make
Sync handler being called in a `thread_pool_executor`: token:  up
Sync handler being called in a `thread_pool_executor`: token:  everything
Sync handler being called in a `thread_pool_executor`: token: !
Sync handler being called in a `thread_pool_executor`: token: 
zzzz....
Hi! I just woke up. Your llm is ending
LLMResult(generations=[[ChatGeneration(text="Why don't scientists trust atoms?\n\nBecause they make up everything!", generation_info=None, message=AIMessage(content="Why don't scientists trust atoms?\n\nBecause they make up everything!", additional_kwargs={}))]], llm_output={'token_usage': {}, 'model_name': 'gpt-3.5-turbo'})
Using multiple handlers, passing in handlers

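In the previous examples, we passed in callback handlers when creating an object, by using callbacks=. In that case, the callbacks are scoped to that particular object.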

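In many cases, however, it is advantageous to pass in handlers when running the object instead. When we pass CallbackHandlers through the callbacks keyword arg while executing a run, those callbacks are issued by all nested objects involved in the execution. For example, when a handler is passed to an Agent, it is used for all callbacks related to the agent and for all the objects involved in the agent's execution, in this case the Tools, LLMChain, and LLM.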

This saves us from having to manually attach the handlers to each individual nested object.

from typing import Dict, Union, Any, List

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import AgentAction
from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.callbacks import tracing_enabled
from langchain.llms import OpenAI


# First, define custom callback handler implementations
class MyCustomHandlerOne(BaseCallbackHandler):
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        print(f"on_llm_start {serialized['name']}")

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        print(f"on_new_token {token}")

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        print(f"on_chain_start {serialized['name']}")

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        print(f"on_tool_start {serialized['name']}")

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        print(f"on_agent_action {action}")


class MyCustomHandlerTwo(BaseCallbackHandler):
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        print(f"on_llm_start (I'm the second handler!!) {serialized['name']}")


# Instantiate the handlers
handler1 = MyCustomHandlerOne()
handler2 = MyCustomHandlerTwo()

# Setup the agent. Only the `llm` will issue callbacks for handler2
llm = OpenAI(temperature=0, streaming=True, callbacks=[handler2])
tools = load_tools(["llm-math"], llm=llm)
agent = initialize_agent(
    tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION
)

# Callbacks for handler1 will be issued by every object involved in the
# Agent execution (llm, llmchain, tool, agent executor)
agent.run("What is 2 raised to the 0.235 power?", callbacks=[handler1])
on_chain_start AgentExecutor
on_chain_start LLMChain
on_llm_start OpenAI
on_llm_start (I'm the second handler!!) OpenAI
on_new_token  I
on_new_token  need
on_new_token  to
on_new_token  use
on_new_token  a
on_new_token  calculator
on_new_token  to
on_new_token  solve
on_new_token  this
on_new_token .
on_new_token Action
on_new_token :
on_new_token  Calculator
on_new_token Action
on_new_token  Input
on_new_token :
on_new_token  2
on_new_token ^
on_new_token 0
on_new_token .
on_new_token 235
on_new_token 
on_agent_action AgentAction(tool='Calculator', tool_input='2^0.235', log=' I need to use a calculator to solve this.\nAction: Calculator\nAction Input: 2^0.235')
on_tool_start Calculator
on_chain_start LLMMathChain
on_chain_start LLMChain
on_llm_start OpenAI
on_llm_start (I'm the second handler!!) OpenAI
on_new_token 
on_new_token ```text
on_new_token 
on_new_token 2
on_new_token **
on_new_token 0
on_new_token .
on_new_token 235
on_new_token 
on_new_token ```
on_new_token ...
on_new_token num
on_new_token expr
on_new_token .
on_new_token evaluate
on_new_token ("
on_new_token 2
on_new_token **
on_new_token 0
on_new_token .
on_new_token 235
on_new_token ")
on_new_token ...
on_new_token 
on_new_token 
on_chain_start LLMChain
on_llm_start OpenAI
on_llm_start (I'm the second handler!!) OpenAI
on_new_token  I
on_new_token  now
on_new_token  know
on_new_token  the
on_new_token  final
on_new_token  answer
on_new_token .
on_new_token Final
on_new_token  Answer
on_new_token :
on_new_token  1
on_new_token .
on_new_token 17
on_new_token 690
on_new_token 67
on_new_token 372
on_new_token 187
on_new_token 674
on_new_token 
'1.1769067372187674'
Tracing and token counting

Tracing and token counting are two capabilities we provide that are built on top of the callbacks mechanism.

Tracing

There are two recommended ways to trace your LangChains:

Setting the LANGCHAIN_TRACING environment variable to "true".
Using a context manager, with tracing_enabled(), to trace a particular block of code.

Note that if the environment variable is set, all code is traced, regardless of whether or not it is inside the context manager.
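How the two switches interact can be sketched with the standard library. This is an illustration under stated assumptions, not LangChain's code: TOY_TRACING, toy_tracing_enabled and is_traced are hypothetical names, and the use of a ContextVar is one plausible way to make such a context manager safe for concurrent tasks.

```python
import os
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator

# Hypothetical names: TOY_TRACING and _session are not LangChain identifiers.
_session = ContextVar("_session", default=None)


@contextmanager
def toy_tracing_enabled(session_name: str = "default") -> Iterator[str]:
    token = _session.set(session_name)
    try:
        yield session_name
    finally:
        _session.reset(token)  # restore whatever was active before


def is_traced() -> bool:
    # The environment variable wins: when it is set, everything is traced.
    if os.environ.get("TOY_TRACING") == "true":
        return True
    return _session.get() is not None


os.environ.pop("TOY_TRACING", None)
outside_before = is_traced()
with toy_tracing_enabled("my_test_session"):
    inside = is_traced()
outside_after = is_traced()

os.environ["TOY_TRACING"] = "true"
with_env = is_traced()
del os.environ["TOY_TRACING"]

print(outside_before, inside, outside_after, with_env)
```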

import os

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.callbacks import tracing_enabled
from langchain.llms import OpenAI

# To run the code, make sure to set OPENAI_API_KEY and SERPAPI_API_KEY
llm = OpenAI(temperature=0)
tools = load_tools(["llm-math", "serpapi"], llm=llm)
agent = initialize_agent(
    tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True
)

questions = [
    "Who won the US Open men's final in 2019? What is his age raised to the 0.334 power?",
    "Who is Olivia Wilde's boyfriend? What is his current age raised to the 0.23 power?",
    "Who won the most recent formula 1 grand prix? What is their age raised to the 0.23 power?",
    "Who won the US Open women's final in 2019? What is her age raised to the 0.34 power?",
    "Who is Beyonce's husband? What is his age raised to the 0.19 power?",
]

os.environ["LANGCHAIN_TRACING"] = "true"

# Both of the agent runs will be traced because the environment variable is set
agent.run(questions[0])
with tracing_enabled() as session:
    assert session
    agent.run(questions[1])

> Entering new AgentExecutor chain...
 I need to find out who won the US Open men's final in 2019 and then calculate his age raised to the 0.334 power.
Action: Search
Action Input: "US Open men's final 2019 winner"
Observation: Rafael Nadal defeated Daniil Medvedev in the final, 7–5, 6–3, 5–7, 4–6, 6–4 to win the men's singles tennis title at the 2019 US Open. It was his fourth US ...
Thought: I need to find out the age of the winner
Action: Search
Action Input: "Rafael Nadal age"
Observation: 36 years
Thought: I need to calculate the age raised to the 0.334 power
Action: Calculator
Action Input: 36^0.334
Observation: Answer: 3.3098250249682484
Thought: I now know the final answer
Final Answer: Rafael Nadal, aged 36, won the US Open men's final in 2019 and his age raised to the 0.334 power is 3.3098250249682484.

> Finished chain.

> Entering new AgentExecutor chain...
 I need to find out who Olivia Wilde's boyfriend is and then calculate his age raised to the 0.23 power.
Action: Search
Action Input: "Olivia Wilde boyfriend"
Observation: Sudeikis and Wilde's relationship ended in November 2020. Wilde was publicly served with court documents regarding child custody while she was presenting Don't Worry Darling at CinemaCon 2022. In January 2021, Wilde began dating singer Harry Styles after meeting during the filming of Don't Worry Darling.
Thought: I need to find out Harry Styles' age.
Action: Search
Action Input: "Harry Styles age"
Observation: 29 years
Thought: I need to calculate 29 raised to the 0.23 power.
Action: Calculator
Action Input: 29^0.23
Observation: Answer: 2.169459462491557
Thought: I now know the final answer.
Final Answer: Harry Styles is Olivia Wilde's boyfriend and his current age raised to the 0.23 power is 2.169459462491557.

> Finished chain.
# Now, we unset the environment variable and use a context manager.
if "LANGCHAIN_TRACING" in os.environ:
    del os.environ["LANGCHAIN_TRACING"]

# here, we are writing traces to "my_test_session"
with tracing_enabled("my_test_session") as session:
    assert session
    agent.run(questions[0])  # this should be traced

agent.run(questions[1])  # this should not be traced

> Entering new AgentExecutor chain...
 I need to find out who won the US Open men's final in 2019 and then calculate his age raised to the 0.334 power.
Action: Search
Action Input: "US Open men's final 2019 winner"
Observation: Rafael Nadal defeated Daniil Medvedev in the final, 7–5, 6–3, 5–7, 4–6, 6–4 to win the men's singles tennis title at the 2019 US Open. It was his fourth US ...
Thought: I need to find out the age of the winner
Action: Search
Action Input: "Rafael Nadal age"
Observation: 36 years
Thought: I need to calculate the age raised to the 0.334 power
Action: Calculator
Action Input: 36^0.334
Observation: Answer: 3.3098250249682484
Thought: I now know the final answer
Final Answer: Rafael Nadal, aged 36, won the US Open men's final in 2019 and his age raised to the 0.334 power is 3.3098250249682484.

> Finished chain.

> Entering new AgentExecutor chain...
 I need to find out who Olivia Wilde's boyfriend is and then calculate his age raised to the 0.23 power.
Action: Search
Action Input: "Olivia Wilde boyfriend"
Observation: Sudeikis and Wilde's relationship ended in November 2020. Wilde was publicly served with court documents regarding child custody while she was presenting Don't Worry Darling at CinemaCon 2022. In January 2021, Wilde began dating singer Harry Styles after meeting during the filming of Don't Worry Darling.
Thought: I need to find out Harry Styles' age.
Action: Search
Action Input: "Harry Styles age"
Observation: 29 years
Thought: I need to calculate 29 raised to the 0.23 power.
Action: Calculator
Action Input: 29^0.23
Observation: Answer: 2.169459462491557
Thought: I now know the final answer.
Final Answer: Harry Styles is Olivia Wilde's boyfriend and his current age raised to the 0.23 power is 2.169459462491557.

> Finished chain.
"Harry Styles is Olivia Wilde's boyfriend and his current age raised to the 0.23 power is 2.169459462491557."
import asyncio

# The context manager is concurrency safe:
if "LANGCHAIN_TRACING" in os.environ:
    del os.environ["LANGCHAIN_TRACING"]

# start a background task
task = asyncio.create_task(agent.arun(questions[0]))  # this should not be traced
with tracing_enabled() as session:
    assert session
    tasks = [agent.arun(q) for q in questions[1:3]]  # these should be traced
    await asyncio.gather(*tasks)

await task

> Entering new AgentExecutor chain...
> Entering new AgentExecutor chain...
> Entering new AgentExecutor chain...
 I need to find out who won the grand prix and then calculate their age raised to the 0.23 power.
Action: Search
Action Input: "Formula 1 Grand Prix Winner"
 I need to find out who won the US Open men's final in 2019 and then calculate his age raised to the 0.334 power.
Action: Search
Action Input: "US Open men's final 2019 winner"
Rafael Nadal defeated Daniil Medvedev in the final, 7–5, 6–3, 5–7, 4–6, 6–4 to win the men's singles tennis title at the 2019 US Open. It was his fourth US ...
 I need to find out who Olivia Wilde's boyfriend is and then calculate his age raised to the 0.23 power.
Action: Search
Action Input: "Olivia Wilde boyfriend"
Sudeikis and Wilde's relationship ended in November 2020. Wilde was publicly served with court documents regarding child custody while she was presenting Don't Worry Darling at CinemaCon 2022. In January 2021, Wilde began dating singer Harry Styles after meeting during the filming of Don't Worry Darling.
Lewis Hamilton has won 103 Grands Prix during his career. He won 21 races with McLaren and has won 82 with Mercedes. Lewis Hamilton holds the record for the ...
 I need to find out the age of the winner
Action: Search
Action Input: "Rafael Nadal age"
36 years
 I need to find out Harry Styles' age.
Action: Search
Action Input: "Harry Styles age"
 I need to find out Lewis Hamilton's age
Action: Search
Action Input: "Lewis Hamilton Age"
29 years
 I need to calculate the age raised to the 0.334 power
Action: Calculator
Action Input: 36^0.334
 I need to calculate 29 raised to the 0.23 power.
Action: Calculator
Action Input: 29^0.23
Answer: 3.3098250249682484
Answer: 2.169459462491557
38 years
> Finished chain.
> Finished chain.
 I now need to calculate 38 raised to the 0.23 power
Action: Calculator
Action Input: 38^0.23
Answer: 2.3086081644669734
> Finished chain.
"Rafael Nadal, aged 36, won the US Open men's final in 2019 and his age raised to the 0.334 power is 3.3098250249682484."
Token counting

LangChain offers a context manager that lets you count tokens.

import asyncio

from langchain.callbacks import get_openai_callback
from langchain.llms import OpenAI

llm = OpenAI(temperature=0)
with get_openai_callback() as cb:
    llm("What is the square root of 4?")

total_tokens = cb.total_tokens
assert total_tokens > 0

with get_openai_callback() as cb:
    llm("What is the square root of 4?")
    llm("What is the square root of 4?")

assert cb.total_tokens == total_tokens * 2

# You can kick off concurrent runs from within the context manager
with get_openai_callback() as cb:
    await asyncio.gather(
        *[llm.agenerate(["What is the square root of 4?"]) for _ in range(3)]
    )

assert cb.total_tokens == total_tokens * 3

# The context manager is concurrency safe
task = asyncio.create_task(llm.agenerate(["What is the square root of 4?"]))
with get_openai_callback() as cb:
    await llm.agenerate(["What is the square root of 4?"])

await task
assert cb.total_tokens == total_tokens
