Skip to content

Commit

Permalink
feat(actions): enable streaming in custom actions
Browse files Browse the repository at this point in the history
  • Loading branch information
niels-garve committed Sep 7, 2024
1 parent c468d64 commit 0156df1
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 5 deletions.
68 changes: 68 additions & 0 deletions examples/configs/rag/custom_rag_streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Custom RAG with streaming

It is possible to use streaming in custom actions, such as RAG.

This is because the streaming handler is defined and available as a context variable.

```python
import contextvars
streaming_handler_var = contextvars.ContextVar("streaming_handler", default=None)
```

But let's first clarify the folder structure of this example:

- `kb/` - A folder containing our knowledge base to retrieve context from and fact-check against.
This folder includes the March 2023 US Jobs report in `kb/report.md`.
- `rails/output.co` - A colang file that contains a flow that routes all user messages into our
custom RAG.
- `config.py` - The config file containing the custom RAG action, the disclaimer action, and the init function that gets
called as part of the initialization of the LLMRails instance.
- `config.yml` - The config file holding all the configuration options.

The following code samples demonstrate the core of this example in action:

```colang
# output.co
define flow answer report question
user ...
$answer = execute rag()
bot $answer
$disclaimer = execute disclaimer()
bot $disclaimer
```

```python
# config.py

async def rag(context: dict, llm: BaseLLM, kb: KnowledgeBase) -> ActionResult:

# ...

chain = prompt_template | llm | output_parser
# 💡 Enable streaming
config = RunnableConfig(callbacks=[streaming_handler_var.get()])
answer = await chain.ainvoke(input_variables, config)

return ActionResult(return_value=answer, context_updates=context_updates)
```

Here's what's happening, step by step:

1. We define a custom RAG chain using LangChain's LCEL, but it could be any library. For example, you could call the
`openai` library directly.
2. We then define a `RunnableConfig` with the streaming handler as a callback.
3. We then invoke the chain with the config, which will trigger the streaming handler to be called.
4. Finally, we return the final answer as `ActionResult` which enables downstream processing. In this example, we define
a `disclaimer` action that just prints a sentence; it could also access the final answer or other context
variables we define as `context_updates`.

_Note: For simplicity, we re-use the LLM instance configured in [config.yml](./config.yml) as well as the
built-in retrieval via the knowledge base._

## Run the example

```shell
$ export OPENAI_API_KEY='sk-xxx'
$ python -m nemoguardrails.__main__ chat --config /<path_to>/examples/configs/rag/custom_rag_streaming --streaming
```
74 changes: 74 additions & 0 deletions examples/configs/rag/custom_rag_streaming/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain.llms.base import BaseLLM
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableConfig

from nemoguardrails import LLMRails
from nemoguardrails.actions.actions import ActionResult
from nemoguardrails.context import streaming_handler_var
from nemoguardrails.kb.kb import KnowledgeBase

TEMPLATE = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Use three sentences maximum and keep the answer as concise as possible.
Always say "thanks for asking!" at the end of the answer.
{context}
Question: {question}
Helpful Answer:"""


async def rag(context: dict, llm: BaseLLM, kb: KnowledgeBase) -> ActionResult:
user_message = context.get("last_user_message")
context_updates = {}

# For our custom RAG, we re-use the built-in retrieval
chunks = await kb.search_relevant_chunks(user_message)
relevant_chunks = "\n".join([chunk["body"] for chunk in chunks])
# Store the chunks for downstream use
context_updates["relevant_chunks"] = relevant_chunks

# Use a custom prompt template
prompt_template = PromptTemplate.from_template(TEMPLATE)
input_variables = {"question": user_message, "context": relevant_chunks}
# Store the template for downstream use
context_updates["_last_bot_prompt"] = prompt_template.format(**input_variables)

print(f"💬 RAG :: prompt_template: {context_updates['_last_bot_prompt']}")

# Put together a simple LangChain chain
output_parser = StrOutputParser()
chain = prompt_template | llm | output_parser
# 💡 Enable streaming
config = RunnableConfig(callbacks=[streaming_handler_var.get()])
answer = await chain.ainvoke(input_variables, config)

return ActionResult(return_value=answer, context_updates=context_updates)


async def disclaimer() -> ActionResult:
return ActionResult(
return_value="I learn something new every day, so my answers may not always be perfect."
)


def init(app: LLMRails):
app.register_action(rag, "rag")
app.register_action(disclaimer, "disclaimer")
6 changes: 6 additions & 0 deletions examples/configs/rag/custom_rag_streaming/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
streaming: True

models:
- type: main
engine: openai
model: gpt-3.5-turbo
135 changes: 135 additions & 0 deletions examples/configs/rag/custom_rag_streaming/kb/report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Jobs Report - March 2023

Technical information:
Household data: (202) 691-6378 * [email protected] * www.bls.gov/cps
Establishment data: (202) 691-6555 * [email protected] * www.bls.gov/ces

Media contact: (202) 691-5902 * [email protected]


THE EMPLOYMENT SITUATION -- MARCH 2023


Total nonfarm payroll employment rose by 236,000 in March, and the unemployment rate
changed little at 3.5 percent, the U.S. Bureau of Labor Statistics reported today.
Employment continued to trend up in leisure and hospitality, government, professional
and business services, and health care.

This news release presents statistics from two monthly surveys. The household survey
measures labor force status, including unemployment, by demographic characteristics.
The establishment survey measures nonfarm employment, hours, and earnings by industry.
For more information about the concepts and statistical methodology used in these two
surveys, see the Technical Note.

## Household Survey Data

Both the unemployment rate, at 3.5 percent, and the number of unemployed persons, at
5.8 million, changed little in March. These measures have shown little net movement
since early 2022. (See table A-1.)

Among the major worker groups, the unemployment rate for Hispanics decreased to 4.6
percent in March, essentially offsetting an increase in the prior month. The
unemployment rates for adult men (3.4 percent), adult women (3.1 percent), teenagers
(9.8 percent), Whites (3.2 percent), Blacks (5.0 percent), and Asians (2.8 percent)
showed little or no change over the month. (See tables A-1, A-2, and A-3.)

Among the unemployed, the number of permanent job losers increased by 172,000 to 1.6
million in March, and the number of reentrants to the labor force declined by 182,000
to 1.7 million. (Reentrants are persons who previously worked but were not in the
labor force prior to beginning their job search.) (See table A-11.)

The number of long-term unemployed (those jobless for 27 weeks or more) was little
changed at 1.1 million in March. These individuals accounted for 18.9 percent of all
unemployed persons. (See table A-12.)

The labor force participation rate, at 62.6 percent, continued to trend up in March.
The employment-population ratio edged up over the month to 60.4 percent. These
measures remain below their pre-pandemic February 2020 levels (63.3 percent and 61.1
percent, respectively). (See table A-1.)

The number of persons employed part time for economic reasons was essentially
unchanged at 4.1 million in March. These individuals, who would have preferred full-
time employment, were working part time because their hours had been reduced or
they were unable to find full-time jobs. (See table A-8.)

The number of persons not in the labor force who currently want a job was little
changed at 4.9 million in March and has returned to its February 2020 level. These
individuals were not counted as unemployed because they were not actively looking
for work during the 4 weeks preceding the survey or were unavailable to take a job.
(See table A-1.)

Among those not in the labor force who wanted a job, the number of persons marginally
attached to the labor force was little changed at 1.3 million in March. These
individuals wanted and were available for work and had looked for a job sometime
in the prior 12 months but had not looked for work in the 4 weeks preceding the
survey. The number of discouraged workers, a subset of the marginally attached who
believed that no jobs were available for them, also was little changed over the month
at 351,000. (See Summary table A.)

## Establishment Survey Data

Total nonfarm payroll employment increased by 236,000 in March, compared with the
average monthly gain of 334,000 over the prior 6 months. In March, employment
continued to trend up in leisure and hospitality, government, professional and
business services, and health care. (See table B-1.)

Leisure and hospitality added 72,000 jobs in March, lower than the average monthly
gain of 95,000 over the prior 6 months. Most of the job growth occurred in food
services and drinking places, where employment rose by 50,000 in March. Employment
in leisure and hospitality is below its pre-pandemic February 2020 level by 368,000,
or 2.2 percent.

Government employment increased by 47,000 in March, the same as the average monthly
gain over the prior 6 months. Overall, employment in government is below its February
2020 level by 314,000, or 1.4 percent.

Employment in professional and business services continued to trend up in March
(+39,000), in line with the average monthly growth over the prior 6 months (+34,000).
Within the industry, employment in professional, scientific, and technical services
continued its upward trend in March (+26,000).

Over the month, health care added 34,000 jobs, lower than the average monthly gain
of 54,000 over the prior 6 months. In March, job growth occurred in home health
care services (+15,000) and hospitals (+11,000). Employment continued to trend up
in nursing and residential care facilities (+8,000).

Employment in social assistance continued to trend up in March (+17,000), in line
with the average monthly growth over the prior 6 months (+22,000).

In March, employment in transportation and warehousing changed little (+10,000).
Couriers and messengers (+7,000) and air transportation (+6,000) added jobs, while
warehousing and storage lost jobs (-12,000). Employment in transportation and
warehousing has shown little net change in recent months.

Employment in retail trade changed little in March (-15,000). Job losses in building
material and garden equipment and supplies dealers (-9,000) and in furniture, home
furnishings, electronics, and appliance retailers (-9,000) were partially offset
by a job gain in department stores (+15,000). Retail trade employment is little
changed on net over the year.

Employment showed little change over the month in other major industries, including
mining, quarrying, and oil and gas extraction; construction; manufacturing; wholesale
trade; information; financial activities; and other services.

In March, average hourly earnings for all employees on private nonfarm payrolls
rose by 9 cents, or 0.3 percent, to $33.18. Over the past 12 months, average hourly
earnings have increased by 4.2 percent. In March, average hourly earnings of
private-sector production and nonsupervisory employees rose by 9 cents, or 0.3
percent, to $28.50. (See tables B-3 and B-8.)

The average workweek for all employees on private nonfarm payrolls edged down by
0.1 hour to 34.4 hours in March. In manufacturing, the average workweek was unchanged
at 40.3 hours, and overtime remained at 3.0 hours. The average workweek for production
and nonsupervisory employees on private nonfarm payrolls was unchanged at 33.9 hours.
(See tables B-2 and B-7.)

The change in total nonfarm payroll employment for January was revised down by
32,000, from +504,000 to +472,000, and the change for February was revised up by
15,000, from +311,000 to +326,000. With these revisions, employment in January and
February combined is 17,000 lower than previously reported. (Monthly revisions result
from additional reports received from businesses and government agencies since the
last published estimates and from the recalculation of seasonal factors.)

_____________
The Employment Situation for April is scheduled to be released on Friday,
May 5, 2023, at 8:30 a.m. (ET).
11 changes: 11 additions & 0 deletions examples/configs/rag/custom_rag_streaming/rails/output.co
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
define user ask about report
"What was last month's unemployment rate?"
"Which industry added the most jobs?"
"How many jobs were added in the transportation industry?"

define flow answer report question
user ...
$answer = execute rag()
bot $answer
$disclaimer = execute disclaimer()
bot $disclaimer
15 changes: 10 additions & 5 deletions nemoguardrails/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ async def _process(self, chunk: str):
If we're in buffering mode, we just record it.
If we need to pipe it to another streaming handler, we do that.
"""
# Don't process the chunk if it contains the entire sentence that has been streamed so far.
# This occurs when the streaming handler is used in actions that return the whole LLM response, which is
# necessary to enable downstream processing in custom actions.
if chunk == self.completion:
return

if self.enable_buffer:
self.buffer += chunk

Expand Down Expand Up @@ -291,11 +297,12 @@ async def on_llm_new_token(
**kwargs: Any,
) -> None:
"""Run on new LLM token. Only available when streaming is enabled."""
# If the first token is an empty one, we ignore.
if self.first_token:
self.first_token = False
if token == "":
return

# If the token is an empty one, we ignore.
if token == "":
return

await self.push_chunk(chunk)

Expand All @@ -316,8 +323,6 @@ async def on_llm_end(
await self._process(self.current_chunk)
self.current_chunk = ""

await self._process("")

# We explicitly print a new line here
if self.enable_print:
print("")
Expand Down

0 comments on commit 0156df1

Please sign in to comment.