Module naptha_sdk.client.hub
Functions
async def list_nodes(node_ip: str) ‑> List
-
Expand source code
async def list_nodes(node_ip: str) -> List: hub_username = os.getenv("HUB_USERNAME") hub_password = os.getenv("HUB_PASSWORD") hub_url = os.getenv("HUB_URL") async with Hub(hub_url) as hub: try: _, _, _ = await hub.signin(hub_username, hub_password) except Exception as auth_error: raise ConnectionError(f"Failed to authenticate with Hub: {str(auth_error)}") node = await hub.list_nodes(node_ip=node_ip) return node
async def user_setup_flow(hub_url, public_key)
-
Expand source code
async def user_setup_flow(hub_url, public_key): async with Hub(hub_url, public_key) as hub: username, password = os.getenv("HUB_USERNAME"), os.getenv("HUB_PASSWORD") username_exists, password_exists = len(username) > 1, len(password) > 1 public_key = get_public_key(os.getenv("PRIVATE_KEY")) if os.getenv("PRIVATE_KEY") else None logger.info(f"Checking if user exists... User: {username}") user = await hub.get_user_by_username(username) user_public_key = await hub.get_user_by_public_key(public_key) existing_public_key_user = user_public_key is not None and user_public_key.get('username') != username match user, username_exists, password_exists, existing_public_key_user: case _, True, _, True: # Public key exists and doesn't match the provided username and password raise Exception(f"Using user credentials in .env. User with public key {public_key} already exists but doesn't match the provided username and password. Please use a different private key (or set blank in the .env file to randomly generate with launch.sh).") case _, False, False, True: # Public key exists and no username/password provided raise Exception(f"Using private key in .env. User with public key {public_key} already exists. Cannot create new user. Please use a different private key (or set blank in the .env file to randomly generate with launch.sh).") case None, False, _, False: # User doesn't exist and credentials are missing logger.info("No credentials provided in .env.") create_new = input("Would you like to create a new user? (yes/no): ").lower() if create_new != 'yes': raise Exception("User does not exist and new user creation was declined.") logger.info("Creating new user...") while True: username = input("Enter username: ") existing_user = await hub.get_user_by_username(username) if existing_user: print(f"Username '{username}' already exists. Please choose a different username.") continue password = input("Enter password: ") public_key, private_key_path = generate_keypair(f"{username}.pem") print(f"Signing up user: {username} with public key: {public_key}") success, token, user_id = await hub.signup(username, password, public_key) if success: add_credentials_to_env(username, password, private_key_path) logger.info("Sign up successful!") return token, user_id else: logger.error("Sign up failed. Please try again.") case None, True, True, False: logger.info("User doesn't exist but some credentials are provided in .env. Using them to create new user.") private_key_path = None if not public_key: logger.info("No public key provided. Generating new keypair...") public_key, private_key_path = generate_keypair(f"{username}.pem") print(f"Signing up user: {username} with public key: {public_key}") success, token, user_id = await hub.signup(username, password, public_key) if success: if private_key_path: add_credentials_to_env(username, password, private_key_path) logger.info("Sign up successful!") return token, user_id else: logger.error("Sign up failed.") raise Exception("Sign up failed.") case dict(), True, True, False: # User exists, attempt to sign in logger.info("Using user credentials in .env. User exists. Attempting to sign in...") if os.getenv("PRIVATE_KEY") and is_hex(os.getenv("PRIVATE_KEY")): write_private_key_to_file(os.getenv("PRIVATE_KEY"), username) success, token, user_id = await hub.signin(username, password) if success: logger.info("Sign in successful!") return token, user_id else: logger.error("Sign in failed. Please check your credentials in the .env file.") raise Exception("Sign in failed. Please check your credentials in the .env file.") case _: logger.error("Unexpected case encountered in user setup flow.") raise Exception("Unexpected error in user setup. Please check your configuration and try again.")
Classes
class Hub (hub_url, public_key=None, *args, **kwargs)
-
Expand source code
class Hub: """The Hub class is the entry point into Naptha AI Hub.""" def __init__(self, hub_url, public_key=None, *args, **kwargs): self.hub_url = hub_url self.public_key = public_key self.ns = "naptha" self.db = "naptha" self.surrealdb = Surreal(hub_url) self.is_authenticated = False self.user_id = None self.token = None logger.info(f"Hub URL: {hub_url}") async def connect(self): """Connect to the database and authenticate""" if not self.is_authenticated: try: await self.surrealdb.connect() await self.surrealdb.use(namespace=self.ns, database=self.db) except Exception as e: logger.error(f"Connection failed: {e}") raise def _decode_token(self, token: str) -> str: return jwt.decode(token, options={"verify_signature": False})["ID"] async def signin(self, username: str, password: str) -> Tuple[bool, Optional[str], Optional[str]]: logger.info(f"Attempting authentication for user: {username}") try: user = await self.surrealdb.signin({ "NS": self.ns, "DB": self.db, "AC": "user", "username": username, "password": password, }) self.user_id = self._decode_token(user) local_public_key = self.public_key or get_public_key(os.getenv("PRIVATE_KEY")) hub_public_key = self.user_id.split(":")[1] if local_public_key != hub_public_key: logger.error("Public key mismatch", extra={ "local_key": local_public_key, "hub_key": hub_public_key }) raise Exception( "Public key mismatch. Please verify your private key in .env file. " f"Local key: {local_public_key}, Hub key: {hub_public_key}" ) self.token = user self.is_authenticated = True logger.info(f"Successfully authenticated user: {username}") return True, user, self.user_id except Exception as e: raise Exception(f"Authentication failed: {str(e)}. Please ensure you have set HUB_URL, HUB_USERNAME, and HUB_PASSWORD in the .env file, and that you have run `naptha signup` for this user.") async def signup(self, username: str, password: str, public_key: str) -> Tuple[bool, Optional[str], Optional[str]]: user = await self.surrealdb.signup({ "NS": self.ns, "DB": self.db, "AC": "user", "name": username, "username": username, "password": password, "public_key": public_key, }) if not user: return False, None, None self.user_id = self._decode_token(user) return True, user, self.user_id async def get_user(self, user_id: str) -> Optional[Dict]: return await self.surrealdb.select(user_id) async def get_user_by_username(self, username: str) -> Optional[Dict]: result = await self.surrealdb.query( "SELECT * FROM user WHERE username = $username LIMIT 1", {"username": username} ) if result and result[0]["result"]: return result[0]["result"][0] return None async def get_user_by_public_key(self, public_key: str) -> Optional[Dict]: result = await self.surrealdb.query( "SELECT * FROM user WHERE public_key = $public_key LIMIT 1", {"public_key": public_key} ) if result and result[0]["result"]: return result[0]["result"][0] return None async def get_credits(self) -> List: user = await self.get_user(self.user_id) return user['credits'] async def get_node(self, node_id: str) -> Optional[Dict]: return await self.surrealdb.select(node_id) async def list_servers(self) -> List: servers = await self.surrealdb.query("SELECT * FROM server;") return servers[0]['result'] async def list_nodes(self, node_ip=None) -> List: if not node_ip: nodes = await self.surrealdb.query("SELECT * FROM node;") return nodes[0]['result'] else: nodes = await self.surrealdb.query("SELECT * FROM node WHERE ip=$node_ip;", {"node_ip": node_ip}) node = nodes[0]['result'][0] server_ids = node['servers'] servers = [] for server_id in server_ids: server = await self.surrealdb.select(server_id) servers.append(server) node['servers'] = servers alt_ports = [ server['port'] for server in servers if server['server_type'] in ['ws', 'grpc'] ] node['ports'] = alt_ports return node async def list_agents(self, agent_name=None) -> List: if not agent_name: agents = await self.surrealdb.query("SELECT * FROM agent;") return agents[0]['result'] else: agent = await self.surrealdb.query("SELECT * FROM agent WHERE id=$agent_name;", {"agent_name": agent_name}) return agent[0]['result'] async def list_tools(self, tool_name=None) -> List: if not tool_name: tools = await self.surrealdb.query("SELECT * FROM tool;") return tools[0]['result'] else: tool = await self.surrealdb.query("SELECT * FROM tool WHERE name=$tool_name;", {"tool_name": tool_name}) return tool[0]['result'] async def list_orchestrators(self, orchestrator_name=None) -> List: if not orchestrator_name: orchestrators = await self.surrealdb.query("SELECT * FROM orchestrator;") return orchestrators[0]['result'] else: orchestrator = await self.surrealdb.query("SELECT * FROM orchestrator WHERE id=$orchestrator_name;", {"orchestrator_name": orchestrator_name}) return orchestrator[0]['result'] async def list_environments(self, environment_name=None) -> List: if not environment_name: environments = await self.surrealdb.query("SELECT * FROM environment;") return environments[0]['result'] else: environment = await self.surrealdb.query("SELECT * FROM environment WHERE id=$environment_name;", {"environment_name": environment_name}) return environment[0]['result'] async def list_personas(self, persona_name=None) -> List: if not persona_name: personas = await self.surrealdb.query("SELECT * FROM persona;") return personas[0]['result'] else: if not "persona:" in persona_name: persona_name = f"persona:{persona_name}" persona = await self.surrealdb.query("SELECT * FROM persona WHERE id=$persona_name;", {"persona_name": persona_name}) return persona[0]['result'] async def list_memories(self, memory_name=None) -> List: if not memory_name: memories = await self.surrealdb.query("SELECT * FROM memory;") return memories[0]['result'] else: memory = await self.surrealdb.query("SELECT * FROM memory WHERE id=$memory_name;", {"memory_name": memory_name}) return memory[0]['result'] async def list_kbs(self, kb_name=None) -> List: if not kb_name: kbs = await self.surrealdb.query("SELECT * FROM kb;") return kbs[0]['result'] else: kb = await self.surrealdb.query("SELECT * FROM kb WHERE name=$kb_name;", {"kb_name": kb_name}) return kb[0]['result'] async def list_modules(self, module_type, module_name) -> List: if module_type == 'agent': modules = await self.surrealdb.query("SELECT * FROM agent WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'tool': modules = await self.surrealdb.query("SELECT * FROM tool WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'orchestrator': modules = await self.surrealdb.query("SELECT * FROM orchestrator WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'environment': modules = await self.surrealdb.query("SELECT * FROM environment WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'persona': modules = await self.surrealdb.query("SELECT * FROM persona WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'memory': modules = await self.surrealdb.query("SELECT * FROM memory WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'kb': modules = await self.surrealdb.query("SELECT * FROM kb WHERE id=$module_name;", {"module_name": module_name}) return modules[0]['result'] async def list_kb_content(self, kb_name: str) -> List: kb_content = await self.surrealdb.query("SELECT * FROM kb_content WHERE kb_id=$kb_id;", {"kb_id": f"kb:{kb_name}"}) return kb_content[0]['result'] async def delete_agent(self, agent_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in agent_id: agent_id = f"agent:{agent_id}".strip() print(f"Deleting agent: {agent_id}") success = await self.surrealdb.delete(agent_id) if success: print("Deleted agent") else: print("Failed to delete agent") return success async def delete_tool(self, tool_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in tool_id: tool_id = f"tool:{tool_id}".strip() print(f"Deleting tool: {tool_id}") success = await self.surrealdb.delete(tool_id) if success: print("Deleted tool") else: print("Failed to delete tool") return success async def delete_orchestrator(self, orchestrator_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in orchestrator_id: orchestrator_id = f"orchestrator:{orchestrator_id}".strip() print(f"Deleting orchestrator: {orchestrator_id}") success = await self.surrealdb.delete(orchestrator_id) if success: print("Deleted orchestrator") else: print("Failed to delete orchestrator") return success async def delete_environment(self, environment_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in environment_id: environment_id = f"environment:{environment_id}".strip() print(f"Deleting environment: {environment_id}") success = await self.surrealdb.delete(environment_id) if success: print("Deleted environment") else: print("Failed to delete environment") return success async def delete_persona(self, persona_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in persona_id: persona_id = f"persona:{persona_id}".strip() print(f"Deleting persona: {persona_id}") success = await self.surrealdb.delete(persona_id) if success: print("Deleted persona") else: print("Failed to delete persona") return success async def delete_memory(self, memory_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in memory_id: memory_id = f"memory:{memory_id}".strip() print(f"Deleting memory: {memory_id}") success = await self.surrealdb.delete(memory_id) if success: print("Deleted memory") else: print("Failed to delete memory") return success async def delete_kb(self, kb_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in kb_id: kb_id = f"kb:{kb_id}".strip() print(f"Deleting knowledge base: {kb_id}") success = await self.surrealdb.delete(kb_id) if success: print("Deleted knowledge base") else: print("Failed to delete knowledge base") return success async def create_agent(self, agent_config: Dict) -> Tuple[bool, Optional[Dict]]: if not agent_config.get('id'): return await self.surrealdb.create("agent", agent_config) else: return await self.surrealdb.create(agent_config.pop('id'), agent_config) async def create_tool(self, tool_config: Dict) -> Tuple[bool, Optional[Dict]]: if not tool_config.get('id'): return await self.surrealdb.create("tool", tool_config) else: return await self.surrealdb.create(tool_config.pop('id'), tool_config) async def create_orchestrator(self, orchestrator_config: Dict) -> Tuple[bool, Optional[Dict]]: if not orchestrator_config.get('id'): return await self.surrealdb.create("orchestrator", orchestrator_config) else: return await self.surrealdb.create(orchestrator_config.pop('id'), orchestrator_config) async def create_environment(self, environment_config: Dict) -> Tuple[bool, Optional[Dict]]: if not environment_config.get('id'): return await self.surrealdb.create("environment", environment_config) else: return await self.surrealdb.create(environment_config.pop('id'), environment_config) async def create_persona(self, persona_config: Dict) -> Tuple[bool, Optional[Dict]]: if not persona_config.get('id'): return await self.surrealdb.create("persona", persona_config) else: return await self.surrealdb.create(persona_config.pop('id'), persona_config) async def create_memory(self, memory_config: Dict) -> Tuple[bool, Optional[Dict]]: if not memory_config.get('id'): return await self.surrealdb.create("memory", memory_config) else: return await self.surrealdb.create(memory_config.pop('id'), memory_config) async def create_kb(self, kb_config: Dict) -> Tuple[bool, Optional[Dict]]: if not kb_config.get('id'): return await self.surrealdb.create("kb", kb_config) else: return await self.surrealdb.create(kb_config.pop('id'), kb_config) async def update_agent(self, agent_config: Dict) -> Tuple[bool, Optional[Dict]]: return await self.surrealdb.update("agent", agent_config) async def create_or_update_module(self, module_type, module_config: Dict) -> Tuple[bool, Optional[Dict]]: list_modules = await self.list_modules(module_type, module_config.get('id')) if not list_modules: logger.info(f"Module does not exist. Registering new module: {module_config.get('id')}") return await self.surrealdb.create(module_type, module_config) else: logger.info(f"Module already exists. Updating existing module: {module_config.get('id')}") return await self.surrealdb.update(module_config.pop('id'), module_config) async def close(self): """Close the database connection""" try: await self.surrealdb.close() except Exception as e: logger.error(f"Error closing database connection: {e}") finally: self.is_authenticated = False self.user_id = None self.token = None async def __aenter__(self): """Async enter method for context manager""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async exit method for context manager""" await self.close()
The Hub class is the entry point into Naptha AI Hub.
Methods
async def close(self)
-
Expand source code
async def close(self): """Close the database connection""" try: await self.surrealdb.close() except Exception as e: logger.error(f"Error closing database connection: {e}") finally: self.is_authenticated = False self.user_id = None self.token = None
Close the database connection
async def connect(self)
-
Expand source code
async def connect(self): """Connect to the database and authenticate""" if not self.is_authenticated: try: await self.surrealdb.connect() await self.surrealdb.use(namespace=self.ns, database=self.db) except Exception as e: logger.error(f"Connection failed: {e}") raise
Connect to the database and authenticate
async def create_agent(self, agent_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_agent(self, agent_config: Dict) -> Tuple[bool, Optional[Dict]]: if not agent_config.get('id'): return await self.surrealdb.create("agent", agent_config) else: return await self.surrealdb.create(agent_config.pop('id'), agent_config)
async def create_environment(self, environment_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_environment(self, environment_config: Dict) -> Tuple[bool, Optional[Dict]]: if not environment_config.get('id'): return await self.surrealdb.create("environment", environment_config) else: return await self.surrealdb.create(environment_config.pop('id'), environment_config)
async def create_kb(self, kb_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_kb(self, kb_config: Dict) -> Tuple[bool, Optional[Dict]]: if not kb_config.get('id'): return await self.surrealdb.create("kb", kb_config) else: return await self.surrealdb.create(kb_config.pop('id'), kb_config)
async def create_memory(self, memory_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_memory(self, memory_config: Dict) -> Tuple[bool, Optional[Dict]]: if not memory_config.get('id'): return await self.surrealdb.create("memory", memory_config) else: return await self.surrealdb.create(memory_config.pop('id'), memory_config)
async def create_or_update_module(self, module_type, module_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_or_update_module(self, module_type, module_config: Dict) -> Tuple[bool, Optional[Dict]]: list_modules = await self.list_modules(module_type, module_config.get('id')) if not list_modules: logger.info(f"Module does not exist. Registering new module: {module_config.get('id')}") return await self.surrealdb.create(module_type, module_config) else: logger.info(f"Module already exists. Updating existing module: {module_config.get('id')}") return await self.surrealdb.update(module_config.pop('id'), module_config)
async def create_orchestrator(self, orchestrator_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_orchestrator(self, orchestrator_config: Dict) -> Tuple[bool, Optional[Dict]]: if not orchestrator_config.get('id'): return await self.surrealdb.create("orchestrator", orchestrator_config) else: return await self.surrealdb.create(orchestrator_config.pop('id'), orchestrator_config)
async def create_persona(self, persona_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_persona(self, persona_config: Dict) -> Tuple[bool, Optional[Dict]]: if not persona_config.get('id'): return await self.surrealdb.create("persona", persona_config) else: return await self.surrealdb.create(persona_config.pop('id'), persona_config)
async def create_tool(self, tool_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def create_tool(self, tool_config: Dict) -> Tuple[bool, Optional[Dict]]: if not tool_config.get('id'): return await self.surrealdb.create("tool", tool_config) else: return await self.surrealdb.create(tool_config.pop('id'), tool_config)
async def delete_agent(self, agent_id: str) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def delete_agent(self, agent_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in agent_id: agent_id = f"agent:{agent_id}".strip() print(f"Deleting agent: {agent_id}") success = await self.surrealdb.delete(agent_id) if success: print("Deleted agent") else: print("Failed to delete agent") return success
async def delete_environment(self, environment_id: str) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def delete_environment(self, environment_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in environment_id: environment_id = f"environment:{environment_id}".strip() print(f"Deleting environment: {environment_id}") success = await self.surrealdb.delete(environment_id) if success: print("Deleted environment") else: print("Failed to delete environment") return success
async def delete_kb(self, kb_id: str) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def delete_kb(self, kb_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in kb_id: kb_id = f"kb:{kb_id}".strip() print(f"Deleting knowledge base: {kb_id}") success = await self.surrealdb.delete(kb_id) if success: print("Deleted knowledge base") else: print("Failed to delete knowledge base") return success
async def delete_memory(self, memory_id: str) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def delete_memory(self, memory_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in memory_id: memory_id = f"memory:{memory_id}".strip() print(f"Deleting memory: {memory_id}") success = await self.surrealdb.delete(memory_id) if success: print("Deleted memory") else: print("Failed to delete memory") return success
async def delete_orchestrator(self, orchestrator_id: str) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def delete_orchestrator(self, orchestrator_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in orchestrator_id: orchestrator_id = f"orchestrator:{orchestrator_id}".strip() print(f"Deleting orchestrator: {orchestrator_id}") success = await self.surrealdb.delete(orchestrator_id) if success: print("Deleted orchestrator") else: print("Failed to delete orchestrator") return success
async def delete_persona(self, persona_id: str) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def delete_persona(self, persona_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in persona_id: persona_id = f"persona:{persona_id}".strip() print(f"Deleting persona: {persona_id}") success = await self.surrealdb.delete(persona_id) if success: print("Deleted persona") else: print("Failed to delete persona") return success
async def delete_tool(self, tool_id: str) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def delete_tool(self, tool_id: str) -> Tuple[bool, Optional[Dict]]: if ":" not in tool_id: tool_id = f"tool:{tool_id}".strip() print(f"Deleting tool: {tool_id}") success = await self.surrealdb.delete(tool_id) if success: print("Deleted tool") else: print("Failed to delete tool") return success
async def get_credits(self) ‑> List
-
Expand source code
async def get_credits(self) -> List: user = await self.get_user(self.user_id) return user['credits']
async def get_node(self, node_id: str) ‑> Dict | None
-
Expand source code
async def get_node(self, node_id: str) -> Optional[Dict]: return await self.surrealdb.select(node_id)
async def get_user(self, user_id: str) ‑> Dict | None
-
Expand source code
async def get_user(self, user_id: str) -> Optional[Dict]: return await self.surrealdb.select(user_id)
async def get_user_by_public_key(self, public_key: str) ‑> Dict | None
-
Expand source code
async def get_user_by_public_key(self, public_key: str) -> Optional[Dict]: result = await self.surrealdb.query( "SELECT * FROM user WHERE public_key = $public_key LIMIT 1", {"public_key": public_key} ) if result and result[0]["result"]: return result[0]["result"][0] return None
async def get_user_by_username(self, username: str) ‑> Dict | None
-
Expand source code
async def get_user_by_username(self, username: str) -> Optional[Dict]: result = await self.surrealdb.query( "SELECT * FROM user WHERE username = $username LIMIT 1", {"username": username} ) if result and result[0]["result"]: return result[0]["result"][0] return None
async def list_agents(self, agent_name=None) ‑> List
-
Expand source code
async def list_agents(self, agent_name=None) -> List: if not agent_name: agents = await self.surrealdb.query("SELECT * FROM agent;") return agents[0]['result'] else: agent = await self.surrealdb.query("SELECT * FROM agent WHERE id=$agent_name;", {"agent_name": agent_name}) return agent[0]['result']
async def list_environments(self, environment_name=None) ‑> List
-
Expand source code
async def list_environments(self, environment_name=None) -> List: if not environment_name: environments = await self.surrealdb.query("SELECT * FROM environment;") return environments[0]['result'] else: environment = await self.surrealdb.query("SELECT * FROM environment WHERE id=$environment_name;", {"environment_name": environment_name}) return environment[0]['result']
async def list_kb_content(self, kb_name: str) ‑> List
-
Expand source code
async def list_kb_content(self, kb_name: str) -> List: kb_content = await self.surrealdb.query("SELECT * FROM kb_content WHERE kb_id=$kb_id;", {"kb_id": f"kb:{kb_name}"}) return kb_content[0]['result']
async def list_kbs(self, kb_name=None) ‑> List
-
Expand source code
async def list_kbs(self, kb_name=None) -> List: if not kb_name: kbs = await self.surrealdb.query("SELECT * FROM kb;") return kbs[0]['result'] else: kb = await self.surrealdb.query("SELECT * FROM kb WHERE name=$kb_name;", {"kb_name": kb_name}) return kb[0]['result']
async def list_memories(self, memory_name=None) ‑> List
-
Expand source code
async def list_memories(self, memory_name=None) -> List: if not memory_name: memories = await self.surrealdb.query("SELECT * FROM memory;") return memories[0]['result'] else: memory = await self.surrealdb.query("SELECT * FROM memory WHERE id=$memory_name;", {"memory_name": memory_name}) return memory[0]['result']
async def list_modules(self, module_type, module_name) ‑> List
-
Expand source code
async def list_modules(self, module_type, module_name) -> List: if module_type == 'agent': modules = await self.surrealdb.query("SELECT * FROM agent WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'tool': modules = await self.surrealdb.query("SELECT * FROM tool WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'orchestrator': modules = await self.surrealdb.query("SELECT * FROM orchestrator WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'environment': modules = await self.surrealdb.query("SELECT * FROM environment WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'persona': modules = await self.surrealdb.query("SELECT * FROM persona WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'memory': modules = await self.surrealdb.query("SELECT * FROM memory WHERE id=$module_name;", {"module_name": module_name}) elif module_type == 'kb': modules = await self.surrealdb.query("SELECT * FROM kb WHERE id=$module_name;", {"module_name": module_name}) return modules[0]['result']
async def list_nodes(self, node_ip=None) ‑> List
-
Expand source code
async def list_nodes(self, node_ip=None) -> List: if not node_ip: nodes = await self.surrealdb.query("SELECT * FROM node;") return nodes[0]['result'] else: nodes = await self.surrealdb.query("SELECT * FROM node WHERE ip=$node_ip;", {"node_ip": node_ip}) node = nodes[0]['result'][0] server_ids = node['servers'] servers = [] for server_id in server_ids: server = await self.surrealdb.select(server_id) servers.append(server) node['servers'] = servers alt_ports = [ server['port'] for server in servers if server['server_type'] in ['ws', 'grpc'] ] node['ports'] = alt_ports return node
async def list_orchestrators(self, orchestrator_name=None) ‑> List
-
Expand source code
async def list_orchestrators(self, orchestrator_name=None) -> List: if not orchestrator_name: orchestrators = await self.surrealdb.query("SELECT * FROM orchestrator;") return orchestrators[0]['result'] else: orchestrator = await self.surrealdb.query("SELECT * FROM orchestrator WHERE id=$orchestrator_name;", {"orchestrator_name": orchestrator_name}) return orchestrator[0]['result']
async def list_personas(self, persona_name=None) ‑> List
-
Expand source code
async def list_personas(self, persona_name=None) -> List: if not persona_name: personas = await self.surrealdb.query("SELECT * FROM persona;") return personas[0]['result'] else: if not "persona:" in persona_name: persona_name = f"persona:{persona_name}" persona = await self.surrealdb.query("SELECT * FROM persona WHERE id=$persona_name;", {"persona_name": persona_name}) return persona[0]['result']
async def list_servers(self) ‑> List
-
Expand source code
async def list_servers(self) -> List: servers = await self.surrealdb.query("SELECT * FROM server;") return servers[0]['result']
async def list_tools(self, tool_name=None) ‑> List
-
Expand source code
async def list_tools(self, tool_name=None) -> List: if not tool_name: tools = await self.surrealdb.query("SELECT * FROM tool;") return tools[0]['result'] else: tool = await self.surrealdb.query("SELECT * FROM tool WHERE name=$tool_name;", {"tool_name": tool_name}) return tool[0]['result']
async def signin(self, username: str, password: str) ‑> Tuple[bool, str | None, str | None]
-
Expand source code
async def signin(self, username: str, password: str) -> Tuple[bool, Optional[str], Optional[str]]: logger.info(f"Attempting authentication for user: {username}") try: user = await self.surrealdb.signin({ "NS": self.ns, "DB": self.db, "AC": "user", "username": username, "password": password, }) self.user_id = self._decode_token(user) local_public_key = self.public_key or get_public_key(os.getenv("PRIVATE_KEY")) hub_public_key = self.user_id.split(":")[1] if local_public_key != hub_public_key: logger.error("Public key mismatch", extra={ "local_key": local_public_key, "hub_key": hub_public_key }) raise Exception( "Public key mismatch. Please verify your private key in .env file. " f"Local key: {local_public_key}, Hub key: {hub_public_key}" ) self.token = user self.is_authenticated = True logger.info(f"Successfully authenticated user: {username}") return True, user, self.user_id except Exception as e: raise Exception(f"Authentication failed: {str(e)}. Please ensure you have set HUB_URL, HUB_USERNAME, and HUB_PASSWORD in the .env file, and that you have run `naptha signup` for this user.")
async def signup(self, username: str, password: str, public_key: str) ‑> Tuple[bool, str | None, str | None]
-
Expand source code
async def signup(self, username: str, password: str, public_key: str) -> Tuple[bool, Optional[str], Optional[str]]: user = await self.surrealdb.signup({ "NS": self.ns, "DB": self.db, "AC": "user", "name": username, "username": username, "password": password, "public_key": public_key, }) if not user: return False, None, None self.user_id = self._decode_token(user) return True, user, self.user_id
async def update_agent(self, agent_config: Dict) ‑> Tuple[bool, Dict | None]
-
Expand source code
async def update_agent(self, agent_config: Dict) -> Tuple[bool, Optional[Dict]]: return await self.surrealdb.update("agent", agent_config)