Run Pipe Agent with Tools and Streaming

This example demonstrates how to run a pipe agent with tools, combining function calling with streaming responses.


Run Pipe Agent with Tools and Streaming Example

Run Pipe Agent with Tools and Streaming Example

import 'dotenv/config';
import { Langbase, Message, Tools, getRunner, getToolsFromRunStream, getTextPart, ChunkStream } from 'langbase';

// Langbase SDK client shared by every call in this example.
// The API key is taken from the LANGBASE_API_KEY environment variable.
const apiKey = process.env.LANGBASE_API_KEY!;
const langbase = new Langbase({ apiKey });

/**
 * Runs the example end to end.
 *
 * 1. Ensures the `summary-agent` pipe is set up (via `createSummaryAgent`).
 * 2. Runs the pipe in streaming mode with the weather tool attached.
 * 3. If the model requested tool calls, executes them locally, sends the
 *    results back on the same thread and streams the final answer to stdout.
 *    Otherwise streams the model's direct answer to stdout.
 *
 * @throws {Error} If the model requests a tool that is not registered in
 * `tools`, or if tool calls were requested but the run carried no thread id.
 */
async function main() {
	await createSummaryAgent();

	// Call summary agent pipe
	const response = await langbase.pipes.run({
		stream: true,
		name: 'summary-agent',
		messages: [
			{
				role: 'user',
				content: "What's the weather in SF ?",
			},
		],
		tools: [weatherToolSchema],
	});

	// Split the stream: one branch is inspected for tool calls, the other is
	// kept so a direct (tool-free) answer can still be printed afterwards.
	const [streamForResponse, streamForToolCall] = response.stream.tee();

	// NOTE(review): defensive fallback — treat a missing tool-call list as
	// "no tool calls" instead of failing on `.length`; confirm against the SDK.
	const toolCalls = (await getToolsFromRunStream(streamForToolCall)) ?? [];

	if (toolCalls.length === 0) {
		console.log("Direct response (no tools called):");
		await streamToStdout(streamForResponse);
		return;
	}

	// The follow-up run only carries tool results, so it must continue the
	// thread of the first run; fail loudly rather than asserting with `!`.
	const threadId = response.threadId;
	if (!threadId) {
		throw new Error('Pipe run requested tool calls but returned no threadId to continue on.');
	}

	// Execute every requested tool call in parallel
	const toolResults = await Promise.all(
		toolCalls.map(async (toolCall): Promise<Message> => {
			const toolName = toolCall.function.name;

			// Own-property check: rejects unknown names and inherited members
			// such as `toString`, which a plain index lookup would return.
			if (!Object.prototype.hasOwnProperty.call(tools, toolName)) {
				throw new Error(`Model requested an unknown tool: "${toolName}"`);
			}
			const toolFunction = tools[toolName as keyof typeof tools];

			// Arguments arrive as a JSON-encoded string
			const toolParameters = JSON.parse(toolCall.function.arguments);
			const toolResponse = await toolFunction(toolParameters);

			// Tool result message, linked to the originating call by its id
			return {
				role: 'tool',
				name: toolName,
				content: toolResponse,
				tool_call_id: toolCall.id,
			};
		}),
	);

	// Call the agent pipe again with the tool results on the same thread
	const finalResponse = await langbase.pipes.run({
		stream: true,
		name: 'summary-agent',
		threadId,
		messages: toolResults,
		tools: [weatherToolSchema],
	});

	await streamToStdout(finalResponse.stream);
	console.log("\n");
}

/**
 * Writes the text part of every chunk of a run stream to stdout as it arrives.
 *
 * @param stream - A run stream, typed as whatever `getRunner` accepts.
 */
async function streamToStdout(stream: Parameters<typeof getRunner>[0]): Promise<void> {
	const runner = await getRunner(stream);
	for await (const chunk of runner) {
		process.stdout.write(getTextPart(chunk as ChunkStream));
	}
}

/**
 * Mock weather lookup used as the local implementation of the
 * `getCurrentWeather` tool.
 *
 * @param args - Tool arguments; `location` is accepted but not consulted by this mock.
 * @returns A promise resolving to the fixed report `'Sunny, 75°F'`.
 */
async function getCurrentWeather(args: { location: string }): Promise<string> {
	const cannedReport = 'Sunny, 75°F';
	return cannedReport;
}

// JSON-schema description of the weather tool's arguments: a required
// `location` string and an optional `unit` limited to 'celsius' or 'fahrenheit'.
const weatherToolParameters = {
	type: 'object',
	required: ['location'],
	properties: {
		unit: {
			enum: ['celsius', 'fahrenheit'],
			type: 'string',
		},
		location: {
			type: 'string',
			description: 'The city and state, e.g. San Francisco, CA',
		},
	},
};

// Weather tool schema: declares the `getCurrentWeather` function and its
// parameters so it can be passed in the `tools` list of a pipe run.
const weatherToolSchema: Tools = {
	type: 'function',
	function: {
		name: 'getCurrentWeather',
		description: 'Get the current weather of a given location',
		parameters: weatherToolParameters,
	},
};

// Registry of locally implemented tools, keyed by tool name.
// Each key has to match the `function.name` of the corresponding tool schema,
// because requested tool calls are looked up here by that name.
const tools = {
	getCurrentWeather: getCurrentWeather,
};

/**
 * Sets up the `summary-agent` pipe used by this example.
 *
 * Sends a single `langbase.pipes.create` request with `upsert: true` and
 * `status: 'private'`, seeding the pipe with one system message. No separate
 * existence check is made; with `upsert` set, an already existing pipe is
 * presumably updated instead of rejected (see the Langbase pipe-create docs).
 *
 * Errors are not propagated: a failed request is reported through
 * `console.error` and the returned promise still resolves.
 *
 * @returns A promise that resolves once the request has settled.
 */
async function createSummaryAgent() {
	const systemPrompt =
		'You are a helpful assistant that can answer questions and help with tasks in json format';

	try {
		await langbase.pipes.create({
			name: 'summary-agent',
			upsert: true,
			status: 'private',
			messages: [{ role: 'system', content: systemPrompt }],
		});
	} catch (err) {
		// Best effort: log and let the caller continue.
		console.error('Error creating summary agent:', err);
	}
}

// Entry point: run the example and report any failure explicitly instead of
// leaving the returned promise floating (unhandled rejection).
main().catch((error: unknown) => {
	console.error('Example failed:', error);
	process.exitCode = 1;
});