Run agent with agent_run (Streaming)
agent_run provides a concise and thread-friendly way to run an agent while exposing real-time streaming output via MessageObserver. It is ideal for server-side or frontend event stream rendering, as well as MCP tool integration scenarios.
Quick Start
import json
import asyncio
import logging
from threading import Event

from nexent.core.agents.run_agent import agent_run
from nexent.core.agents.agent_model import (
    AgentRunInfo,
    AgentConfig,
    ModelConfig
)
from nexent.core.utils.observer import MessageObserver

async def main():
    # 1) Create message observer (for receiving streaming messages)
    observer = MessageObserver(lang="en")

    # 2) External stop flag (useful to interrupt from UI)
    stop_event = Event()

    # 3) Configure model
    model_config = ModelConfig(
        cite_name="gpt-4",  # Model alias (custom, referenced by AgentConfig)
        api_key="<YOUR_API_KEY>",
        model_name="Qwen/Qwen2.5-32B-Instruct",
        url="https://api.siliconflow.cn/v1",
        temperature=0.3,
        top_p=0.9
    )

    # 4) Configure Agent
    agent_config = AgentConfig(
        name="example_agent",
        description="An example agent that can execute Python code and search the web",
        prompt_templates=None,
        tools=[],
        max_steps=5,
        model_name="gpt-4",  # Corresponds to model_config.cite_name
        provide_run_summary=False,
        managed_agents=[]
    )

    # 5) Assemble run info
    agent_run_info = AgentRunInfo(
        query="How many letter r are in strrawberry?",  # Example question
        model_config_list=[model_config],
        observer=observer,
        agent_config=agent_config,
        mcp_host=None,  # Optional: MCP service addresses
        history=None,  # Optional: chat history
        stop_event=stop_event
    )

    # 6) Run with streaming and consume messages
    async for message in agent_run(agent_run_info):
        message_data = json.loads(message)
        message_type = message_data.get("type", "unknown")
        content = message_data.get("content", "")
        print(f"[{message_type}] {content}")

    # 7) Read final answer (if any)
    final_answer = observer.get_final_answer()
    if final_answer:
        print(f"\nFinal Answer: {final_answer}")

if __name__ == "__main__":
    logging.disable(logging.CRITICAL)
    asyncio.run(main())

Tip: Store sensitive config such as api_key in environment variables or a secrets manager, not in code.
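One way to follow this tip is to read the key from the environment at startup and fail early when it is missing. The sketch below is a minimal illustration; the helper name load_api_key and the variable name MODEL_API_KEY are assumptions made for this example, not part of Nexent.

```python
import os


def load_api_key(env_var="MODEL_API_KEY"):
    """Return the API key stored in the given environment variable.

    Both the helper and the default variable name are hypothetical;
    use whatever name your deployment already defines.
    """
    api_key = os.environ.get(env_var)
    if not api_key:
        # Fail early with a clear message instead of sending an empty key.
        raise RuntimeError(
            f"Set the {env_var} environment variable before running the agent"
        )
    return api_key
```

The returned value can then be passed as api_key=load_api_key() when building ModelConfig, so the literal key never appears in source control.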
Message Stream Format and Handling
Internally, agent_run executes the agent in a background thread and continuously yields JSON strings from the MessageObserver message buffer. You can parse these fields for categorized display or logging.
- Important fields
  - type: message type (corresponds to ProcessType)
  - content: text content
  - agent_name: optional, which agent produced this message
- Common type values (from ProcessType):
  - AGENT_NEW_RUN: new task started
  - STEP_COUNT: step updates
  - MODEL_OUTPUT_THINKING / MODEL_OUTPUT_CODE: model thinking/code snippets
  - PARSE: code parsing results
  - EXECUTION_LOGS: Python execution logs
  - FINAL_ANSWER: final answer
  - ERROR: error information
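For categorized display or logging, the streamed JSON strings can be grouped by their type field. The following is a minimal sketch that relies only on the type and content fields described above; the helper name collect_by_type and the "unparsed" bucket are assumptions for illustration. It deliberately does not hard-code any type strings, since the exact serialized values should be checked against ProcessType.

```python
import json
from collections import defaultdict


def collect_by_type(raw_messages):
    """Group message contents by their "type" field.

    raw_messages is any iterable of JSON strings, such as the values
    yielded by a streaming run. Messages that are not JSON objects are
    kept verbatim under the (hypothetical) "unparsed" key.
    """
    grouped = defaultdict(list)
    for raw in raw_messages:
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            grouped["unparsed"].append(raw)
            continue
        if not isinstance(data, dict):
            # Valid JSON, but not an object with type/content fields.
            grouped["unparsed"].append(raw)
            continue
        grouped[data.get("type", "unknown")].append(data.get("content", ""))
    return dict(grouped)
```

Grouping first and rendering afterwards keeps the display logic independent of the order in which messages arrive.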
Configuration Reference
ModelConfig
- cite_name: model alias (referenced by AgentConfig.model_name)
- api_key: model service API key
- model_name: model invocation name
- url: base URL of the model service
- temperature / top_p: sampling params
AgentConfig
- name: agent name
- description: agent description
- prompt_templates: optional, Jinja template dict
- tools: tool configuration list (see ToolConfig)
- max_steps: maximum steps
- model_name: model alias (corresponds to ModelConfig.cite_name)
- provide_run_summary: whether sub-agents provide run summary
- managed_agents: list of sub-agent configurations
Pass Chat History (optional)
You can pass historical messages via AgentRunInfo.history, and Nexent will write them into internal memory:
from nexent.core.agents.agent_model import AgentHistory

history = [
    AgentHistory(role="user", content="Hi"),
    AgentHistory(role="assistant", content="Hello, how can I help you?"),
]
agent_run_info = AgentRunInfo(
    # ... other fields omitted
    history=history,
)

MCP Tool Integration (optional)
If you provide mcp_host (list of MCP service addresses), Nexent will automatically pull remote tools through ToolCollection.from_mcp and inject them into the agent:
agent_run_info = AgentRunInfo(
    # ... other fields omitted
    mcp_host=["http://localhost:3000"],
)

Friendly error messages (EN/ZH) will be produced if the connection fails.
Interrupt Execution
During execution, you can trigger interruption via stop_event.set():
stop_event.set()  # The agent will gracefully stop after the current step completes

Relation to CoreAgent
- agent_run is a wrapper over NexentAgent and CoreAgent, responsible for:
  - Constructing CoreAgent (including models and tools)
  - Injecting history into memory
  - Driving streaming execution and forwarding buffered messages from MessageObserver
- You can also directly use CoreAgent.run(stream=True) to handle streaming yourself (see core/agents.md); agent_run provides a more convenient threaded and JSON-message oriented interface.
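To make the threaded, JSON-message oriented pattern concrete, here is a self-contained sketch of the general idea: a worker thread writes JSON strings into a buffer, an async generator drains that buffer, and a stop event is checked between steps. Everything in it (BufferingObserver, fake_agent, stream_run, collect, and the message type strings) is a stand-in invented for illustration; it is not Nexent's actual implementation.

```python
import asyncio
import json
import threading
from collections import deque


class BufferingObserver:
    """Stand-in observer: stores JSON strings in a lock-protected buffer."""

    def __init__(self):
        self._lock = threading.Lock()
        self._buffer = deque()

    def push(self, message_type, content):
        with self._lock:
            self._buffer.append(
                json.dumps({"type": message_type, "content": content})
            )

    def drain(self):
        # Return all buffered messages in arrival order and clear the buffer.
        with self._lock:
            items = list(self._buffer)
            self._buffer.clear()
        return items


def fake_agent(observer, stop_event, max_steps):
    """Stand-in agent loop that checks the stop flag before each step."""
    for step in range(1, max_steps + 1):
        if stop_event.is_set():
            observer.push("stopped", f"stopped before step {step}")
            return
        observer.push("step", f"step {step}")
    observer.push("final_answer", "done")


async def stream_run(observer, stop_event, max_steps=3, poll_interval=0.01):
    """Run fake_agent in a background thread and yield its messages."""
    worker = threading.Thread(
        target=fake_agent, args=(observer, stop_event, max_steps)
    )
    worker.start()
    while worker.is_alive():
        for message in observer.drain():
            yield message
        await asyncio.sleep(poll_interval)
    worker.join()
    # Final drain: pick up anything written just before the thread exited.
    for message in observer.drain():
        yield message


async def collect(max_steps=3):
    observer = BufferingObserver()
    stop_event = threading.Event()
    return [
        json.loads(m)
        async for m in stream_run(observer, stop_event, max_steps)
    ]
```

Running asyncio.run(collect()) returns the parsed messages in the order the worker produced them. Setting the stop event from another thread or coroutine would make the stand-in loop exit before its next step, which mirrors the stop-after-the-current-step behavior described for stop_event.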
