2025-03-12 07:46:45 -07:00
"""
title : Claude Sonnet 3.7 Reasoning for Bedrock
author : Josh Knapp
date : 2025 - 03 - 10
license : MIT
description : A pipeline to connect to Amazon Bedrock ' s Claude 3.7 Sonnet model for text generation and reasoning tasks
requirements : requests , boto3
"""
import base64
import json
import logging
from io import BytesIO
from typing import List , Union , Generator , Iterator , Dict , Optional , Tuple , Any , Union
import boto3
from pydantic import BaseModel
import os
import requests
from utils . pipelines . main import pop_system_message
# Thinking-token budget associated with each named reasoning-effort level.
# A value of None means no thinking configuration is sent for that level.
REASONING_EFFORT_BUDGET_TOKEN_MAP = dict(
    none=None,
    low=1024,
    medium=4096,
    high=16384,
    max=32768,
)

# Maximum combined token limit for Claude 3.7
MAX_COMBINED_TOKENS = 64000
class Pipeline:
    """Manifold pipeline that sends chat requests to Claude 3.7 Sonnet on Amazon Bedrock.

    Chat messages are converted into the payload passed to the
    ``bedrock-runtime`` client's ``converse`` / ``converse_stream`` calls, with
    a "thinking" configuration derived from the request's ``reasoning_effort``.
    """

    # Image formats accepted in an image block's ``format`` field (see the
    # boto3 ``converse`` reference); anything else falls back to "jpeg".
    _IMAGE_FORMATS = ("png", "jpeg", "gif", "webp")
    # Upper bound on images forwarded in a single request.
    _MAX_IMAGES = 20
    # Thinking budget used when ``reasoning_effort`` cannot be interpreted.
    _FALLBACK_BUDGET_TOKENS = 4096
    # Seconds to wait when downloading an image referenced by URL.
    _IMAGE_FETCH_TIMEOUT = 30

    class Valves(BaseModel):
        # When True, the access key / secret key below are passed to boto3
        # explicitly; otherwise the clients are created without them.
        USE_AWS_CREDS: bool = False
        AWS_ACCESS_KEY: str = ""
        AWS_SECRET_KEY: str = ""
        AWS_REGION_NAME: str = "us-east-1"
        MODEL_ID: str = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"

    def __init__(self):
        """Load the valves from environment variables and create the Bedrock clients."""
        self.type = "manifold"
        # Optionally, you can set the id and name of the pipeline.
        # Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
        # The identifier must be unique across all pipelines.
        # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
        # self.id = "openai_pipeline"
        self.name = "Bedrock: "
        self.valves = self.Valves(
            **{
                "USE_AWS_CREDS": os.getenv("USE_AWS_CREDS", "false").lower() == "true",
                "AWS_ACCESS_KEY": os.getenv("AWS_ACCESS_KEY", "your-aws-access-key-here"),
                "AWS_SECRET_KEY": os.getenv("AWS_SECRET_KEY", "your-aws-secret-key-here"),
                "AWS_REGION_NAME": os.getenv("AWS_REGION_NAME", "your-aws-region-name-here"),
                "MODEL_ID": os.getenv("MODEL_ID", "us.anthropic.claude-3-7-sonnet-20250219-v1:0"),
            }
        )
        self._build_clients()

    def _build_clients(self) -> None:
        """(Re)create ``self.bedrock`` and ``self.bedrock_runtime`` from the current valves."""
        client_kwargs = {"region_name": self.valves.AWS_REGION_NAME}
        if self.valves.USE_AWS_CREDS:
            client_kwargs["aws_access_key_id"] = self.valves.AWS_ACCESS_KEY
            client_kwargs["aws_secret_access_key"] = self.valves.AWS_SECRET_KEY
        self.bedrock = boto3.client(service_name="bedrock", **client_kwargs)
        self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", **client_kwargs)

    def get_models(self):
        """Return the single model entry exposed by this manifold."""
        return [
            {"id": self.valves.MODEL_ID, "name": f"{self.valves.MODEL_ID}-Reasoning"}
        ]

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup: {__name__}")

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        print(f"on_shutdown: {__name__}")

    async def on_valves_updated(self):
        # This function is called when the valves are updated.
        print(f"on_valves_updated: {__name__}")
        # Credentials or region may have changed, so the clients are rebuilt.
        self._build_clients()

    def pipelines(self) -> List[dict]:
        """Return the models served by this pipeline (same as ``get_models``)."""
        return self.get_models()

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        """Build the request payload and dispatch it to Bedrock.

        Args:
            user_message: Latest user message (not used by this pipeline).
            model_id: Value sent as the payload's ``modelId``.
            messages: Chat history; a system message, if any, is split off
                with ``pop_system_message``.
            body: Request body. Reads ``reasoning_effort`` (a key of
                ``REASONING_EFFORT_BUDGET_TOKEN_MAP`` or an integer token
                budget, default ``"medium"``), ``max_tokens`` and ``stream``.

        Returns:
            A generator of text chunks when ``body["stream"]`` is truthy,
            otherwise the full response text. Any failure while building or
            sending the request is returned as an ``"Error: ..."`` string.
        """
        # This is where you can add your custom pipelines like RAG.
        print(f"pipe: {__name__}")

        system_message, messages = pop_system_message(messages)
        logging.info(f"pop_system_message: {json.dumps(messages)}")

        try:
            processed_messages = self._convert_messages(messages)

            # Set budget tokens for reasoning
            budget_tokens = self._resolve_budget_tokens(
                body.get("reasoning_effort", "medium")
            )

            # Do not use thinking if budget_tokens is None
            if budget_tokens is not None:
                reasoning_config = {
                    "thinking": {
                        "type": "enabled",
                        "budget_tokens": budget_tokens,
                    }
                }
            else:
                reasoning_config = {}

            # A missing or null max_tokens falls back to the combined limit.
            max_tokens = body.get("max_tokens") or MAX_COMBINED_TOKENS
            # The thinking budget has to stay strictly below max_tokens, so a
            # request that leaves no room is raised to the combined limit.
            # The None guard matters: comparing an int with None raises.
            if budget_tokens is not None and max_tokens <= budget_tokens:
                max_tokens = MAX_COMBINED_TOKENS

            payload = {
                "modelId": model_id,
                "messages": processed_messages,
                "system": [
                    {
                        "text": system_message["content"]
                        if system_message
                        else "you are an intelligent ai assistant"
                    }
                ],
                "inferenceConfig": {"temperature": 1, "maxTokens": max_tokens},
                "additionalModelRequestFields": reasoning_config,
            }

            if body.get("stream", False):
                return self.stream_response(model_id, payload)
            return self.get_completion(model_id, payload)
        except Exception as e:
            # Top-level boundary: the caller receives the error as plain text.
            logging.exception("Bedrock pipeline request failed")
            return f"Error: {e}"

    def _resolve_budget_tokens(self, reasoning_effort: Any) -> Optional[int]:
        """Translate ``reasoning_effort`` into a thinking budget (None = thinking off)."""
        try:
            if reasoning_effort in REASONING_EFFORT_BUDGET_TOKEN_MAP:
                return REASONING_EFFORT_BUDGET_TOKEN_MAP[reasoning_effort]
            # Allow users to input an integer value representing budget tokens
            budget_tokens = int(reasoning_effort)
        except (TypeError, ValueError) as e:
            # TypeError covers None and unhashable values, not just bad strings.
            print("Failed to convert reasoning effort to int", e)
            return self._FALLBACK_BUDGET_TOKENS
        # A non-positive budget cannot enable thinking; treat it as "off".
        return budget_tokens if budget_tokens > 0 else None

    def _convert_messages(self, messages: List[dict]) -> List[dict]:
        """Convert chat messages into ``{"role", "content": [...]}`` entries for the payload."""
        converted = []
        image_count = 0
        for message in messages:
            if isinstance(message.get("content"), list):
                parts = []
                for item in message["content"]:
                    if item["type"] == "text":
                        parts.append({"text": item["text"]})
                    elif item["type"] == "image_url":
                        if image_count >= self._MAX_IMAGES:
                            raise ValueError("Maximum of 20 images per API call exceeded")
                        parts.append(self.process_image(item["image_url"]))
                        image_count += 1
            else:
                parts = [{"text": message.get("content", "")}]
            converted.append({"role": message["role"], "content": parts})
        return converted

    def process_image(self, image: dict):
        """Turn an ``image_url`` item into an image content block.

        Args:
            image: Mapping with a ``"url"`` key holding either a
                ``data:image/...`` URL or a URL to download.

        Returns:
            ``{"image": {"format": ..., "source": {"bytes": ...}}}``.

        Raises:
            ValueError: If a data URL has no ``,`` separating header and payload.
            requests.RequestException: If the download fails or returns an
                HTTP error status.
        """
        url = image["url"]
        if url.startswith("data:image"):
            header, separator, encoded = url.partition(",")
            if not separator:
                raise ValueError("Malformed image data URL: missing ',' before the payload")
            image_bytes = base64.b64decode(encoded)
            # header looks like "data:image/png;base64"; the format is the
            # MIME subtype, not a file extension at the end of the URL.
            mime_type = header[len("data:"):].split(";", 1)[0]
            candidates = [mime_type.partition("/")[2]]
        else:
            response = requests.get(url, timeout=self._IMAGE_FETCH_TIMEOUT)
            response.raise_for_status()
            image_bytes = response.content
            # Ignore any query string / fragment when looking at the extension.
            path = url.split("?", 1)[0].split("#", 1)[0]
            extension = path.rpartition(".")[2] if "." in path else ""
            content_type = response.headers.get("Content-Type", "")
            candidates = [extension, content_type.split(";", 1)[0].partition("/")[2]]

        return {
            "image": {
                "format": self._pick_image_format(candidates),
                "source": {"bytes": image_bytes},
            }
        }

    def _pick_image_format(self, candidates: List[str]) -> str:
        """Return the first recognised image format among ``candidates``, else "jpeg"."""
        for candidate in candidates:
            image_format = candidate.strip().lower()
            if image_format == "jpg":
                image_format = "jpeg"
            if image_format in self._IMAGE_FORMATS:
                return image_format
        return "jpeg"

    def stream_response(self, model_id: str, payload: dict) -> Generator:
        """Yield response text chunks from ``converse_stream``.

        Reasoning deltas are wrapped in ``<thinking>`` ... ``</thinking>``.
        Because this is a generator, the request is only sent once iteration
        starts, outside ``pipe``'s error handling; failures are therefore
        reported here as a final ``"Error: ..."`` chunk.

        Args:
            model_id: Unused; the model is taken from ``payload["modelId"]``.
            payload: Keyword arguments for ``converse_stream``.
        """
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse_stream.html
        thinking_block = None  # contentBlockIndex of the reasoning block
        thinking_open = False  # True while a <thinking> tag awaits its close
        try:
            streaming_response = self.bedrock_runtime.converse_stream(**payload)
            for chunk in streaming_response["stream"]:
                if (
                    thinking_open
                    and "contentBlockStop" in chunk
                    and chunk["contentBlockStop"]["contentBlockIndex"] == thinking_block
                ):
                    print("Thinking End")
                    thinking_open = False
                    yield "</thinking>\n\n"

                if "contentBlockDelta" in chunk:
                    delta = chunk["contentBlockDelta"]["delta"]

                    # Handle reasoning content (Chain of Thought)
                    if "reasoningContent" in delta and "text" in delta["reasoningContent"]:
                        if thinking_block is None:
                            thinking_block = chunk["contentBlockDelta"]["contentBlockIndex"]
                            thinking_open = True
                            yield "<thinking>\n"
                        yield delta["reasoningContent"]["text"]

                    # Handle regular response text
                    if "text" in delta:
                        yield delta["text"]

            # The stream ended without a stop event for the reasoning block.
            if thinking_open:
                thinking_open = False
                yield "</thinking>\n\n"
        except Exception as e:
            logging.exception("Bedrock streaming request failed")
            if thinking_open:
                yield "</thinking>\n\n"
            yield f"Error: {e}"

    def get_completion(self, model_id: str, payload: dict) -> str:
        """Run a non-streaming ``converse`` call and return the response text.

        When the response contains reasoning text it is placed in a
        ``<details type="reasoning">`` block ahead of the answer; without
        reasoning only the answer text is returned.

        Args:
            model_id: Unused; the model is taken from ``payload["modelId"]``.
            payload: Keyword arguments for ``converse``.
        """
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse.html
        response = self.bedrock_runtime.converse(**payload)
        content_blocks = response["output"]["message"]["content"]

        reasoning_parts = []
        text_parts = []
        # Process each content block to find reasoning and response text
        for block in content_blocks:
            if "reasoningContent" in block:
                # A reasoning block may carry no "reasoningText" entry.
                reasoning_text = block["reasoningContent"].get("reasoningText", {}).get("text")
                if reasoning_text:
                    reasoning_parts.append(reasoning_text)
            if "text" in block:
                text_parts.append(block["text"])

        text = "".join(text_parts)
        if not reasoning_parts:
            # Thinking disabled or not returned: no reasoning block to show.
            return text

        reasoning = "\n".join(reasoning_parts)
        return f'<details type="reasoning" done="true">\n<summary>Thinking…</summary>\n{reasoning}\n</details>\n\n{text}'