Some checks are pending
CodeQL / Analyze (csharp) (push) Waiting to run
CodeQL / Analyze (python) (push) Waiting to run
dotnet-build-and-test / paths-filter (push) Waiting to run
dotnet-build-and-test / dotnet-build-and-test (Debug, windows-latest, net9.0) (push) Blocked by required conditions
dotnet-build-and-test / dotnet-build-and-test (Release, integration, true, ubuntu-latest, net10.0) (push) Blocked by required conditions
dotnet-build-and-test / dotnet-build-and-test (Release, integration, true, windows-latest, net472) (push) Blocked by required conditions
dotnet-build-and-test / dotnet-build-and-test (Release, ubuntu-latest, net8.0) (push) Blocked by required conditions
dotnet-build-and-test / dotnet-build-and-test-check (push) Blocked by required conditions
70 lines
1.8 KiB
Python
70 lines
1.8 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
import asyncio
|
|
from random import randint
|
|
from typing import Annotated
|
|
|
|
from agent_framework.anthropic import AnthropicClient
|
|
|
|
"""
|
|
Anthropic Chat Agent Example
|
|
|
|
This sample demonstrates using Anthropic with an agent and a single custom tool.
|
|
"""
|
|
|
|
|
|
def get_weather(
|
|
location: Annotated[str, "The location to get the weather for."],
|
|
) -> str:
|
|
"""Get the weather for a given location."""
|
|
conditions = ["sunny", "cloudy", "rainy", "stormy"]
|
|
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
|
|
|
|
|
|
async def non_streaming_example() -> None:
|
|
"""Example of non-streaming response (get the complete result at once)."""
|
|
print("=== Non-streaming Response Example ===")
|
|
|
|
agent = AnthropicClient(
|
|
).as_agent(
|
|
name="WeatherAgent",
|
|
instructions="You are a helpful weather agent.",
|
|
tools=get_weather,
|
|
)
|
|
|
|
query = "What's the weather like in Seattle?"
|
|
print(f"User: {query}")
|
|
result = await agent.run(query)
|
|
print(f"Result: {result}\n")
|
|
|
|
|
|
async def streaming_example() -> None:
|
|
"""Example of streaming response (get results as they are generated)."""
|
|
print("=== Streaming Response Example ===")
|
|
|
|
agent = AnthropicClient(
|
|
).as_agent(
|
|
name="WeatherAgent",
|
|
instructions="You are a helpful weather agent.",
|
|
tools=get_weather,
|
|
)
|
|
|
|
query = "What's the weather like in Portland and in Paris?"
|
|
print(f"User: {query}")
|
|
print("Agent: ", end="", flush=True)
|
|
async for chunk in agent.run_stream(query):
|
|
if chunk.text:
|
|
print(chunk.text, end="", flush=True)
|
|
print("\n")
|
|
|
|
|
|
async def main() -> None:
|
|
print("=== Anthropic Example ===")
|
|
|
|
await streaming_example()
|
|
await non_streaming_example()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|