Module naptha_sdk.client.node

Functions

def prepare_files(file_path: str) ‑> List[Tuple[str, str]]
Expand source code
def prepare_files(file_path: str) -> List[Tuple[str, str]]:
    """Prepare files for upload."""
    if os.path.isdir(file_path):
        with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmpfile:
            zip_directory(file_path, tmpfile.name)
            tmpfile.close()  
            file = {'file': open(tmpfile.name, 'rb')}
    else:
        file = {'file': open(file_path, 'rb')}
    
    return file

Prepare files for upload.

def zip_directory(file_path, zip_path)
Expand source code
def zip_directory(file_path, zip_path):
    """Utility function to zip the content of a directory while preserving the folder structure."""
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(file_path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, start=os.path.abspath(file_path).split(os.sep)[0])
                zipf.write(file_path, arcname)

Utility function to zip the content of a directory while preserving the folder structure.

Classes

class NodeClient (node: NodeConfig)
Expand source code
class NodeClient:
    def __init__(self, node: NodeConfig):
        self.node = node
        self.server_type = node.server_type
        self.node_url = self.node_to_url(node)
        self.connections = {}

        self.access_token = None
        logger.info(f"Node URL: {self.node_url}")

    def node_to_url(self, node: NodeConfig):
        ports = node.ports
        if len(ports) == 0:
            raise ValueError("No ports found for node")
        if node.server_type == 'ws':
            return f"ws://{node.ip}:{random.choice(ports)}"
        elif node.server_type == 'grpc':
            return f"{node.ip}:{random.choice(ports)}"
        else:
            raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")

    async def check_user(self, user_input: Dict[str, str]) -> Dict[str, Any]:
        if self.node.server_type == 'ws':
            return await self.check_user_ws(user_input)
        elif self.node.server_type == 'grpc':
            return await self.check_user_grpc(user_input)
        else:
            raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")

    async def check_user_ws(self, user_input: Dict[str, str]):
        response = await self.send_receive_ws(user_input, "user/check")
        logger.info(f"Check user response: {response}")
        return response

    async def check_user_grpc(self, user_input: Dict[str, str]):
        async with grpc.aio.insecure_channel(self.node_url) as channel:
            stub = grpc_server_pb2_grpc.GrpcServerStub(channel)
            request = grpc_server_pb2.CheckUserRequest(
                user_id=user_input.get('user_id', ''),
                public_key=user_input.get('public_key', '')
            )
            response = await stub.CheckUser(request)
            logger.info(f"Check user response: {response}")
            return MessageToDict(response, preserving_proto_field_name=True)

    async def register_user(self, user_input: Dict[str, str]) -> Dict[str, Any]:
        if self.node.server_type == 'ws':
            return await self.register_user_ws(user_input)
        elif self.node.server_type == 'grpc':
            return await self.register_user_grpc(user_input)
        else:
            raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")
        
    async def register_user_ws(self, user_input: Dict[str, str]):
        response = await self.send_receive_ws(user_input, "user/register")
        logger.info(f"Register user response: {response}")
        return response

    async def register_user_grpc(self, user_input: Dict[str, str]):
        async with grpc.aio.insecure_channel(self.node_url) as channel:
            stub = grpc_server_pb2_grpc.GrpcServerStub(channel)
            request = grpc_server_pb2.RegisterUserRequest(
                public_key=user_input.get('public_key', '')
            )
            response = await stub.RegisterUser(request)
            return {
                'id': response.id,
                'public_key': response.public_key,
            }

    async def run_module(self, module_type: str, run_input: Union[AgentRunInput, KBRunInput, ToolRunInput, EnvironmentRunInput]):
        if self.node.server_type == 'ws':
            return await self.run_module_ws(module_type, run_input)
        elif self.node.server_type == 'grpc':
            return await self.run_module_grpc(module_type, run_input)
        else:
            raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")

    async def run_module_ws(self, module_type: str, run_input):
        response = await self.send_receive_ws(run_input, f"{module_type}/run")
        
        output_types = {
            "agent": AgentRun,
            "kb": KBRun,
            "tool": ToolRun,
            "environment": EnvironmentRun
        }
        
        if response['status'] == 'success':
            return output_types[module_type](**response['data'])
        else:
            logger.error(f"Error running {module_type}: {response['message']}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            raise Exception(response['message'])

    async def run_module_grpc(self, module_type: str, run_input):
        async with grpc.aio.insecure_channel(self.node_url) as channel:
            stub = grpc_server_pb2_grpc.GrpcServerStub(channel)

            # Convert inputs to Struct
            input_struct = struct_pb2.Struct()
            if run_input.inputs:
                if isinstance(run_input.inputs, dict):
                    input_data = run_input.inputs.dict() if hasattr(run_input.inputs, 'dict') else run_input.inputs
                    input_struct.update(input_data)

            # Create node config
            node_config = grpc_server_pb2.NodeConfigUser(
                ip=run_input.deployment.node.ip,
                http_port=run_input.deployment.node.http_port,
                server_type=run_input.deployment.node.server_type
            )

            # Create module
            module = grpc_server_pb2.Module(
                id=run_input.deployment.module.get('id', ''),
                name=run_input.deployment.module.get('name', ''),
                description=run_input.deployment.module.get('description', ''),
                author=run_input.deployment.module.get('author', ''),
                module_url=run_input.deployment.module.get('module_url', ''),
                module_type=module_type,
                module_version=run_input.deployment.module.get('module_version', ''),
                module_entrypoint=run_input.deployment.module.get('module_entrypoint', '')
            )

            # Create config struct
            config_struct = struct_pb2.Struct()
            if run_input.deployment.config:
                if isinstance(run_input.deployment.config, dict):
                    config_struct.update(run_input.deployment.config)
                else:
                    config_struct.update(run_input.deployment.config.dict())

            # Create deployment based on module type
            deployment_classes = {
                "agent": grpc_server_pb2.AgentDeployment,
                "kb": grpc_server_pb2.BaseDeployment,
                "tool": grpc_server_pb2.ToolDeployment,
                "environment": grpc_server_pb2.BaseDeployment
            }
            
            DeploymentClass = deployment_classes[module_type]
            deployment = DeploymentClass(
                node_input=node_config,
                name=run_input.deployment.name,
                module=module,
                config=config_struct,
                initialized=False
            )

            # Create request with appropriate deployment field
            request_args = {
                "module_type": module_type,
                "consumer_id": run_input.consumer_id,
                "inputs": input_struct,
                f"{module_type}_deployment": deployment
            }
            
            request = grpc_server_pb2.ModuleRunRequest(**request_args)

            final_response = None
            async for response in stub.RunModule(request):
                final_response = response
                logger.info(f"Got response: {response}")

            output_types = {
                "agent": AgentRun,
                "kb": KBRun,
                "tool": ToolRun,
                "environment": EnvironmentRun
            }

            return output_types[module_type](
                consumer_id=run_input.consumer_id,
                inputs=run_input.inputs,
                deployment=run_input.deployment,
                orchestrator_runs=[],
                status=final_response.status,
                error=final_response.error,
                id=final_response.id,
                results=list(final_response.results),
                error_message=final_response.error_message,
                created_time=final_response.created_time,
                start_processing_time=final_response.start_processing_time,
                completed_time=final_response.completed_time,
                duration=final_response.duration
            )
    
    async def connect_ws(self, action: str):
        client_id = str(uuid.uuid4())
        full_url = f"{self.node_url}/ws/{action}/{client_id}"
        logger.info(f"Connecting to WebSocket: {full_url}")
        ws = await websockets.connect(full_url)
        self.connections[client_id] = ws
        self.current_client_id = client_id
        return client_id

    async def disconnect_ws(self, client_id: str):
        if client_id in self.connections:
            await self.connections[client_id].close()
            del self.connections[client_id]
        if self.current_client_id == client_id:
            self.current_client_id = None

    async def send_receive_ws(self, data, action: str):
        client_id = await self.connect_ws(action)
        
        try:
            if isinstance(data, AgentRunInput) or isinstance(data, OrchestratorRunInput):
                message = data.model_dump()
            else:
                message = data
            await self.connections[client_id].send(json.dumps(message))
            
            response = await self.connections[client_id].recv()
            return json.loads(response)
        finally:
            await self.disconnect_ws(client_id)

Methods

async def check_user(self, user_input: Dict[str, str]) ‑> Dict[str, Any]
Expand source code
async def check_user(self, user_input: Dict[str, str]) -> Dict[str, Any]:
    if self.node.server_type == 'ws':
        return await self.check_user_ws(user_input)
    elif self.node.server_type == 'grpc':
        return await self.check_user_grpc(user_input)
    else:
        raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")
async def check_user_grpc(self, user_input: Dict[str, str])
Expand source code
async def check_user_grpc(self, user_input: Dict[str, str]):
    async with grpc.aio.insecure_channel(self.node_url) as channel:
        stub = grpc_server_pb2_grpc.GrpcServerStub(channel)
        request = grpc_server_pb2.CheckUserRequest(
            user_id=user_input.get('user_id', ''),
            public_key=user_input.get('public_key', '')
        )
        response = await stub.CheckUser(request)
        logger.info(f"Check user response: {response}")
        return MessageToDict(response, preserving_proto_field_name=True)
async def check_user_ws(self, user_input: Dict[str, str])
Expand source code
async def check_user_ws(self, user_input: Dict[str, str]):
    response = await self.send_receive_ws(user_input, "user/check")
    logger.info(f"Check user response: {response}")
    return response
async def connect_ws(self, action: str)
Expand source code
async def connect_ws(self, action: str):
    client_id = str(uuid.uuid4())
    full_url = f"{self.node_url}/ws/{action}/{client_id}"
    logger.info(f"Connecting to WebSocket: {full_url}")
    ws = await websockets.connect(full_url)
    self.connections[client_id] = ws
    self.current_client_id = client_id
    return client_id
async def disconnect_ws(self, client_id: str)
Expand source code
async def disconnect_ws(self, client_id: str):
    if client_id in self.connections:
        await self.connections[client_id].close()
        del self.connections[client_id]
    if self.current_client_id == client_id:
        self.current_client_id = None
def node_to_url(self,
node: NodeConfig)
Expand source code
def node_to_url(self, node: NodeConfig):
    ports = node.ports
    if len(ports) == 0:
        raise ValueError("No ports found for node")
    if node.server_type == 'ws':
        return f"ws://{node.ip}:{random.choice(ports)}"
    elif node.server_type == 'grpc':
        return f"{node.ip}:{random.choice(ports)}"
    else:
        raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")
async def register_user(self, user_input: Dict[str, str]) ‑> Dict[str, Any]
Expand source code
async def register_user(self, user_input: Dict[str, str]) -> Dict[str, Any]:
    if self.node.server_type == 'ws':
        return await self.register_user_ws(user_input)
    elif self.node.server_type == 'grpc':
        return await self.register_user_grpc(user_input)
    else:
        raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")
async def register_user_grpc(self, user_input: Dict[str, str])
Expand source code
async def register_user_grpc(self, user_input: Dict[str, str]):
    async with grpc.aio.insecure_channel(self.node_url) as channel:
        stub = grpc_server_pb2_grpc.GrpcServerStub(channel)
        request = grpc_server_pb2.RegisterUserRequest(
            public_key=user_input.get('public_key', '')
        )
        response = await stub.RegisterUser(request)
        return {
            'id': response.id,
            'public_key': response.public_key,
        }
async def register_user_ws(self, user_input: Dict[str, str])
Expand source code
async def register_user_ws(self, user_input: Dict[str, str]):
    response = await self.send_receive_ws(user_input, "user/register")
    logger.info(f"Register user response: {response}")
    return response
async def run_module(self,
module_type: str,
run_input: AgentRunInput | KBRunInput | ToolRunInput | EnvironmentRunInput)
Expand source code
async def run_module(self, module_type: str, run_input: Union[AgentRunInput, KBRunInput, ToolRunInput, EnvironmentRunInput]):
    if self.node.server_type == 'ws':
        return await self.run_module_ws(module_type, run_input)
    elif self.node.server_type == 'grpc':
        return await self.run_module_grpc(module_type, run_input)
    else:
        raise ValueError("Invalid server type. Server type must be either 'ws' or 'grpc'.")
async def run_module_grpc(self, module_type: str, run_input)
Expand source code
async def run_module_grpc(self, module_type: str, run_input):
    async with grpc.aio.insecure_channel(self.node_url) as channel:
        stub = grpc_server_pb2_grpc.GrpcServerStub(channel)

        # Convert inputs to Struct
        input_struct = struct_pb2.Struct()
        if run_input.inputs:
            if isinstance(run_input.inputs, dict):
                input_data = run_input.inputs.dict() if hasattr(run_input.inputs, 'dict') else run_input.inputs
                input_struct.update(input_data)

        # Create node config
        node_config = grpc_server_pb2.NodeConfigUser(
            ip=run_input.deployment.node.ip,
            http_port=run_input.deployment.node.http_port,
            server_type=run_input.deployment.node.server_type
        )

        # Create module
        module = grpc_server_pb2.Module(
            id=run_input.deployment.module.get('id', ''),
            name=run_input.deployment.module.get('name', ''),
            description=run_input.deployment.module.get('description', ''),
            author=run_input.deployment.module.get('author', ''),
            module_url=run_input.deployment.module.get('module_url', ''),
            module_type=module_type,
            module_version=run_input.deployment.module.get('module_version', ''),
            module_entrypoint=run_input.deployment.module.get('module_entrypoint', '')
        )

        # Create config struct
        config_struct = struct_pb2.Struct()
        if run_input.deployment.config:
            if isinstance(run_input.deployment.config, dict):
                config_struct.update(run_input.deployment.config)
            else:
                config_struct.update(run_input.deployment.config.dict())

        # Create deployment based on module type
        deployment_classes = {
            "agent": grpc_server_pb2.AgentDeployment,
            "kb": grpc_server_pb2.BaseDeployment,
            "tool": grpc_server_pb2.ToolDeployment,
            "environment": grpc_server_pb2.BaseDeployment
        }
        
        DeploymentClass = deployment_classes[module_type]
        deployment = DeploymentClass(
            node_input=node_config,
            name=run_input.deployment.name,
            module=module,
            config=config_struct,
            initialized=False
        )

        # Create request with appropriate deployment field
        request_args = {
            "module_type": module_type,
            "consumer_id": run_input.consumer_id,
            "inputs": input_struct,
            f"{module_type}_deployment": deployment
        }
        
        request = grpc_server_pb2.ModuleRunRequest(**request_args)

        final_response = None
        async for response in stub.RunModule(request):
            final_response = response
            logger.info(f"Got response: {response}")

        output_types = {
            "agent": AgentRun,
            "kb": KBRun,
            "tool": ToolRun,
            "environment": EnvironmentRun
        }

        return output_types[module_type](
            consumer_id=run_input.consumer_id,
            inputs=run_input.inputs,
            deployment=run_input.deployment,
            orchestrator_runs=[],
            status=final_response.status,
            error=final_response.error,
            id=final_response.id,
            results=list(final_response.results),
            error_message=final_response.error_message,
            created_time=final_response.created_time,
            start_processing_time=final_response.start_processing_time,
            completed_time=final_response.completed_time,
            duration=final_response.duration
        )
async def run_module_ws(self, module_type: str, run_input)
Expand source code
async def run_module_ws(self, module_type: str, run_input):
    response = await self.send_receive_ws(run_input, f"{module_type}/run")
    
    output_types = {
        "agent": AgentRun,
        "kb": KBRun,
        "tool": ToolRun,
        "environment": EnvironmentRun
    }
    
    if response['status'] == 'success':
        return output_types[module_type](**response['data'])
    else:
        logger.error(f"Error running {module_type}: {response['message']}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        raise Exception(response['message'])
async def send_receive_ws(self, data, action: str)
Expand source code
async def send_receive_ws(self, data, action: str):
    client_id = await self.connect_ws(action)
    
    try:
        if isinstance(data, AgentRunInput) or isinstance(data, OrchestratorRunInput):
            message = data.model_dump()
        else:
            message = data
        await self.connections[client_id].send(json.dumps(message))
        
        response = await self.connections[client_id].recv()
        return json.loads(response)
    finally:
        await self.disconnect_ws(client_id)
class UserClient (node: NodeConfigUser)
Expand source code
class UserClient:
    def __init__(self, node: NodeConfigUser):
        self.node = node
        self.node_url = node_to_url(node)
        self.connections = {}
        
        self.access_token = None
        logger.info(f"Node URL: {self.node_url}")

    async def create(self, module_type: str,
                     module_request: Union[AgentDeployment, EnvironmentDeployment, KBDeployment, OrchestratorDeployment, ToolDeployment]):
        """Generic method to create either an agent, orchestrator, environment, tool, kb or memory.

        Args:
            module_type: Either agent, orchestrator, environment, tool, kb or memory
            module_request: Either AgentDeployment, EnvironmentDeployment, OrchestratorDeployment, ToolDeployment, KBDeployment or MemoryDeployment
        """

        print(f"Creating {module_type}...")

        endpoint = f"{self.node_url}/{module_type}/create"
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                headers = {
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {self.access_token}',
                }
                response = await client.post(
                    endpoint,
                    json=module_request.model_dump(),
                    headers=headers
                )
                response.raise_for_status()

                # Convert response to appropriate return type
                return response.json()
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise
        except RemoteProtocolError as e:
            error_msg = f"Run {module_type} failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
            logger.error(error_msg)
            raise
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise

    async def _run_and_poll(self, run_input: Union[AgentRunInput, EnvironmentRunInput, OrchestratorRunInput, KBRunInput, ToolRunInput, Dict], module_type: str) -> Union[AgentRun, EnvironmentRun, OrchestratorRun, KBRun, ToolRun, Dict]:
        """Generic method to run and poll either an agent, orchestrator, environment, tool or KB.
        
        Args:
            run_input: Either AgentRunInput, OrchestratorRunInput, EnvironmentRunInput, KBRunInput, ToolRunInput or Dict
            module_type: Either 'agent', 'orchestrator', 'environment', 'tool' or 'kb'
        """
        print(f"Run input: {run_input}")
        print(f"Module type: {module_type}")
        # Start the run
        run = await getattr(self, f'run_{module_type}')(run_input)
        print(f"{module_type.title()} run started: {run}")

        current_results_len = 0
        while True:
            run = await getattr(self, f'check_{module_type}_run')(run)

            output = f"{run.status} {getattr(run, f'deployment').module['module_type']} {getattr(run, f'deployment').module['name']}"
            print(output)

            results = run.results
            status = run.status

            if len(results) > current_results_len:
                print("Output: ", results[-1])
                current_results_len += 1

            if status in ['completed', 'error']:
                break

            time.sleep(3)

        if status == 'completed':
            print(results)
        else:
            error_msg = run.error_message
            print(error_msg)
        return run

    async def run_agent_and_poll(self, agent_run_input: AgentRunInput) -> AgentRun:
        """Run an agent module and poll for results until completion."""
        return await self._run_and_poll(agent_run_input, 'agent')

    async def run_tool_and_poll(self, tool_run_input: ToolRunInput) -> ToolRun:
        """Run a tool module and poll for results until completion."""

        return await self._run_and_poll(tool_run_input, 'tool')

    async def run_orchestrator_and_poll(self, orchestrator_run_input: OrchestratorRunInput) -> OrchestratorRun:
        """Run an orchestrator module and poll for results until completion."""
        return await self._run_and_poll(orchestrator_run_input, 'orchestrator')

    async def run_environment_and_poll(self, environment_input: EnvironmentRunInput) -> EnvironmentRun:
        """Run an environment module and poll for results until completion."""
        return await self._run_and_poll(environment_input, 'environment')
    
    async def run_kb_and_poll(self, kb_input: KBDeployment) -> KBDeployment:
        """Run a knowledge base module and poll for results until completion."""
        return await self._run_and_poll(kb_input, 'kb')

    async def run_memory_and_poll(self, memory_input: MemoryDeployment) -> MemoryDeployment:
        """Run a memory module and poll for results until completion."""
        return await self._run_and_poll(memory_input, 'memory')

    async def check_user(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """
        Check if a user exists on a node
        """
        endpoint = self.node_url + "/user/check"
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                headers = {
                    'Content-Type': 'application/json', 
                }
                response = await client.post(
                    endpoint, 
                    json=user_input,
                    headers=headers
                )
                response.raise_for_status()
            return json.loads(response.text)
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise  
        except RemoteProtocolError as e:
            error_msg = f"Check user failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
            logger.info(error_msg)
            raise 
        except Exception as e:
            logger.info(f"An unexpected error occurred: {e}")
            raise

    async def register_user(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """
        Register a user on a node
        """
        endpoint = self.node_url + "/user/register"
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                headers = {
                    'Content-Type': 'application/json', 
                }
                response = await client.post(
                    endpoint, 
                    json=user_input,
                    headers=headers
                )
                response.raise_for_status()
            return json.loads(response.text)
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise  
        except RemoteProtocolError as e:
            error_msg = f"Register user failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
            logger.error(error_msg)
            raise 
        except Exception as e:
            logger.info(f"An unexpected error occurred: {e}")
            raise

    async def _run_module(self, run_input: Union[AgentRunInput, OrchestratorRunInput, EnvironmentRunInput, ToolRunInput], module_type: str) -> Union[AgentRun, OrchestratorRun, EnvironmentRun, ToolRun]:
        """
        Generic method to run either an agent, orchestrator, environment, or tool on a node
        
        Args:
            run_input: Either AgentRunInput, OrchestratorRunInput, EnvironmentRunInput, or ToolRunInput
            module_type: Either 'agent', 'orchestrator', 'environment', or 'tool'
        """
        print(f"Running {module_type}...")
        print(f"Run input: {run_input}")
        print(f"Node URL: {self.node_url}")

        endpoint = f"{self.node_url}/{module_type}/run"
        
        # Convert dict to appropriate input type if needed
        input_class = {
            'agent': AgentRunInput,
            'orchestrator': OrchestratorRunInput,
            'environment': EnvironmentRunInput,
            'kb': KBRunInput,
            'memory': MemoryRunInput,
            'tool': ToolRunInput
        }[module_type]
        
        if isinstance(run_input, dict):
            run_input = input_class(**run_input)

        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                headers = {
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {self.access_token}',
                }
                response = await client.post(
                    endpoint,
                    json=run_input.model_dict(),
                    headers=headers
                )

                # Try to get error details even for error responses
                if response.status_code >= 400:
                    error_detail = response.json() if response.text else str(response)
                    logger.error(f"Server error response: {error_detail}")
                    raise Exception(f"Server returned error response: {error_detail}")
                    
                response.raise_for_status()
                
                # Convert response to appropriate return type
                return_class = {
                    'agent': AgentRun,
                    'orchestrator': OrchestratorRun,
                    'environment': EnvironmentRun,
                    'kb': KBRun,
                    'memory': MemoryRun,
                    'tool': ToolRun
                }[module_type]
                return return_class(**json.loads(response.text))
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise
        except RemoteProtocolError as e:
            error_msg = f"Run {module_type} failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
            logger.error(error_msg)
            raise
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise

    async def run_inference(self, inference_input: Union[ChatCompletionRequest, Dict]) -> ModelResponse:
        """
        Run inference on a node
        
        Args:
            inference_input: The inference input to run inference on
        """
        if isinstance(inference_input, dict):
            inference_input = ChatCompletionRequest(**inference_input)

        endpoint = f"{self.node_url}/inference/chat"

        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                headers = {
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {self.access_token}',
                }
                response = await client.post(
                    endpoint,
                    json=inference_input.model_dump(),
                    headers=headers
                )
                print("Response: ", response.text)
                response.raise_for_status()
                return ModelResponse(**json.loads(response.text))
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise
        except RemoteProtocolError as e:
            error_msg = f"Inference failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
            logger.error(error_msg)
            raise
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise

    async def run_agent(self, agent_run_input: AgentRunInput) -> AgentRun:
        """Run an agent module on a node"""
        return await self._run_module(agent_run_input, 'agent')

    async def run_tool(self, tool_run_input: ToolRunInput) -> ToolRun:
        """Run a tool module on a node"""
        return await self._run_module(tool_run_input, 'tool')

    async def run_orchestrator(self, orchestrator_run_input: OrchestratorRunInput) -> OrchestratorRun:
        """Run an orchestrator module on a node"""
        return await self._run_module(orchestrator_run_input, 'orchestrator')
    
    async def run_environment(self, environment_run_input: EnvironmentRunInput) -> EnvironmentRun:
        """Run an environment module on a node"""
        return await self._run_module(environment_run_input, 'environment')

    async def run_kb(self, kb_run_input: KBRunInput) -> KBRun:
        """Run a knowledge base module on a node"""
        return await self._run_module(kb_run_input, 'kb')

    async def run_memory(self, memory_run_input: MemoryRunInput) -> MemoryRun:
        """Run a memory module on a node"""
        return await self._run_module(memory_run_input, 'memory')

    async def check_run(
        self, 
        module_run: Union[AgentRun, OrchestratorRun, EnvironmentRun, KBRun, MemoryRun, ToolRun], 
        module_type: str
    ) -> Union[AgentRun, OrchestratorRun, EnvironmentRun, KBRun, MemoryRun, ToolRun]:
        """Generic method to check the status of a module run.
        
        Args:
            module_run: Either AgentRun, OrchestratorRun, EnvironmentRun, ToolRun, KBRun or MemoryRun object
            module_type: Either 'agent', 'orchestrator', 'environment', 'tool', 'kb' or 'memory'
        """
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                response = await client.post(
                    f"{self.node_url}/{module_type}/check", 
                    json=module_run.model_dump()
                )
                response.raise_for_status()
            
            return_class = {
                'agent': AgentRun,
                'orchestrator': OrchestratorRun,
                'environment': EnvironmentRun,
                'kb': KBRun,
                'memory': MemoryRun,
                'tool': ToolRun
            }[module_type]
            return return_class(**json.loads(response.text))
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise  
        except Exception as e:
            logger.info(f"An unexpected error occurred: {e}")

    # Update existing methods to use the new generic one
    async def check_agent_run(self, agent_run: AgentRun) -> AgentRun:
        return await self.check_run(agent_run, 'agent')

    async def check_tool_run(self, tool_run: ToolRun) -> ToolRun:
        return await self.check_run(tool_run, 'tool')

    async def check_orchestrator_run(self, orchestrator_run: OrchestratorRun) -> OrchestratorRun:
        return await self.check_run(orchestrator_run, 'orchestrator')

    async def check_environment_run(self, environment_run: EnvironmentRun) -> EnvironmentRun:
        return await self.check_run(environment_run, 'environment')

    async def check_kb_run(self, kb_run: KBRun) -> KBRun:
        return await self.check_run(kb_run, 'kb')

    async def check_memory_run(self, memory_run: MemoryRun) -> MemoryRun:
        return await self.check_run(memory_run, 'memory')

    async def create_agent_run(self, agent_run_input: AgentRunInput) -> AgentRun:
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                response = await client.post(
                    f"{self.node_url}/monitor/create_agent_run", json=agent_run_input.model_dump()
                )
                response.raise_for_status()
            return AgentRun(**json.loads(response.text))
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise  
        except Exception as e:
            logger.info(f"An unexpected error occurred: {e}")
            logger.info(f"Full traceback: {traceback.format_exc()}")

    async def update_agent_run(self, agent_run: AgentRun):
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                response = await client.post(
                    f"{self.node_url}/monitor/update_agent_run", json=agent_run.model_dump()
                )
                response.raise_for_status()
            return AgentRun(**json.loads(response.text))
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise  
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            error_details = traceback.format_exc()
            print(f"Full traceback: {error_details}")

    async def read_storage(self, agent_run_id: str, output_dir: str, ipfs: bool = False) -> str:
        print("Reading from storage...")
        try:
            endpoint = f"{self.node_url}/{'storage/read_ipfs' if ipfs else 'storage/read'}/{agent_run_id}"

            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                response = await client.get(endpoint)
                response.raise_for_status()
                storage = response.content  
                print("Retrieved storage.")
            
                # Temporary file handling
                temp_file_name = None
                with tempfile.NamedTemporaryFile(delete=False, mode='wb') as tmp_file:
                    tmp_file.write(storage)  # storage is a bytes-like object
                    temp_file_name = tmp_file.name
        
                # Ensure output directory exists
                output_path = Path(output_dir)
                output_path.mkdir(parents=True, exist_ok=True)
        
                # Check if the file is a zip file and extract if true
                if zipfile.is_zipfile(temp_file_name):
                    with zipfile.ZipFile(temp_file_name, 'r') as zip_ref:
                        zip_ref.extractall(output_path)
                    print(f"Extracted storage to {output_dir}.")
                else:
                    shutil.copy(temp_file_name, output_path)
                    print(f"Copied storage to {output_dir}.")

                # Cleanup temporary file
                Path(temp_file_name).unlink(missing_ok=True)
        
                return output_dir         
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise  
        except Exception as e:
            logger.info(f"An unexpected error occurred: {e}")
            logger.info(f"Full traceback: {traceback.format_exc()}")

    async def write_storage(self, storage_input: str, ipfs: bool = False, publish_to_ipns: bool = False, update_ipns_name: str = None) -> Dict[str, Any]:
        """Write storage to the node."""
        print("Writing storage")
        try:
            file = prepare_files(storage_input)
            endpoint = f"{self.node_url}/storage/write_ipfs" if ipfs else f"{self.node_url}/storage/write"
            
            if update_ipns_name:
                publish_to_ipns = True

            data = {
                "publish_to_ipns": publish_to_ipns,
                "update_ipns_name": update_ipns_name
            }
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                response = await client.post(
                    endpoint, 
                    files=file,
                    data=data,
                    timeout=600
                )
                response.raise_for_status()
                return response.json()
        except HTTPStatusError as e:
            logger.info(f"HTTP error occurred: {e}")
            raise  
        except Exception as e:
            logger.info(f"An unexpected error occurred: {e}")
            logger.info(f"Full traceback: {traceback.format_exc()}")
            return {}

    async def create_table(self, table_name: str, schema: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/local-db/create-table",
                json={"table_name": table_name, "schema": schema}
            )
            response.raise_for_status()
            return response.json()

    async def add_row(self, table_name: str, data: Dict[str, Any], schema: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, Any]:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/local-db/add-row",
                json={"table_name": table_name, "data": data, "schema": schema}
            )
            response.raise_for_status()
            return response.json()

    async def update_row(self, table_name: str, data: Dict[str, Any], condition: Dict[str, Any], schema: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, Any]:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/local-db/update-row",
                json={
                    "table_name": table_name,
                    "data": data,
                    "condition": condition,
                    "schema": schema
                }
            )
            response.raise_for_status()
            return response.json()

    async def delete_row(self, table_name: str, condition: Dict[str, Any]) -> Dict[str, Any]:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/local-db/delete-row",
                json={"table_name": table_name, "condition": condition}
            )
            response.raise_for_status()
            return response.json()

    async def list_tables(self) -> Dict[str, Any]:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.get(f"{self.node_url}/local-db/tables")
            response.raise_for_status()
            return response.json()

    async def get_table_schema(self, table_name: str) -> Dict[str, Any]:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.get(f"{self.node_url}/local-db/table/{table_name}")
            response.raise_for_status()
            return response.json()

    async def query_table(self, table_name: str, columns: Optional[str] = None, condition: Optional[Union[str, Dict]] = None, order_by: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]:
        params = {"table_name": table_name}
        if columns:
            params["columns"] = columns
        if condition:
            params["condition"] = json.dumps(condition) if isinstance(condition, dict) else condition
        if order_by:
            params["order_by"] = order_by
        if limit:
            params["limit"] = limit

        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.get(
                f"{self.node_url}/local-db/table/{table_name}/rows",
                params=params
            )
            response.raise_for_status()
            return response.json()

    async def vector_search(
        self,
        table_name: str,
        vector_column: str,
        query_vector: List[float],
        columns: Optional[List[str]] = None,
        top_k: int = 5,
        include_similarity: bool = True
    ) -> Dict[str, Any]:
        """
        Perform a pgvector-based similarity search on a table's vector column.
        """
        payload = {
            "table_name": table_name,
            "vector_column": vector_column,
            "query_vector": query_vector,
            "columns": columns or ["text"],
            "top_k": top_k,
            "include_similarity": include_similarity,
        }
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/local-db/vector_search",
                json=payload
            )
            response.raise_for_status()
            return response.json()

Methods

async def add_row(self,
table_name: str,
data: Dict[str, Any],
schema: Dict[str, Dict[str, Any]] | None = None) ‑> Dict[str, Any]
Expand source code
async def add_row(self, table_name: str, data: Dict[str, Any], schema: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, Any]:
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.post(
            f"{self.node_url}/local-db/add-row",
            json={"table_name": table_name, "data": data, "schema": schema}
        )
        response.raise_for_status()
        return response.json()
async def check_agent_run(self,
agent_run: AgentRun) ‑> AgentRun
Expand source code
async def check_agent_run(self, agent_run: AgentRun) -> AgentRun:
    return await self.check_run(agent_run, 'agent')
async def check_environment_run(self,
environment_run: EnvironmentRun) ‑> EnvironmentRun
Expand source code
async def check_environment_run(self, environment_run: EnvironmentRun) -> EnvironmentRun:
    return await self.check_run(environment_run, 'environment')
async def check_kb_run(self,
kb_run: KBRun) ‑> KBRun
Expand source code
async def check_kb_run(self, kb_run: KBRun) -> KBRun:
    return await self.check_run(kb_run, 'kb')
async def check_memory_run(self,
memory_run: MemoryRun) ‑> MemoryRun
Expand source code
async def check_memory_run(self, memory_run: MemoryRun) -> MemoryRun:
    return await self.check_run(memory_run, 'memory')
async def check_orchestrator_run(self,
orchestrator_run: OrchestratorRun) ‑> OrchestratorRun
Expand source code
async def check_orchestrator_run(self, orchestrator_run: OrchestratorRun) -> OrchestratorRun:
    return await self.check_run(orchestrator_run, 'orchestrator')
async def check_run(self,
module_run: AgentRun | OrchestratorRun | EnvironmentRun | KBRun | MemoryRun | ToolRun,
module_type: str) ‑> AgentRun | OrchestratorRun | EnvironmentRun | KBRun | MemoryRun | ToolRun
Expand source code
async def check_run(
    self, 
    module_run: Union[AgentRun, OrchestratorRun, EnvironmentRun, KBRun, MemoryRun, ToolRun], 
    module_type: str
) -> Union[AgentRun, OrchestratorRun, EnvironmentRun, KBRun, MemoryRun, ToolRun]:
    """Generic method to check the status of a module run.
    
    Args:
        module_run: Either AgentRun, OrchestratorRun, EnvironmentRun, ToolRun, KBRun or MemoryRun object
        module_type: Either 'agent', 'orchestrator', 'environment', 'tool', 'kb' or 'memory'
    """
    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/{module_type}/check", 
                json=module_run.model_dump()
            )
            response.raise_for_status()
        
        return_class = {
            'agent': AgentRun,
            'orchestrator': OrchestratorRun,
            'environment': EnvironmentRun,
            'kb': KBRun,
            'memory': MemoryRun,
            'tool': ToolRun
        }[module_type]
        return return_class(**json.loads(response.text))
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise  
    except Exception as e:
        logger.info(f"An unexpected error occurred: {e}")

Generic method to check the status of a module run.

Args

module_run
Either AgentRun, OrchestratorRun, EnvironmentRun, ToolRun, KBRun or MemoryRun object
module_type
Either 'agent', 'orchestrator', 'environment', 'tool', 'kb' or 'memory'
async def check_tool_run(self,
tool_run: ToolRun) ‑> ToolRun
Expand source code
async def check_tool_run(self, tool_run: ToolRun) -> ToolRun:
    return await self.check_run(tool_run, 'tool')
async def check_user(self, user_input: Dict[str, Any]) ‑> Dict[str, Any]
Expand source code
async def check_user(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
    """
    Check if a user exists on a node
    """
    endpoint = self.node_url + "/user/check"
    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            headers = {
                'Content-Type': 'application/json', 
            }
            response = await client.post(
                endpoint, 
                json=user_input,
                headers=headers
            )
            response.raise_for_status()
        return json.loads(response.text)
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise  
    except RemoteProtocolError as e:
        error_msg = f"Check user failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
        logger.info(error_msg)
        raise 
    except Exception as e:
        logger.info(f"An unexpected error occurred: {e}")
        raise

Check if a user exists on a node

async def create(self,
module_type: str,
module_request: AgentDeployment | EnvironmentDeployment | KBDeployment | OrchestratorDeployment | ToolDeployment)
Expand source code
async def create(self, module_type: str,
                 module_request: Union[AgentDeployment, EnvironmentDeployment, KBDeployment, OrchestratorDeployment, ToolDeployment]):
    """Generic method to create either an agent, orchestrator, environment, tool, kb or memory.

    Args:
        module_type: Either agent, orchestrator, environment, tool, kb or memory
        module_request: Either AgentDeployment, EnvironmentDeployment, OrchestratorDeployment, ToolDeployment, KBDeployment or MemoryDeployment
    """

    print(f"Creating {module_type}...")

    endpoint = f"{self.node_url}/{module_type}/create"
    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {self.access_token}',
            }
            response = await client.post(
                endpoint,
                json=module_request.model_dump(),
                headers=headers
            )
            response.raise_for_status()

            # Convert response to appropriate return type
            return response.json()
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise
    except RemoteProtocolError as e:
        error_msg = f"Run {module_type} failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
        logger.error(error_msg)
        raise
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        raise

Generic method to create either an agent, orchestrator, environment, tool, kb or memory.

Args

module_type
Either agent, orchestrator, environment, tool, kb or memory
module_request
Either AgentDeployment, EnvironmentDeployment, OrchestratorDeployment, ToolDeployment, KBDeployment or MemoryDeployment
async def create_agent_run(self,
agent_run_input: AgentRunInput) ‑> AgentRun
Expand source code
async def create_agent_run(self, agent_run_input: AgentRunInput) -> AgentRun:
    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/monitor/create_agent_run", json=agent_run_input.model_dump()
            )
            response.raise_for_status()
        return AgentRun(**json.loads(response.text))
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise  
    except Exception as e:
        logger.info(f"An unexpected error occurred: {e}")
        logger.info(f"Full traceback: {traceback.format_exc()}")
async def create_table(self, table_name: str, schema: Dict[str, Dict[str, Any]]) ‑> Dict[str, Any]
Expand source code
async def create_table(self, table_name: str, schema: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.post(
            f"{self.node_url}/local-db/create-table",
            json={"table_name": table_name, "schema": schema}
        )
        response.raise_for_status()
        return response.json()
async def delete_row(self, table_name: str, condition: Dict[str, Any]) ‑> Dict[str, Any]
Expand source code
async def delete_row(self, table_name: str, condition: Dict[str, Any]) -> Dict[str, Any]:
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.post(
            f"{self.node_url}/local-db/delete-row",
            json={"table_name": table_name, "condition": condition}
        )
        response.raise_for_status()
        return response.json()
async def get_table_schema(self, table_name: str) ‑> Dict[str, Any]
Expand source code
async def get_table_schema(self, table_name: str) -> Dict[str, Any]:
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.get(f"{self.node_url}/local-db/table/{table_name}")
        response.raise_for_status()
        return response.json()
async def list_tables(self) ‑> Dict[str, Any]
Expand source code
async def list_tables(self) -> Dict[str, Any]:
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.get(f"{self.node_url}/local-db/tables")
        response.raise_for_status()
        return response.json()
async def query_table(self,
table_name: str,
columns: str | None = None,
condition: str | Dict | None = None,
order_by: str | None = None,
limit: int | None = None) ‑> Dict[str, Any]
Expand source code
async def query_table(self, table_name: str, columns: Optional[str] = None, condition: Optional[Union[str, Dict]] = None, order_by: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]:
    params = {"table_name": table_name}
    if columns:
        params["columns"] = columns
    if condition:
        params["condition"] = json.dumps(condition) if isinstance(condition, dict) else condition
    if order_by:
        params["order_by"] = order_by
    if limit:
        params["limit"] = limit

    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.get(
            f"{self.node_url}/local-db/table/{table_name}/rows",
            params=params
        )
        response.raise_for_status()
        return response.json()
async def read_storage(self, agent_run_id: str, output_dir: str, ipfs: bool = False) ‑> str
Expand source code
async def read_storage(self, agent_run_id: str, output_dir: str, ipfs: bool = False) -> str:
    print("Reading from storage...")
    try:
        endpoint = f"{self.node_url}/{'storage/read_ipfs' if ipfs else 'storage/read'}/{agent_run_id}"

        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.get(endpoint)
            response.raise_for_status()
            storage = response.content  
            print("Retrieved storage.")
        
            # Temporary file handling
            temp_file_name = None
            with tempfile.NamedTemporaryFile(delete=False, mode='wb') as tmp_file:
                tmp_file.write(storage)  # storage is a bytes-like object
                temp_file_name = tmp_file.name
    
            # Ensure output directory exists
            output_path = Path(output_dir)
            output_path.mkdir(parents=True, exist_ok=True)
    
            # Check if the file is a zip file and extract if true
            if zipfile.is_zipfile(temp_file_name):
                with zipfile.ZipFile(temp_file_name, 'r') as zip_ref:
                    zip_ref.extractall(output_path)
                print(f"Extracted storage to {output_dir}.")
            else:
                shutil.copy(temp_file_name, output_path)
                print(f"Copied storage to {output_dir}.")

            # Cleanup temporary file
            Path(temp_file_name).unlink(missing_ok=True)
    
            return output_dir         
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise  
    except Exception as e:
        logger.info(f"An unexpected error occurred: {e}")
        logger.info(f"Full traceback: {traceback.format_exc()}")
async def register_user(self, user_input: Dict[str, Any]) ‑> Dict[str, Any]
Expand source code
async def register_user(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
    """
    Register a user on a node
    """
    endpoint = self.node_url + "/user/register"
    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            headers = {
                'Content-Type': 'application/json', 
            }
            response = await client.post(
                endpoint, 
                json=user_input,
                headers=headers
            )
            response.raise_for_status()
        return json.loads(response.text)
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise  
    except RemoteProtocolError as e:
        error_msg = f"Register user failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
        logger.error(error_msg)
        raise 
    except Exception as e:
        logger.info(f"An unexpected error occurred: {e}")
        raise

Register a user on a node

async def run_agent(self,
agent_run_input: AgentRunInput) ‑> AgentRun
Expand source code
async def run_agent(self, agent_run_input: AgentRunInput) -> AgentRun:
    """Run an agent module on a node"""
    return await self._run_module(agent_run_input, 'agent')

Run an agent module on a node

async def run_agent_and_poll(self,
agent_run_input: AgentRunInput) ‑> AgentRun
Expand source code
async def run_agent_and_poll(self, agent_run_input: AgentRunInput) -> AgentRun:
    """Run an agent module and poll for results until completion."""
    return await self._run_and_poll(agent_run_input, 'agent')

Run an agent module and poll for results until completion.

async def run_environment(self,
environment_run_input: EnvironmentRunInput) ‑> EnvironmentRun
Expand source code
async def run_environment(self, environment_run_input: EnvironmentRunInput) -> EnvironmentRun:
    """Run an environment module on a node"""
    return await self._run_module(environment_run_input, 'environment')

Run an environment module on a node

async def run_environment_and_poll(self,
environment_input: EnvironmentRunInput) ‑> EnvironmentRun
Expand source code
async def run_environment_and_poll(self, environment_input: EnvironmentRunInput) -> EnvironmentRun:
    """Run an environment module and poll for results until completion."""
    return await self._run_and_poll(environment_input, 'environment')

Run an environment module and poll for results until completion.

async def run_inference(self,
inference_input: ChatCompletionRequest | Dict) ‑> ModelResponse
Expand source code
async def run_inference(self, inference_input: Union[ChatCompletionRequest, Dict]) -> ModelResponse:
    """
    Run inference on a node
    
    Args:
        inference_input: The inference input to run inference on
    """
    if isinstance(inference_input, dict):
        inference_input = ChatCompletionRequest(**inference_input)

    endpoint = f"{self.node_url}/inference/chat"

    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {self.access_token}',
            }
            response = await client.post(
                endpoint,
                json=inference_input.model_dump(),
                headers=headers
            )
            print("Response: ", response.text)
            response.raise_for_status()
            return ModelResponse(**json.loads(response.text))
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise
    except RemoteProtocolError as e:
        error_msg = f"Inference failed to connect to the server at {self.node_url}. Please check if the server URL is correct and the server is running. Error details: {str(e)}"
        logger.error(error_msg)
        raise
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        raise

Run inference on a node

Args

inference_input
The inference input to run inference on
async def run_kb(self,
kb_run_input: KBRunInput) ‑> KBRun
Expand source code
async def run_kb(self, kb_run_input: KBRunInput) -> KBRun:
    """Run a knowledge base module on a node"""
    return await self._run_module(kb_run_input, 'kb')

Run a knowledge base module on a node

async def run_kb_and_poll(self,
kb_input: KBDeployment) ‑> KBDeployment
Expand source code
async def run_kb_and_poll(self, kb_input: KBDeployment) -> KBDeployment:
    """Run a knowledge base module and poll for results until completion."""
    return await self._run_and_poll(kb_input, 'kb')

Run a knowledge base module and poll for results until completion.

async def run_memory(self,
memory_run_input: MemoryRunInput) ‑> MemoryRun
Expand source code
async def run_memory(self, memory_run_input: MemoryRunInput) -> MemoryRun:
    """Run a memory module on a node"""
    return await self._run_module(memory_run_input, 'memory')

Run a memory module on a node

async def run_memory_and_poll(self,
memory_input: MemoryDeployment) ‑> MemoryDeployment
Expand source code
async def run_memory_and_poll(self, memory_input: MemoryDeployment) -> MemoryDeployment:
    """Run a memory module and poll for results until completion."""
    return await self._run_and_poll(memory_input, 'memory')

Run a memory module and poll for results until completion.

async def run_orchestrator(self,
orchestrator_run_input: OrchestratorRunInput) ‑> OrchestratorRun
Expand source code
async def run_orchestrator(self, orchestrator_run_input: OrchestratorRunInput) -> OrchestratorRun:
    """Run an orchestrator module on a node"""
    return await self._run_module(orchestrator_run_input, 'orchestrator')

Run an orchestrator module on a node

async def run_orchestrator_and_poll(self,
orchestrator_run_input: OrchestratorRunInput) ‑> OrchestratorRun
Expand source code
async def run_orchestrator_and_poll(self, orchestrator_run_input: OrchestratorRunInput) -> OrchestratorRun:
    """Run an orchestrator module and poll for results until completion."""
    return await self._run_and_poll(orchestrator_run_input, 'orchestrator')

Run an orchestrator module and poll for results until completion.

async def run_tool(self,
tool_run_input: ToolRunInput) ‑> ToolRun
Expand source code
async def run_tool(self, tool_run_input: ToolRunInput) -> ToolRun:
    """Run a tool module on a node"""
    return await self._run_module(tool_run_input, 'tool')

Run a tool module on a node

async def run_tool_and_poll(self,
tool_run_input: ToolRunInput) ‑> ToolRun
Expand source code
async def run_tool_and_poll(self, tool_run_input: ToolRunInput) -> ToolRun:
    """Run a tool module and poll for results until completion."""

    return await self._run_and_poll(tool_run_input, 'tool')

Run a tool module and poll for results until completion.

async def update_agent_run(self,
agent_run: AgentRun)
Expand source code
async def update_agent_run(self, agent_run: AgentRun):
    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                f"{self.node_url}/monitor/update_agent_run", json=agent_run.model_dump()
            )
            response.raise_for_status()
        return AgentRun(**json.loads(response.text))
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise  
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        error_details = traceback.format_exc()
        print(f"Full traceback: {error_details}")
async def update_row(self,
table_name: str,
data: Dict[str, Any],
condition: Dict[str, Any],
schema: Dict[str, Dict[str, Any]] | None = None) ‑> Dict[str, Any]
Expand source code
async def update_row(self, table_name: str, data: Dict[str, Any], condition: Dict[str, Any], schema: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, Any]:
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.post(
            f"{self.node_url}/local-db/update-row",
            json={
                "table_name": table_name,
                "data": data,
                "condition": condition,
                "schema": schema
            }
        )
        response.raise_for_status()
        return response.json()
Expand source code
async def vector_search(
    self,
    table_name: str,
    vector_column: str,
    query_vector: List[float],
    columns: Optional[List[str]] = None,
    top_k: int = 5,
    include_similarity: bool = True
) -> Dict[str, Any]:
    """
    Perform a pgvector-based similarity search on a table's vector column.
    """
    payload = {
        "table_name": table_name,
        "vector_column": vector_column,
        "query_vector": query_vector,
        "columns": columns or ["text"],
        "top_k": top_k,
        "include_similarity": include_similarity,
    }
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        response = await client.post(
            f"{self.node_url}/local-db/vector_search",
            json=payload
        )
        response.raise_for_status()
        return response.json()

Perform a pgvector-based similarity search on a table's vector column.

async def write_storage(self,
storage_input: str,
ipfs: bool = False,
publish_to_ipns: bool = False,
update_ipns_name: str = None) ‑> Dict[str, Any]
Expand source code
async def write_storage(self, storage_input: str, ipfs: bool = False, publish_to_ipns: bool = False, update_ipns_name: str = None) -> Dict[str, Any]:
    """Write storage to the node."""
    print("Writing storage")
    try:
        file = prepare_files(storage_input)
        endpoint = f"{self.node_url}/storage/write_ipfs" if ipfs else f"{self.node_url}/storage/write"
        
        if update_ipns_name:
            publish_to_ipns = True

        data = {
            "publish_to_ipns": publish_to_ipns,
            "update_ipns_name": update_ipns_name
        }
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            response = await client.post(
                endpoint, 
                files=file,
                data=data,
                timeout=600
            )
            response.raise_for_status()
            return response.json()
    except HTTPStatusError as e:
        logger.info(f"HTTP error occurred: {e}")
        raise  
    except Exception as e:
        logger.info(f"An unexpected error occurred: {e}")
        logger.info(f"Full traceback: {traceback.format_exc()}")
        return {}

Write storage to the node.