From bfce7d8d7faae507c09f794ef307ffc56f4a01ba Mon Sep 17 00:00:00 2001 From: Niels Garve Date: Sat, 7 Sep 2024 12:18:59 +0200 Subject: [PATCH] feat(actions): enable streaming in custom actions --- .../rag/custom_rag_streaming/README.md | 93 ++++++++++++ .../rag/custom_rag_streaming/config.py | 96 +++++++++++++ .../rag/custom_rag_streaming/config.yml | 6 + .../rag/custom_rag_streaming/kb/report.md | 135 ++++++++++++++++++ .../rag/custom_rag_streaming/rails/output.co | 11 ++ nemoguardrails/streaming.py | 6 + 6 files changed, 347 insertions(+) create mode 100644 examples/configs/rag/custom_rag_streaming/README.md create mode 100644 examples/configs/rag/custom_rag_streaming/config.py create mode 100644 examples/configs/rag/custom_rag_streaming/config.yml create mode 100644 examples/configs/rag/custom_rag_streaming/kb/report.md create mode 100644 examples/configs/rag/custom_rag_streaming/rails/output.co diff --git a/examples/configs/rag/custom_rag_streaming/README.md b/examples/configs/rag/custom_rag_streaming/README.md new file mode 100644 index 000000000..e06007917 --- /dev/null +++ b/examples/configs/rag/custom_rag_streaming/README.md @@ -0,0 +1,93 @@ +# Custom RAG with streaming + +It is possible to use streaming in custom actions, such as RAG. + +This is because the streaming handler is defined and available as a context variable. + +```python +import contextvars +streaming_handler_var = contextvars.ContextVar("streaming_handler", default=None) +streaming_handler: StreamingHandler = streaming_handler_var.get() +``` + +But let's first clarify the folder structure of this example: + +- `kb/` - A folder containing our knowledge base to retrieve context from. This folder includes the March 2023 US Jobs + report in `kb/report.md`. +- `rails/output.co` - A colang file that contains a flow that routes all user messages into our + custom RAG. +- `config.py` - The config file containing the custom RAG action, the disclaimer action, and the init function that gets + called as part of the initialization of the LLMRails instance. +- `config.yml` - The config file holding all the configuration options. + +The following code samples demonstrate the core of this example in action: + +```colang +# output.co + +define flow answer report question + user ... + $answer = execute rag + bot $answer + $disclaimer = execute disclaimer + bot $disclaimer +``` + +```python +# config.py + +class ContinuousStreamingHandler(StreamingHandler): + async def _process(self, chunk: str): + """Processes a chunk of text. + + Stops the stream if the chunk is `""` or `None` (stopping chunks). + In case you want to keep the stream open, all non-stopping chunks can be piped to a specified handler. + """ + if chunk is None or chunk == "": + await self.queue.put(chunk) + self.streaming_finished_event.set() + self.top_k_nonempty_lines_event.set() + return + + await super()._process(chunk) + + +async def rag(context: dict, llm: BaseLLM, kb: KnowledgeBase) -> ActionResult: + + # ... + + chain = prompt_template | llm | output_parser + + # 💡 Enable streaming + streaming_handler: StreamingHandler = streaming_handler_var.get() + local_streaming_handler = ContinuousStreamingHandler() + local_streaming_handler.set_pipe_to(streaming_handler) + + config = RunnableConfig(callbacks=[local_streaming_handler]) + answer = await chain.ainvoke(input_variables, config) + + return ActionResult(return_value=answer, context_updates=context_updates) +``` + +Here's what's happening, step by step: + +1. 
We define a custom RAG chain using LangChain's LCEL, but it could be any library. For example, you could call the + `openai` library directly. +2. We then define a `RunnableConfig` with a local streaming handler as a callback. The local handler is configured to + pipe the stream to the main streaming handler. The idea behind this is to handle stream-stopping chunks (`""` or + `None`) only locally, while keeping the main streaming handler running. This enables streaming results from multiple + actions. +3. We then invoke the chain with the config, which will trigger the streaming handler to be called. +4. Finally, we return the final answer as `ActionResult` which enables downstream processing. In this example, we define + a `disclaimer` action that just prints a sentence; it could also access the final answer or other context + variables we define as `context_updates`. + +_Note: For simplicity, we re-use the LLM instance configured in [config.yml](./config.yml) as well as the +built-in retrieval via the knowledge base._ + +## Run the example + +```shell +$ export OPENAI_API_KEY='sk-xxx' +$ python -m nemoguardrails.__main__ chat --config //examples/configs/rag/custom_rag_streaming --streaming +``` diff --git a/examples/configs/rag/custom_rag_streaming/config.py b/examples/configs/rag/custom_rag_streaming/config.py new file mode 100644 index 000000000..a93f32c2e --- /dev/null +++ b/examples/configs/rag/custom_rag_streaming/config.py @@ -0,0 +1,96 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from langchain.llms.base import BaseLLM +from langchain.prompts import PromptTemplate +from langchain_core.output_parsers import StrOutputParser +from langchain_core.runnables import RunnableConfig + +from nemoguardrails import LLMRails +from nemoguardrails.actions.actions import ActionResult +from nemoguardrails.context import streaming_handler_var +from nemoguardrails.kb.kb import KnowledgeBase +from nemoguardrails.streaming import StreamingHandler + +TEMPLATE = """Use the following pieces of context to answer the question at the end. +If you don't know the answer, just say that you don't know, don't try to make up an answer. +Use three sentences maximum and keep the answer as concise as possible. +Always say "thanks for asking!" at the end of the answer. + +{context} + +Question: {question} + +Helpful Answer:""" + + +class ContinuousStreamingHandler(StreamingHandler): + async def _process(self, chunk: str): + """Processes a chunk of text. + + Stops the stream if the chunk is `""` or `None` (stopping chunks). + In case you want to keep the stream open, all non-stopping chunks can be piped to a specified handler. 
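+        A stopping chunk only finishes this local handler's stream; it is not forwarded, so the
+        handler configured via `set_pipe_to()` (the main streaming handler) stays open and can
+        keep receiving chunks from subsequent actions.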
+ """ + if chunk is None or chunk == "": + await self.queue.put(chunk) + self.streaming_finished_event.set() + self.top_k_nonempty_lines_event.set() + return + + await super()._process(chunk) + + +async def rag(context: dict, llm: BaseLLM, kb: KnowledgeBase) -> ActionResult: + user_message = context.get("last_user_message") + context_updates = {} + + # For our custom RAG, we re-use the built-in retrieval + chunks = await kb.search_relevant_chunks(user_message) + relevant_chunks = "\n".join([chunk["body"] for chunk in chunks]) + # Store the chunks for downstream use + context_updates["relevant_chunks"] = relevant_chunks + + # Use a custom prompt template + prompt_template = PromptTemplate.from_template(TEMPLATE) + input_variables = {"question": user_message, "context": relevant_chunks} + # Store the template for downstream use + context_updates["_last_bot_prompt"] = prompt_template.format(**input_variables) + + print(f"💬 RAG :: prompt_template: {context_updates['_last_bot_prompt']}") + + # Put together a simple LangChain chain + output_parser = StrOutputParser() + chain = prompt_template | llm | output_parser + + # 💡 Enable streaming + streaming_handler: StreamingHandler = streaming_handler_var.get() + local_streaming_handler = ContinuousStreamingHandler() + local_streaming_handler.set_pipe_to(streaming_handler) + + config = RunnableConfig(callbacks=[local_streaming_handler]) + answer = await chain.ainvoke(input_variables, config) + + return ActionResult(return_value=answer, context_updates=context_updates) + + +async def disclaimer() -> ActionResult: + return ActionResult( + return_value="I learn something new every day, so my answers may not always be perfect." + ) + + +def init(app: LLMRails): + app.register_action(rag, "rag") + app.register_action(disclaimer, "disclaimer") diff --git a/examples/configs/rag/custom_rag_streaming/config.yml b/examples/configs/rag/custom_rag_streaming/config.yml new file mode 100644 index 000000000..ff52d1070 --- /dev/null +++ b/examples/configs/rag/custom_rag_streaming/config.yml @@ -0,0 +1,6 @@ +streaming: True + +models: + - type: main + engine: openai + model: gpt-3.5-turbo diff --git a/examples/configs/rag/custom_rag_streaming/kb/report.md b/examples/configs/rag/custom_rag_streaming/kb/report.md new file mode 100644 index 000000000..5af73a794 --- /dev/null +++ b/examples/configs/rag/custom_rag_streaming/kb/report.md @@ -0,0 +1,135 @@ +# Jobs Report - March 2023 + +Technical information: + Household data: (202) 691-6378 * cpsinfo@bls.gov * www.bls.gov/cps + Establishment data: (202) 691-6555 * cesinfo@bls.gov * www.bls.gov/ces + +Media contact: (202) 691-5902 * PressOffice@bls.gov + + + THE EMPLOYMENT SITUATION -- MARCH 2023 + + +Total nonfarm payroll employment rose by 236,000 in March, and the unemployment rate +changed little at 3.5 percent, the U.S. Bureau of Labor Statistics reported today. +Employment continued to trend up in leisure and hospitality, government, professional +and business services, and health care. + +This news release presents statistics from two monthly surveys. The household survey +measures labor force status, including unemployment, by demographic characteristics. +The establishment survey measures nonfarm employment, hours, and earnings by industry. +For more information about the concepts and statistical methodology used in these two +surveys, see the Technical Note. + +## Household Survey Data + +Both the unemployment rate, at 3.5 percent, and the number of unemployed persons, at +5.8 million, changed little in March. 
These measures have shown little net movement +since early 2022. (See table A-1.) + +Among the major worker groups, the unemployment rate for Hispanics decreased to 4.6 +percent in March, essentially offsetting an increase in the prior month. The +unemployment rates for adult men (3.4 percent), adult women (3.1 percent), teenagers +(9.8 percent), Whites (3.2 percent), Blacks (5.0 percent), and Asians (2.8 percent) +showed little or no change over the month. (See tables A-1, A-2, and A-3.) + +Among the unemployed, the number of permanent job losers increased by 172,000 to 1.6 +million in March, and the number of reentrants to the labor force declined by 182,000 +to 1.7 million. (Reentrants are persons who previously worked but were not in the +labor force prior to beginning their job search.) (See table A-11.) + +The number of long-term unemployed (those jobless for 27 weeks or more) was little +changed at 1.1 million in March. These individuals accounted for 18.9 percent of all +unemployed persons. (See table A-12.) + +The labor force participation rate, at 62.6 percent, continued to trend up in March. +The employment-population ratio edged up over the month to 60.4 percent. These +measures remain below their pre-pandemic February 2020 levels (63.3 percent and 61.1 +percent, respectively). (See table A-1.) + +The number of persons employed part time for economic reasons was essentially +unchanged at 4.1 million in March. These individuals, who would have preferred full- +time employment, were working part time because their hours had been reduced or +they were unable to find full-time jobs. (See table A-8.) + +The number of persons not in the labor force who currently want a job was little +changed at 4.9 million in March and has returned to its February 2020 level. These +individuals were not counted as unemployed because they were not actively looking +for work during the 4 weeks preceding the survey or were unavailable to take a job. +(See table A-1.) + +Among those not in the labor force who wanted a job, the number of persons marginally +attached to the labor force was little changed at 1.3 million in March. These +individuals wanted and were available for work and had looked for a job sometime +in the prior 12 months but had not looked for work in the 4 weeks preceding the +survey. The number of discouraged workers, a subset of the marginally attached who +believed that no jobs were available for them, also was little changed over the month +at 351,000. (See Summary table A.) + +## Establishment Survey Data + +Total nonfarm payroll employment increased by 236,000 in March, compared with the +average monthly gain of 334,000 over the prior 6 months. In March, employment +continued to trend up in leisure and hospitality, government, professional and +business services, and health care. (See table B-1.) + +Leisure and hospitality added 72,000 jobs in March, lower than the average monthly +gain of 95,000 over the prior 6 months. Most of the job growth occurred in food +services and drinking places, where employment rose by 50,000 in March. Employment +in leisure and hospitality is below its pre-pandemic February 2020 level by 368,000, +or 2.2 percent. + +Government employment increased by 47,000 in March, the same as the average monthly +gain over the prior 6 months. Overall, employment in government is below its February +2020 level by 314,000, or 1.4 percent. 
+ +Employment in professional and business services continued to trend up in March +(+39,000), in line with the average monthly growth over the prior 6 months (+34,000). +Within the industry, employment in professional, scientific, and technical services +continued its upward trend in March (+26,000). + +Over the month, health care added 34,000 jobs, lower than the average monthly gain +of 54,000 over the prior 6 months. In March, job growth occurred in home health +care services (+15,000) and hospitals (+11,000). Employment continued to trend up +in nursing and residential care facilities (+8,000). + +Employment in social assistance continued to trend up in March (+17,000), in line +with the average monthly growth over the prior 6 months (+22,000). + +In March, employment in transportation and warehousing changed little (+10,000). +Couriers and messengers (+7,000) and air transportation (+6,000) added jobs, while +warehousing and storage lost jobs (-12,000). Employment in transportation and +warehousing has shown little net change in recent months. + +Employment in retail trade changed little in March (-15,000). Job losses in building +material and garden equipment and supplies dealers (-9,000) and in furniture, home +furnishings, electronics, and appliance retailers (-9,000) were partially offset +by a job gain in department stores (+15,000). Retail trade employment is little +changed on net over the year. + +Employment showed little change over the month in other major industries, including +mining, quarrying, and oil and gas extraction; construction; manufacturing; wholesale +trade; information; financial activities; and other services. + +In March, average hourly earnings for all employees on private nonfarm payrolls +rose by 9 cents, or 0.3 percent, to $33.18. Over the past 12 months, average hourly +earnings have increased by 4.2 percent. In March, average hourly earnings of +private-sector production and nonsupervisory employees rose by 9 cents, or 0.3 +percent, to $28.50. (See tables B-3 and B-8.) + +The average workweek for all employees on private nonfarm payrolls edged down by +0.1 hour to 34.4 hours in March. In manufacturing, the average workweek was unchanged +at 40.3 hours, and overtime remained at 3.0 hours. The average workweek for production +and nonsupervisory employees on private nonfarm payrolls was unchanged at 33.9 hours. +(See tables B-2 and B-7.) + +The change in total nonfarm payroll employment for January was revised down by +32,000, from +504,000 to +472,000, and the change for February was revised up by +15,000, from +311,000 to +326,000. With these revisions, employment in January and +February combined is 17,000 lower than previously reported. (Monthly revisions result +from additional reports received from businesses and government agencies since the +last published estimates and from the recalculation of seasonal factors.) + +_____________ +The Employment Situation for April is scheduled to be released on Friday, +May 5, 2023, at 8:30 a.m. (ET). diff --git a/examples/configs/rag/custom_rag_streaming/rails/output.co b/examples/configs/rag/custom_rag_streaming/rails/output.co new file mode 100644 index 000000000..ca6ceb583 --- /dev/null +++ b/examples/configs/rag/custom_rag_streaming/rails/output.co @@ -0,0 +1,11 @@ +define user ask about report + "What was last month's unemployment rate?" + "Which industry added the most jobs?" + "How many jobs were added in the transportation industry?" + +define flow answer report question + user ... 
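+  # `rag` streams its answer through the main streaming handler and also returns it,
+  # so the final message remains available to `bot $answer` and to downstream processing.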
+  $answer = execute rag
+  bot $answer
+  $disclaimer = execute disclaimer
+  bot $disclaimer
diff --git a/nemoguardrails/streaming.py b/nemoguardrails/streaming.py
index ec1d1861c..a31d2e4e7 100644
--- a/nemoguardrails/streaming.py
+++ b/nemoguardrails/streaming.py
@@ -143,6 +143,12 @@ async def _process(self, chunk: str):
         If we're in buffering mode, we just record it.
         If we need to pipe it to another streaming handler, we do that.
         """
+        # Don't process the chunk if it equals the entire completion streamed so far.
+        # This happens when the streaming handler is used in custom actions that also return the
+        # full completion, which is useful for downstream processing.
+        if chunk == self.completion:
+            return
+
         if self.enable_buffer:
             self.buffer += chunk
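For reference, the example's stream can also be consumed programmatically instead of through the CLI. The sketch below is illustrative: it assumes the repository root as the working directory and `OPENAI_API_KEY` set in the environment, and uses the public `LLMRails.stream_async` API, which yields the chunks pushed to the main streaming handler.

```python
import asyncio

from nemoguardrails import LLMRails, RailsConfig


async def main():
    # Load the example configuration added by this patch.
    config = RailsConfig.from_path("examples/configs/rag/custom_rag_streaming")
    app = LLMRails(config)

    history = [{"role": "user", "content": "How many jobs were added in March?"}]

    # Chunks streamed by the custom `rag` action are received here; the main handler stays
    # open between actions because stopping chunks are handled by the local handler only.
    async for chunk in app.stream_async(messages=history):
        print(chunk, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())
```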