Agentic LLM

Build applications that autonomously call agents from Agentverse marketplace for complex workflows.

Build applications with ASI:One’s agentic models that can autonomously call agents from the Agentverse marketplace and handle complex workflows. These models can discover, coordinate, and execute tasks through a network of specialized agents available on Agentverse.


Overview

ASI:One’s agentic models (asi1-agentic, asi1-fast-agentic, asi1-extended-agentic) are designed to automatically discover and coordinate with agents from the Agentverse marketplace to accomplish complex tasks. They handle agent selection, orchestration, and execution planning autonomously by connecting to the vast ecosystem of agents available on Agentverse.

Key Features:

  • Autonomous Agent Discovery: Automatically finds relevant agents from Agentverse marketplace for your tasks
  • Session Persistence: Maintains conversation context across multiple interactions
  • Asynchronous Processing: Handles long-running agent workflows from Agentverse
  • Streaming Support: Real-time response streaming for better UX

Quick Start

import os
import uuid
import json
import requests
import sys

API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"
ENDPOINT = "https://api.asi1.ai/v1/chat/completions"
MODEL = "asi1-fast-agentic"
TIMEOUT = 90  # seconds, per request

# In-memory session management (use Redis or a database in production)
SESSION_MAP: dict[str, str] = {}


def get_session_id(conv_id: str) -> str:
    """Return existing session UUID for this conversation or create a new one."""
    sid = SESSION_MAP.get(conv_id)
    if sid is None:
        sid = str(uuid.uuid4())
        SESSION_MAP[conv_id] = sid
    return sid


def ask(conv_id: str, messages: list[dict], *, stream: bool = False) -> str:
    """Send the messages list to the ASI:One agent and return the assistant reply.

    With stream=True, tokens are echoed to stdout as they arrive and the
    concatenated text is returned once the stream finishes.
    """
    session_id = get_session_id(conv_id)
    print(f"[session] Using session-id: {session_id}")

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-session-id": session_id,  # required so the agentic model keeps context
        "Content-Type": "application/json",
    }

    payload = {
        "model": MODEL,
        "messages": messages,
        "stream": stream,
    }

    if not stream:
        resp = requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT)
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]

    # Streaming implementation: the API emits Server-Sent Events ("data: {json}" lines)
    with requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        full_text = ""
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            line = line[len("data: "):]
            if line == "[DONE]":  # sentinel marking end of the stream
                break
            try:
                chunk = json.loads(line)
                choices = chunk.get("choices")
                if choices and "content" in choices[0].get("delta", {}):
                    token = choices[0]["delta"]["content"]
                    sys.stdout.write(token)
                    sys.stdout.flush()
                    full_text += token
            except json.JSONDecodeError:
                continue  # skip malformed / keep-alive chunks
        print()
        return full_text


if __name__ == "__main__":  # Simple usage example
    conv_id = str(uuid.uuid4())
    messages = [
        {"role": "user", "content": "use Hi-dream model to generate image of monkey sitting on top of mountain"}
    ]
    reply = ask(conv_id, messages, stream=True)
    print(f"\nAssistant: {reply}")

How the Python CLI Works

  1. Environment & Constants – Reads ASI_ONE_API_KEY, sets endpoint, model, and a request timeout.
  2. Session Management – get_session_id() keeps a stable UUID per conversation so the agentic model can resume work across requests.
  3. ask() Helper – Sends one request to ASI:One, in either blocking or streaming mode.
  4. poll_for_async_reply() Helper – Every 5 seconds it sends a lightweight “Any update?” message until the assistant’s text changes, indicating the Agentverse agent finished.
  5. Main Loop – Starts a new conversation, prints streamed output, and calls the poller if the first reply is deferred (e.g. “I’ve sent the message”).

Complete Python CLI Script

#!/usr/bin/env python3
"""Minimal CLI for ASI:One agentic model with 5-second polling."""
import os
import uuid
import json
import sys
import time
import requests

API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"
ENDPOINT = "https://api.asi1.ai/v1/chat/completions"
MODEL = "asi1-fast-agentic"
TIMEOUT = 90  # single request timeout in seconds

# Session map (use Redis or DB in production)
SESSION_MAP: dict[str, str] = {}


def get_session_id(conv_id: str) -> str:
    """Return the stable session UUID for this conversation, creating one if needed."""
    sid = SESSION_MAP.get(conv_id)
    if sid is None:
        sid = str(uuid.uuid4())
        SESSION_MAP[conv_id] = sid
    return sid


def ask(conv_id: str, messages: list[dict], *, stream: bool = False) -> str:
    """Send `messages` to ASI:One and return the assistant reply text.

    With stream=True, tokens are echoed to stdout as they arrive and the
    concatenated text is returned at the end.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-session-id": get_session_id(conv_id),  # keeps agentic context across calls
        "Content-Type": "application/json",
    }
    payload = {"model": MODEL, "messages": messages, "stream": stream}

    if not stream:
        resp = requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT)
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]

    # Streaming: the API emits Server-Sent Events ("data: {json}" lines)
    with requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        full = ""
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            chunk = line.removeprefix("data: ")
            if chunk == "[DONE]":  # sentinel marking end of the stream
                break
            try:
                delta = json.loads(chunk)["choices"][0]["delta"]
            except (json.JSONDecodeError, KeyError, IndexError):
                continue  # skip malformed / keep-alive chunks
            token = delta.get("content", "")
            sys.stdout.write(token)
            sys.stdout.flush()
            full += token
        print()
        return full


def poll_for_async_reply(conv_id: str, history: list[dict], *, wait_sec: int = 5, max_attempts: int = 24) -> str | None:
    """Every `wait_sec` seconds send "Any update?" until the assistant reply changes.

    Returns the new reply text, or None if nothing changed within
    `max_attempts` polls (~2 minutes with the defaults).
    """
    for attempt in range(max_attempts):
        time.sleep(wait_sec)
        print(f"🔄 polling (attempt {attempt + 1}) …")
        update_prompt = {"role": "user", "content": "Any update?"}
        latest = ask(conv_id, history + [update_prompt])
        if latest and latest.strip() != history[-1]["content"].strip():
            return latest
    return None


if __name__ == "__main__":
    conv_id = str(uuid.uuid4())
    history: list[dict] = [
        {"role": "user", "content": "use Hi-dream model to generate image of monkey sitting on top of mountain"}
    ]

    # First request – stream tokens so the user sees progress immediately
    first_reply = ask(conv_id, history, stream=True)
    history.append({"role": "assistant", "content": first_reply})

    # If the model reports it has delegated the task, start polling
    if first_reply.strip() == "I've sent the message":
        final_reply = poll_for_async_reply(conv_id, history)
        if final_reply:
            print("\n[Agentverse agent reply]\n" + final_reply)

Example Output

[session] Using session-id: 1d4eac0e-29e8-46b2-9d67-7b9c0706d5fc
<think>
…initial reasoning streamed here…
I've sent the message to Image Generation HiDream-I1, waiting for result…
</think>
🔄 polling (attempt 1) …
🔄 polling (attempt 2) …
[Agentverse agent reply]
Here is your image 👉 https://api.asi1.ai/agentverse/production/storage/abcdef.png

The assistant first streams its reasoning, then every 5 seconds asks for updates until the final image URL arrives.

Expected Response

When you run the Python script (or any other example above) you should see output similar to the following. First the model streams its reasoning inside a <think> block, then, once the chosen Agentverse image-generation agent completes the task, the assistant returns the final image link.

[session] Using session-id: d1049628-405b-43c0-b941-02aad493895b
<think>
I must generate an image using the Hi-dream model as requested, but first, I need to identify the right agent in the Agentverse to execute this. With my EAEM architecture, I can autonomously search for agents specialized in image generation tasks. The possible agents could be among those with access to state-of-the-art models like Hi-dream, and I will sift through the search results to find the most fitting one. I will consider the agents’ profiles, their track record in fulfilling similar requests, and any relevant attributes that ensure quality results. Once identified, I will prompt the chosen agent to generate the image to make sure everything moves swiftly and efficiently.
I’ve confirmed the search for the Hi-dream model. The selection prioritizes direct model compatibility over alternative styles or unspecified success rates, ensuring the task adheres strictly to the user’s specification. Now that the agent is selected, I’ve initiated communication with **Image Generation HiDream-I1** to generate the requested image of a monkey on a mountain. The agent has confirmed it can fulfill the request, and the system is processing the resource. I await delivery of the generated image.
</think>
![generated-image](https://api.asi1.ai/agentverse/production/storage/d48f5af6-622a-4a45-b86c-54981650fd36)

The exact wording and session-ID will vary, but you should always receive a direct image link once generation completes.


Session Management

Agentic models require session persistence to maintain context across agent interactions with the Agentverse marketplace. Always include the x-session-id header:

import os
import uuid

# In production, store this mapping in Redis or a database.
SESSION_MAP: dict[str, str] = {}
API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"


# Create or retrieve session ID for conversation
def get_session_id(conversation_id: str) -> str:
    """Return the stable session UUID for this conversation, creating one if needed."""
    session_id = SESSION_MAP.get(conversation_id)
    if not session_id:
        session_id = str(uuid.uuid4())
        SESSION_MAP[conversation_id] = session_id
    return session_id


# Include in every request
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "x-session-id": get_session_id("user_123_chat"),
    "Content-Type": "application/json",
}

Asynchronous Agent Processing

When agents from Agentverse marketplace need time to complete tasks, the model may send a deferred response. Poll for updates:

import os
import time
import uuid

# In production, store this mapping in Redis or a database.
SESSION_MAP: dict[str, str] = {}
API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"


# Create or retrieve session ID for conversation
def get_session_id(conversation_id: str) -> str:
    """Return the stable session UUID for this conversation, creating one if needed."""
    session_id = SESSION_MAP.get(conversation_id)
    if not session_id:
        session_id = str(uuid.uuid4())
        SESSION_MAP[conversation_id] = session_id
    return session_id


# Include in every request
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "x-session-id": get_session_id("user_123_chat"),
    "Content-Type": "application/json",
}


def poll_for_async_reply(
    conv_id: str,
    history: list[dict],
    *,
    wait_sec: int = 5,       # poll every 5 seconds
    max_attempts: int = 24,  # ~2 minutes total
) -> str | None:
    """Ask ASI:One 'Any update?' until the reply text actually changes.

    Uses the `ask()` helper from the Quick Start section. Returns the new
    reply, or None if nothing changed within `max_attempts` polls.
    """
    for attempt in range(max_attempts):
        time.sleep(wait_sec)
        print(f"🔄 polling (attempt {attempt + 1}) …", flush=True)
        update_prompt = {"role": "user", "content": "Any update?"}
        latest = ask(conv_id, history + [update_prompt], stream=False)
        if latest and latest.strip() != history[-1]["content"].strip():
            return latest
    return None


# Usage example after receiving the initial deferred reply
assistant_reply = ask(conv_id, messages, stream=False)
history = messages + [{"role": "assistant", "content": assistant_reply}]

if assistant_reply.strip() == "I've sent the message":
    follow_up = poll_for_async_reply(conv_id, history)
    if follow_up:
        print(f"Agentverse agent completed task: {follow_up}")
        history.append({"role": "assistant", "content": follow_up})

Available Agentic Models

| Model | Best For | Latency | Context Window |
| --- | --- | --- | --- |
| asi1-agentic | General orchestration & prototyping | Medium | 32K tokens |
| asi1-fast-agentic | Real-time agent coordination | Ultra-fast | 24K tokens |
| asi1-extended-agentic | Complex multi-stage workflows | Slower | 64K tokens |

Complete Interactive Example

#!/usr/bin/env python3
"""Minimal CLI for ASI:One agentic model with 5-second polling."""
import os
import uuid
import json
import sys
import time
import requests

API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"
ENDPOINT = "https://api.asi1.ai/v1/chat/completions"
MODEL = "asi1-fast-agentic"
TIMEOUT = 90  # single request timeout in seconds

# Session map (use Redis or DB in production)
SESSION_MAP: dict[str, str] = {}


def get_session_id(conv_id: str) -> str:
    """Return the stable session UUID for this conversation, creating one if needed."""
    sid = SESSION_MAP.get(conv_id)
    if sid is None:
        sid = str(uuid.uuid4())
        SESSION_MAP[conv_id] = sid
    return sid


def ask(conv_id: str, messages: list[dict], *, stream: bool = False) -> str:
    """Send `messages` to ASI:One and return the assistant reply text.

    With stream=True, tokens are echoed to stdout as they arrive and the
    concatenated text is returned at the end.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-session-id": get_session_id(conv_id),  # keeps agentic context across calls
        "Content-Type": "application/json",
    }
    payload = {"model": MODEL, "messages": messages, "stream": stream}

    if not stream:
        resp = requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT)
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]

    # Streaming: the API emits Server-Sent Events ("data: {json}" lines)
    with requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        full = ""
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            chunk = line.removeprefix("data: ")
            if chunk == "[DONE]":  # sentinel marking end of the stream
                break
            try:
                delta = json.loads(chunk)["choices"][0]["delta"]
            except (json.JSONDecodeError, KeyError, IndexError):
                continue  # skip malformed / keep-alive chunks
            token = delta.get("content", "")
            sys.stdout.write(token)
            sys.stdout.flush()
            full += token
        print()
        return full


def poll_for_async_reply(conv_id: str, history: list[dict], *, wait_sec: int = 5, max_attempts: int = 24) -> str | None:
    """Every `wait_sec` seconds send "Any update?" until the assistant reply changes.

    Returns the new reply text, or None if nothing changed within
    `max_attempts` polls (~2 minutes with the defaults).
    """
    for attempt in range(max_attempts):
        time.sleep(wait_sec)
        print(f"🔄 polling (attempt {attempt + 1}) …")
        update_prompt = {"role": "user", "content": "Any update?"}
        latest = ask(conv_id, history + [update_prompt])
        if latest and latest.strip() != history[-1]["content"].strip():
            return latest
    return None


if __name__ == "__main__":
    conv_id = str(uuid.uuid4())
    history: list[dict] = [
        {"role": "user", "content": "use Hi-dream model to generate image of monkey sitting on top of mountain"}
    ]

    # First request – stream tokens so the user sees progress immediately
    first_reply = ask(conv_id, history, stream=True)
    history.append({"role": "assistant", "content": first_reply})

    # If the model reports it has delegated the task, start polling
    if first_reply.strip() == "I've sent the message":
        final_reply = poll_for_async_reply(conv_id, history)
        if final_reply:
            print("\n[Agentverse agent reply]\n" + final_reply)

Best Practices

Session Management

  • Use UUIDs for session IDs to avoid collisions
  • Store session mappings in Redis or database for production
  • Include x-session-id header in every request to maintain context

Error Handling

  • Implement timeouts for long-running agent tasks
  • Handle network failures with exponential backoff
  • Validate responses before processing agent results

Performance Optimization

  • Use streaming for better user experience
  • Choose appropriate model based on complexity needs
  • Implement async polling for deferred agent responses

Agent Coordination

  • Be specific in requests to help agent discovery from Agentverse marketplace
  • Allow time for complex multi-agent workflows involving Agentverse agents
  • Monitor session state to understand Agentverse agent progress