Module naptha_sdk.configs

Functions

async def check_register_user(deployment, user_id=None)
Expand source code
async def check_register_user(deployment, user_id=None):
    node = UserClient(deployment["node"])
    user = await node.check_user(user_input={"public_key": user_id.split(":")[-1]})

    if user['is_registered'] == True:
        print("Found user...", user)
    else:
        print("No user found. Registering user...")
        user = await node.register_user(user_input=user)
        print(f"User registered: {user}.")
def load_llm_configs(llm_configs_path)
Expand source code
def load_llm_configs(llm_configs_path):
    with open(llm_configs_path, "r") as file:
        llm_configs = json.loads(file.read())
    return [LLMConfig(**config) for config in llm_configs]
async def load_module_config_data(module_type, deployment, load_persona_data=False)
Expand source code
async def load_module_config_data(module_type, deployment, load_persona_data=False):

    config_map = {
        "agent": AgentConfig,
        "environment": EnvironmentConfig,
        "tool": ToolConfig,
        "orchestrator": OrchestratorConfig,
        "kb": KBConfig,
        "memory": MemoryConfig
    }

    if "llm_config" in deployment["config"] and deployment["config"]["llm_config"] is not None:
        config_name = deployment["config"]["llm_config"]["config_name"]
        config_path = f"{Path.cwd().name}/configs/llm_configs.json"
        llm_configs = load_llm_configs(config_path)
        llm_config = next(config for config in llm_configs if config.config_name == config_name)
        deployment["config"]["llm_config"] = llm_config
    if load_persona_data:
        persona_data = await load_persona(deployment["config"]["persona_module"])
        deployment["config"]["system_prompt"]["persona"] = persona_data

    deployment["config"] = config_map[module_type](**deployment["config"])

    return deployment
async def load_node_metadata(deployment, node_url, is_subdeployment)
Expand source code
async def load_node_metadata(deployment, node_url, is_subdeployment):
    if node_url is None:
        node_url = os.getenv("NODE_URL")
        if node_url is None:
            raise Exception("Node URL is required. Please make sure you've added NODE_URL=<node_url> to your .env file.")

    print(f"Loading node metadata for {deployment['node']['ip']}")
    if not is_subdeployment:
        deployment["node"] = url_to_node(node_url)
    else:
        deployment["node"] = await list_nodes(deployment["node"]["ip"])
        deployment["node"] = NodeConfig(**deployment["node"])
    print(f"Node metadata loaded {deployment['node']}")
    return deployment
async def load_subdeployments(deployment, node_url=None, user_id=None)
Expand source code
async def load_subdeployments(deployment, node_url=None, user_id=None):

    configs_path = Path(f"{Path.cwd().name}/configs")

    if "agent_deployments" in deployment and deployment["agent_deployments"]:
        # Update defaults with non-None values from input
        agent_deployments = []
        for i, agent_deployment in enumerate(deployment["agent_deployments"]):
            deployment_name = deployment["agent_deployments"][i]["name"]
            agent_deployment = await setup_module_deployment("agent", configs_path / "agent_deployments.json", node_url, user_id, deployment_name, is_subdeployment=True)
            agent_deployments.append(agent_deployment)
        deployment["agent_deployments"] = agent_deployments
    if "tool_deployments" in deployment and deployment["tool_deployments"]:
        tool_deployments = []
        for i, tool_deployment in enumerate(deployment["tool_deployments"]):
            deployment_name = deployment["tool_deployments"][i]["name"]
            tool_deployment = await setup_module_deployment("tool", configs_path / "tool_deployments.json", node_url, user_id, deployment_name, is_subdeployment=True)
            tool_deployments.append(tool_deployment)
        deployment["tool_deployments"] = tool_deployments
    if "environment_deployments" in deployment and deployment["environment_deployments"]:
        environment_deployments = []
        for i, environment_deployment in enumerate(deployment["environment_deployments"]):
            deployment_name = deployment["environment_deployments"][i]["name"]
            environment_deployment = await setup_module_deployment("environment", configs_path / "environment_deployments.json", node_url, user_id, deployment_name, is_subdeployment=True)
            environment_deployments.append(environment_deployment)
        deployment["environment_deployments"] = environment_deployments
    if "kb_deployments" in deployment and deployment["kb_deployments"]:
        kb_deployments = []
        for i, kb_deployment in enumerate(deployment["kb_deployments"]):
            deployment_name = deployment["kb_deployments"][i]["name"]
            kb_deployment = await setup_module_deployment("kb", configs_path / "kb_deployments.json", node_url, user_id, deployment_name, is_subdeployment=True)
            kb_deployments.append(kb_deployment)
        deployment["kb_deployments"] = kb_deployments
    if "memory_deployments" in deployment and deployment["memory_deployments"]:
        memory_deployments = []
        for i, memory_deployment in enumerate(deployment["memory_deployments"]):
            deployment_name = deployment["memory_deployments"][i]["name"]
            memory_deployment = await setup_module_deployment("memory", configs_path / "memory_deployments.json", node_url, user_id, deployment_name, is_subdeployment=True)
            memory_deployments.append(memory_deployment)
        deployment["memory_deployments"] = memory_deployments
    print(f"Subdeployments loaded {deployment}")
    return deployment
async def setup_module_deployment(module_type: str,
deployment_path: str,
node_url: str = None,
user_id: str = None,
deployment_name: str = None,
load_persona_data=False,
is_subdeployment: bool = False)
Expand source code
async def setup_module_deployment(module_type: str, deployment_path: str, node_url: str = None, user_id: str = None, deployment_name: str = None, load_persona_data=False, is_subdeployment: bool = False):

    # Map deployment types to their corresponding classes
    deployment_map = {
        "agent": AgentDeployment,
        "tool": ToolDeployment,
        "environment": EnvironmentDeployment,
        "kb": KBDeployment,
        "memory": MemoryDeployment,
        "orchestrator": OrchestratorDeployment
    }

    # Load default deployment config from module
    with open(deployment_path, "r") as file:
        deployment = json.loads(file.read())

    if deployment_name is None:
        deployment = deployment[0]
    else:
        # Get the first deployment with matching name
        deployment = next((d for d in deployment if d["name"] == deployment_name), None)
        if deployment is None:
            raise ValueError(f"No deployment found with name {deployment_name}")

    if node_url is not None:
        deployment = await load_node_metadata(deployment, node_url, is_subdeployment)
    if user_id is not None:
        await check_register_user(deployment, user_id)
    deployment = await load_module_config_data(module_type, deployment, load_persona_data)
    deployment = await load_subdeployments(deployment, node_url, user_id)
    return deployment_map[module_type](**deployment)