Compare commits

...

2 Commits

Author SHA1 Message Date
446a33c3a4 Update script to handle streaming properly.
If a response is set to streaming, the sub-agents will return unusable data.
2025-03-26 02:34:14 +00:00
7fd6363421 Update to add Event Emitter calls and updated README.md 2025-03-24 05:37:35 -07:00
2 changed files with 62 additions and 10 deletions

View File

@@ -2,11 +2,9 @@
A powerful pipe function for Open WebUI that enables collaboration between multiple AI models as agents.
[TOC]
## Overview
-OWUI-Multi-Agent-Processing (v0.5.3) allows you to leverage the strengths of different language models by having them work together on the same prompt. The system functions as a pipeline where each agent processes the input sequentially, and a final operator model synthesizes all the agent responses.
+OWUI-Multi-Agent-Processing (v0.5.6) allows you to leverage the strengths of different language models by having them work together on the same prompt. The system functions as a pipeline where each agent processes the input sequentially, and a final operator model synthesizes all the agent responses.
## Features

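The Overview above describes the collaboration flow: agents answer in sequence, and an operator model synthesizes their replies. A rough sketch of that sequence, with a hypothetical `ask` helper standing in for a chat-completion call (not the pipe's actual code):

def ask(model: str, messages: list[dict]) -> str:
    # Hypothetical stand-in for a chat-completion call.
    return f"<reply from {model}>"

def collaborate(prompt: str, agents: list[str], operator_model: str) -> str:
    messages = [{"role": "user", "content": prompt}]
    for agent in agents:
        # Each agent sees the original prompt plus all earlier agent replies.
        reply = ask(agent, messages)
        messages.append({"role": "assistant", "content": f"{reply} (Provided by Agent: {agent})"})
    # The operator model synthesizes all agent responses into a final answer.
    return ask(operator_model, messages)

print(collaborate("Summarize the design", ["model-a", "model-b"], "operator-model"))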
View File

@@ -1,16 +1,41 @@
"""
title: Multi Agent Collaboration System for Open WebUI
Description: Allows for Multiple Models to act as Agents in collaboration
-version: 0.5.3
+version: 0.7.6
"""
from pydantic import BaseModel, Field
from fastapi import Request
-from typing import Optional
+from typing import Callable, Any
from open_webui.models.users import Users
from open_webui.utils.chat import generate_chat_completion
class EventEmitter:
    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown state", status="in_progress", done=False):
        """
        Send a status event to the event emitter.
        :param description: Event description
        :param status: Event status
        :param done: Whether the event is complete
        """
        if self.event_emitter:
            await self.event_emitter(
                {
                    "type": "status",
                    "data": {
                        "status": status,
                        "description": description,
                        "done": done,
                    },
                }
            )
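# A minimal usage sketch, not part of this diff: `demo_sink` below is a
# hypothetical stand-in for the __event_emitter__ callable that Open WebUI
# injects into pipes.
async def demo_sink(event: dict) -> None:
    print(event["data"]["status"], "-", event["data"]["description"])

# Emits {"type": "status", "data": {...}} through the sink:
# asyncio.run(EventEmitter(demo_sink).emit(description="Warming up"))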
class Pipe:
    class UserValves(BaseModel):
@@ -26,19 +51,47 @@ class Pipe:
    def __init__(self):
        pass

-    async def pipe(self, body: dict, __user__: dict, __request__: Request) -> str:
+    async def pipe(
+        self,
+        body: dict,
+        __user__: dict,
+        __request__: Request,
+        __event_emitter__: Callable[[dict], Any] = None,
+    ) -> str:
        """
        :param __user__: User dictionary containing the user ID
        :param __event_emitter__: Optional event emitter for tracking status
        :param body: Body dictionary containing the prompt, messages, and other settings sent to the LLM(s)
        :param __request__: User Request object
        :return: The final response from the operator model after processing through all agents
        """
        # Initialize Event Emitter
        emitter = EventEmitter(__event_emitter__)
        # Use the unified endpoint with the updated signature
        user = Users.get_user_by_id(__user__["id"])
        agents = __user__["valves"].agent_list
        operator_model = __user__["valves"].operator_model
        number_of_agents = len(agents)
        if "### Task:" in body["messages"][0]["content"]:
            body["model"] = operator_model
            print("Internal Request")
            return await generate_chat_completion(__request__, body, user)
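        # The "### Task:" check above short-circuits Open WebUI's internal
        # housekeeping prompts (title/tag generation and similar), sending them
        # straight to the operator model instead of the agent loop. Illustrative
        # shape of such a request (an assumption, not taken from this diff):
        #   body["messages"][0]["content"] == "### Task:\nGenerate a concise title..."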
        # Capture Stream Setting
        original_stream = body["stream"]
        if number_of_agents > 0:
            # Process through each agent in the list
            for agent_model in agents:
                body["stream"] = False
                # Temporarily change the model to the agent model
                body["model"] = agent_model
                print(f"Model being used: {agent_model}")
                message = f"Processing request through agent model {agent_model}"
                await emitter.emit(
                    description=message, status="agent_processing", done=True
                )
                response = await generate_chat_completion(__request__, body, user)
                content = response["choices"][0]["message"]["content"]
                print(f"This is the content from {agent_model}: {content}")
                # Add Agent response as context
                body["messages"].append(
                    {
@@ -46,8 +99,9 @@ class Pipe:
"content": f"{response} \n (Provided by Agent: {agent_model})",
}
)
        # Set Operator for final processing
        body["model"] = operator_model
        body["stream"] = original_stream
        print(f"Model being used: {operator_model}")
        # print(f"Body Response: {body['messages']}")
        message = f"Final Response from {operator_model}"
        await emitter.emit(description=message, status="final_processing", done=True)
        return await generate_chat_completion(__request__, body, user)
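The stream handling above is the crux of the 446a33c3a4 fix: each agent call forces stream=False so generate_chat_completion returns a plain dict whose choices can be read, while the final operator call restores the caller's original setting. A minimal sketch of that capture/restore pattern, with a hypothetical `complete` coroutine in place of generate_chat_completion:

import asyncio

async def complete(body: dict) -> dict:
    # Hypothetical stand-in: behaves like a non-streaming chat completion.
    return {"choices": [{"message": {"content": f"reply from {body['model']}"}}]}

async def run(body: dict, agents: list[str], operator: str) -> dict:
    original_stream = body["stream"]  # capture the caller's setting
    for agent in agents:
        body["stream"] = False  # agents must return a parseable dict, not a stream
        body["model"] = agent
        content = (await complete(body))["choices"][0]["message"]["content"]
        body["messages"].append(
            {"role": "assistant", "content": f"{content} \n (Provided by Agent: {agent})"}
        )
    body["model"] = operator
    body["stream"] = original_stream  # the final call honors the caller's choice
    return await complete(body)

body = {"stream": True, "model": "", "messages": [{"role": "user", "content": "Hi"}]}
print(asyncio.run(run(body, ["agent-a", "agent-b"], "operator-model")))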