Run Pipe Agent with Tools and Streaming
This example demonstrates how to run a pipe agent with tools to use function calling with streaming responses.
Run Pipe Agent with Tools and Streaming Example
Run Pipe Agent with Tools and Streaming Example
import 'dotenv/config';
import { Langbase, Message, Tools, getRunner, getToolsFromRunStream, getTextPart, ChunkStream } from 'langbase';
const langbase = new Langbase({
apiKey: process.env.LANGBASE_API_KEY!,
});
async function main() {
await createSummaryAgent();
// Call summary agent pipe
const response = await langbase.pipes.run({
stream: true,
name: 'summary-agent',
messages: [
{
role: 'user',
content: "What's the weather in SF ?"
},
],
tools: [weatherToolSchema],
});
const [streamForResponse, streamForToolCall] = response.stream.tee();
const toolCalls = await getToolsFromRunStream(streamForToolCall);
const hasToolCalls = toolCalls.length > 0;
const threadId = response.threadId
if (hasToolCalls) {
// Process each tool call
const toolResultPromises = toolCalls.map(async (toolCall): Promise<Message> => {
const toolName = toolCall.function.name;
const toolParameters = JSON.parse(toolCall.function.arguments);
const toolFunction = tools[toolName as keyof typeof tools];
// Call the tool function with the parameters
const toolResponse = await toolFunction(toolParameters);
// Return the tool result
return {
role: 'tool',
name: toolName,
content: toolResponse,
tool_call_id: toolCall.id,
};
});
// Wait for all tool calls to complete
const toolResults = await Promise.all(toolResultPromises);
// Call the agent pipe again with the updated messages
const finalResponse = await langbase.pipes.run({
stream: true,
name: 'summary-agent',
threadId: threadId!,
messages: toolResults,
tools: [weatherToolSchema],
});
const runner = await getRunner(finalResponse.stream);
for await (const chunk of runner) {
const textContent = getTextPart(chunk as ChunkStream);
process.stdout.write(textContent);
}
console.log("\n");
}
else {
console.log("Direct response (no tools called):");
const runner = await getRunner(streamForResponse);
for await (const chunk of runner) {
const textContent = getTextPart(chunk as ChunkStream);
process.stdout.write(textContent);
}
}
}
// Mock implementation of the weather function
async function getCurrentWeather(args: { location: string }) {
return 'Sunny, 75°F';
}
// Weather tool schema
const weatherToolSchema: Tools = {
type: 'function',
function: {
name: 'getCurrentWeather',
description: 'Get the current weather of a given location',
parameters: {
type: 'object',
required: ['location'],
properties: {
unit: {
enum: ['celsius', 'fahrenheit'],
type: 'string',
},
location: {
type: 'string',
description: 'The city and state, e.g. San Francisco, CA',
},
},
},
},
};
// Object to hold all tools
const tools = {
getCurrentWeather
};
/**
* Creates a summary agent pipe if it doesn't already exist.
*
* This function checks if a pipe with the name 'summary-agent' exists in the system.
* If the pipe doesn't exist, it creates a new private pipe with a system message
* configuring it as a helpful assistant.
*
* @async
* @returns {Promise<void>} A promise that resolves when the operation is complete
* @throws {Error} Logs any errors encountered during the creation process
*/
async function createSummaryAgent() {
try {
await langbase.pipes.create({
name: 'summary-agent',
upsert: true,
status: 'private',
messages: [
{
role: 'system',
content:'You are a helpful assistant that can answer questions and help with tasks in json format',
}
]
});
} catch (error) {
console.error('Error creating summary agent:', error);
}
}
main();