Module naptha_sdk.cli

Functions

async def add_data_to_memory(naptha, memory_name, data, user_id=None, memory_node_url='http://localhost:7001')
async def add_data_to_memory(naptha, memory_name, data, user_id=None, memory_node_url="http://localhost:7001"):
    try:
        # Parse the data string into a dictionary
        data_dict = {}
        # Split by spaces, but keep quoted strings together
        parts = shlex.split(data)
        
        for part in parts:
            if '=' in part:
                key, value = part.split('=', 1)
                # Remove quotes if they exist
                value = value.strip("'\"")
                data_dict[key] = value

        data_dict = [data_dict]
        
        memory_run_input = {
            "consumer_id": user_id,
            "inputs": {
                "mode": "add_data",
                "data": json.dumps(data_dict)
            },
            "memory_deployment": {
                "name": memory_name,
                "module": {
                    "name": memory_name
                },
                "memory_node_url": memory_node_url
            }
        }

        memory_run = await naptha.node.run_memory_and_poll(memory_run_input)
        console = Console()
        console.print(f"\n[green]Successfully added data to memory:[/green] {memory_name}")
        console.print(memory_run)
        
    except Exception as e:
        console = Console()
        console.print(f"\n[red]Error adding data to memory:[/red] {str(e)}")
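The data argument is a shell-style string of key=value pairs, parsed with shlex and sent as a single-record list. A minimal sketch of a direct call, assuming a naptha client like the one built in main(); the memory name, fields, and user id are placeholders:

async def example_add(naptha):
    # Placeholders only: memory name, fields, and user id are illustrative.
    await add_data_to_memory(
        naptha,
        "chat_memory",
        "title='First note' text='Remember to review the release notes'",
        user_id="user:<public_key>",
        memory_node_url="http://localhost:7001",
    )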
def cli()
def cli():
    import sys
    import traceback
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nOperation cancelled by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {str(e)}")
        print(f"Full traceback: {traceback.format_exc()}")
        sys.exit(1)
async def create(naptha, module_name, agent_modules=None, agent_nodes=None, environment_modules=None, environment_nodes=None)
async def create(
        naptha,
        module_name,
        agent_modules = None,
        agent_nodes = None,
        environment_modules = None,
        environment_nodes = None
):
    module_type = module_name.split(":")[0] if ":" in module_name else "agent"
    module_name = module_name.split(":")[-1]  # Remove prefix if exists

    user = await naptha.node.check_user(user_input={"public_key": naptha.hub.public_key})

    if user['is_registered']:
        print("Found user...", user)
    else:
        print("No user found. Registering user...")
        user = await naptha.node.register_user(user_input=user)
        print(f"User registered: {user}.")

    # Create auxiliary deployments if needed
    aux_deployments = {
        "agent_deployments": [
            AgentDeployment(
                name=agent_module,
                module={"name": agent_module},
                node=url_to_node(agent_node)
            ) for agent_module, agent_node in zip(agent_modules or [], agent_nodes or [])
        ],
        "environment_deployments": [
            EnvironmentDeployment(
                name=env_module,
                module={"name": env_module},
                node=url_to_node(env_node)
            ) for env_module, env_node in zip(environment_modules or [], environment_nodes or [])
        ]
    }

    # Define deployment configurations for each module type
    deployment_configs = {
        "agent": lambda: AgentDeployment(
            name=module_name,
            module={"name": module_name},
            node=url_to_node(os.getenv("NODE_URL")),
        ),
        "tool": lambda: ToolDeployment(
            name=module_name,
            module={"name": module_name},
            node=url_to_node(os.getenv("NODE_URL"))
        ),
        "orchestrator": lambda: OrchestratorDeployment(
            name=module_name,
            module={"name": module_name},
            node=url_to_node(os.getenv("NODE_URL")),
            **aux_deployments
        ),
        "environment": lambda: EnvironmentDeployment(
            name=module_name,
            module={"name": module_name},
            node=url_to_node(os.getenv("NODE_URL"))
        ),
        "kb": lambda: KBDeployment(
            name=module_name,
            module={"name": module_name},
            node=url_to_node(os.getenv("NODE_URL"))
        ),
        "memory": lambda: MemoryDeployment(
            name=module_name,
            module={"name": module_name},
            node=url_to_node(os.getenv("NODE_URL"))
        )
    }

    # Get deployment configuration for module type
    if module_type not in deployment_configs:
        raise ValueError(f"Unsupported module type: {module_type}")

    print(f"Creating {module_type.title()}...")
    deployment = deployment_configs[module_type]()
    result = await naptha.node.create(module_type, deployment)
    print(f"{module_type.title()} creation result: {result}")
async def create_agent(naptha, agent_config)
async def create_agent(naptha, agent_config):
    print(f"Agent Config: {agent_config}")
    agent = await naptha.hub.create_agent(agent_config)
    if isinstance(agent, dict):
        print(f"Agent created: {agent}")
    elif isinstance(agent, list):
        print(f"Agent created: {agent[0]}")
async def create_environment(naptha, environment_config)
async def create_environment(naptha, environment_config):
    print(f"Environment Config: {environment_config}")
    environment = await naptha.hub.create_environment(environment_config)
    if isinstance(environment, dict):
        print(f"Environment created: {environment}")
    elif isinstance(environment, list):
        print(f"Environment created: {environment[0]}")
async def create_orchestrator(naptha, orchestrator_config)
async def create_orchestrator(naptha, orchestrator_config):
    print(f"Orchestrator Config: {orchestrator_config}")
    orchestrator = await naptha.hub.create_orchestrator(orchestrator_config)
    if isinstance(orchestrator, dict):
        print(f"Orchestrator created: {orchestrator}")
    elif isinstance(orchestrator, list):
        print(f"Orchestrator created: {orchestrator[0]}")
async def create_persona(naptha, persona_config)
async def create_persona(naptha, persona_config):
    print(f"Persona Config: {persona_config}")
    persona = await naptha.hub.create_persona(persona_config)
    if isinstance(persona, dict):
        print(f"Persona created: {persona}")
    elif isinstance(persona, list):
        print(f"Persona created: {persona[0]}")
async def list_agents(naptha)
async def list_agents(naptha):
    agents = await naptha.hub.list_agents()
    
    if not agents:
        console = Console()
        console.print("[red]No agents found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Agents",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Define columns with specific formatting
    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Author", justify="left")
    table.add_column("Description", justify="left", max_width=50)
    table.add_column("Parameters", justify="left", max_width=30)
    table.add_column("Module URL", justify="left", max_width=30)
    table.add_column("Module Version", justify="center")

    # Add rows
    for agent in agents:
        table.add_row(
            agent['name'],
            agent['id'],
            agent['author'],
            agent['description'],
            str(agent['parameters']),
            agent['module_url'],
            agent['module_version'],
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total agents:[/green] {len(agents)}")
async def list_environments(naptha)
async def list_environments(naptha):
    environments = await naptha.hub.list_environments()
    
    if not environments:
        console = Console()
        console.print("[red]No environments found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Environments",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Define columns with specific formatting
    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Author", justify="left")
    table.add_column("Description", justify="left", max_width=50)
    table.add_column("Parameters", justify="left", max_width=30)
    table.add_column("Module URL", justify="left", max_width=30)
    table.add_column("Module Version", justify="center")

    # Add rows
    for environment in environments:
        table.add_row(
            environment['name'],
            environment['id'],
            environment['author'],
            environment['description'],
            str(environment['parameters']),
            environment['module_url'],
            environment['module_version'],
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total environments:[/green] {len(environments)}")
async def list_kbs(naptha, kb_name=None)
async def list_kbs(naptha, kb_name=None):
    kbs = await naptha.hub.list_kbs(kb_name=kb_name)
    
    if not kbs:
        console = Console()
        console.print("[red]No knowledge bases found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Knowledge Bases",
        title_style="bold cyan", 
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Define columns with specific formatting
    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Author", justify="left")
    table.add_column("Description", justify="left", max_width=50)
    table.add_column("Parameters", justify="left", max_width=40)
    table.add_column("Module URL", justify="left", max_width=40)
    table.add_column("Module Type", justify="left")
    table.add_column("Module Version", justify="center")

    # Add rows
    for kb in kbs:
        table.add_row(
            kb['name'],
            kb['id'],
            kb['author'],
            kb['description'],
            str(kb['parameters']),
            kb['module_url'],
            kb['module_type'],
            kb['module_version']
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total knowledge bases:[/green] {len(kbs)}")
async def list_memories(naptha, memory_name=None)
async def list_memories(naptha, memory_name=None):
    memories = await naptha.hub.list_memories(memory_name=memory_name)

    if not memories:
        console = Console()
        console.print("[red]No memories found.[/red]")
        return
    
    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available memories",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]
    )

    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Author", justify="left")
    table.add_column("Description", justify="left", max_width=50)
    table.add_column("Parameters", justify="left", max_width=40)
    table.add_column("Module URL", justify="left", max_width=40)
    table.add_column("Module Version", justify="center")

    # Add rows
    for memory in memories:
        table.add_row(
            memory['name'],
            memory['id'],
            memory['author'],
            memory['description'],
            str(memory['parameters']),
            memory['module_url'],
            memory['module_version']
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total memories:[/green] {len(memories)}")
async def list_memory_content(naptha, memory_name)
async def list_memory_content(naptha, memory_name):
    rows = await naptha.node.query_table(
        table_name=memory_name,   
        columns="*",
        condition=None,
        order_by=None,
        limit=None
    )
    
    if not rows.get('rows'):
        console = Console()
        console.print("[red]No content found in memory.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title=f"Memory Content: {memory_name}",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]
    )

    # Add headers
    headers = list(rows['rows'][0].keys())
    for header in headers:
        if header.lower() in ['id', 'module_url']:
            table.add_column(header, justify="left", max_width=40)
        elif header.lower() in ['title', 'name']:
            table.add_column(header, justify="left", style="green", max_width=40)
        elif header.lower() in ['text', 'description', 'content']:
            table.add_column(header, justify="left", max_width=60)
        else:
            table.add_column(header, justify="left", max_width=30)

    # Add rows
    for row in rows['rows']:
        table.add_row(*[str(row.get(key, '')) for key in headers])

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total rows:[/green] {len(rows['rows'])}")
async def list_nodes(naptha)
async def list_nodes(naptha):
    nodes = await naptha.hub.list_nodes()
    
    if not nodes:
        console = Console()
        console.print("[red]No nodes found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Nodes",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Get dynamic headers from first node
    headers = list(nodes[0].keys())

    # Define columns with specific formatting
    table.add_column("ID", justify="left")
    table.add_column("IP", justify="left")
    table.add_column("Owner", justify="left")
    table.add_column("OS", justify="left")
    table.add_column("Arch", justify="left")
    table.add_column("Num Servers", justify="left")
    table.add_column("Server Type", justify="left")
    table.add_column("HTTP Port", justify="left")
    table.add_column("Models", justify="left")
    table.add_column("Num GPUs", justify="left")
    table.add_column("Provider Types", justify="left")

    # Add rows
    for node in nodes:
        table.add_row(
            node['id'],
            node['ip'],
            node['owner'],
            node['os'],
            node['arch'],
            str(node['num_servers']),
            node['server_type'],
            str(node['http_port']),
            str(node['models']), 
            str(node['num_gpus']),
            str(node['provider_types']) 
        )
    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total nodes:[/green] {len(nodes)}")
async def list_orchestrators(naptha)
async def list_orchestrators(naptha):
    orchestrators = await naptha.hub.list_orchestrators()
    
    if not orchestrators:
        console = Console()
        console.print("[red]No orchestrators found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Orchestrators",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Define columns with specific formatting
    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Author", justify="left")
    table.add_column("Description", justify="left", max_width=50)
    table.add_column("Parameters", justify="left", max_width=30)
    table.add_column("Module URL", justify="left", max_width=30)
    table.add_column("Module Version", justify="center")

    # Add rows
    for orchestrator in orchestrators:
        table.add_row(
            orchestrator['name'],
            orchestrator['id'],
            orchestrator['author'],
            orchestrator['description'],
            str(orchestrator['parameters']),
            orchestrator['module_url'],
            orchestrator['module_version'],
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total orchestrators:[/green] {len(orchestrators)}")
async def list_personas(naptha)
async def list_personas(naptha):
    personas = await naptha.hub.list_personas()
    
    if not personas:
        console = Console()
        console.print("[red]No personas found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Personas",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Define columns with specific formatting
    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Author", justify="left")
    table.add_column("Description", justify="left", max_width=50)
    table.add_column("Parameters", justify="left", max_width=30)
    table.add_column("Module URL", justify="left", max_width=40)
    table.add_column("Module Version", justify="center")
    table.add_column("Module Entrypoint", justify="center")

    # Add rows
    for persona in personas:
        table.add_row(
            persona['name'],
            persona['id'],
            persona['author'],
            persona['description'],
            str(persona['parameters']),
            persona['module_url'],
            persona['module_version'],
            persona['module_entrypoint']
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total personas:[/green] {len(personas)}")
async def list_servers(naptha)
async def list_servers(naptha):
    servers = await naptha.hub.list_servers()
    
    if not servers:
        console = Console()
        console.print("[red]No servers found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Servers",
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Add columns
    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Connection", justify="left")
    table.add_column("Node ID", justify="left", max_width=30)

    # Add rows
    for server in servers:
        table.add_row(
            server['name'],
            server['id'],
            server['connection_string'],
            server['node_id'][:30] + "..."  # Truncate long node ID
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total servers:[/green] {len(servers)}")
async def list_tools(naptha)
async def list_tools(naptha):
    tools = await naptha.hub.list_tools()
    
    if not tools:
        console = Console()
        console.print("[red]No tools found.[/red]")
        return

    console = Console()
    table = Table(
        box=box.ROUNDED,
        show_lines=True,
        title="Available Tools", 
        title_style="bold cyan",
        header_style="bold blue",
        row_styles=["", "dim"]  # Alternating row styles
    )

    # Define columns with specific formatting
    table.add_column("Name", justify="left", style="green")
    table.add_column("ID", justify="left")
    table.add_column("Author", justify="left")
    table.add_column("Description", justify="left", max_width=50)
    table.add_column("Parameters", justify="left", max_width=30)
    table.add_column("Module URL", justify="left", max_width=30)
    table.add_column("Module Version", justify="center")

    # Add rows
    for tool in tools:
        table.add_row(
            tool['name'],
            tool['id'],
            tool['author'],
            tool['description'],
            str(tool['parameters']),
            tool['module_url'],
            tool['module_version'],
        )

    # Print table and summary
    console.print()
    console.print(table)
    console.print(f"\n[green]Total tools:[/green] {len(tools)}")
def load_yaml_to_dict(file_path)
def load_yaml_to_dict(file_path):
    with open(file_path, 'r') as file:
        # Load the YAML content into a Python dictionary
        yaml_content = yaml.safe_load(file)
    return yaml_content
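This helper backs the run command's -f flag: the YAML file is loaded into a dict and passed as the module run's inputs. A small sketch, assuming a local file whose keys depend entirely on the target module:

# Hypothetical file run_params.yaml, e.g.:
#   func_name: chat
#   message: Hello from the CLI
parameters = load_yaml_to_dict("run_params.yaml")
print(parameters)  # e.g. {'func_name': 'chat', 'message': 'Hello from the CLI'}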
async def main()
async def main():
    public_key = get_public_key(os.getenv("PRIVATE_KEY")) if os.getenv("PRIVATE_KEY") else None
    hub_username = os.getenv("HUB_USERNAME")
    hub_password = os.getenv("HUB_PASSWORD")
    hub_url = os.getenv("HUB_URL")

    naptha = Naptha()

    parser = argparse.ArgumentParser(description="CLI for Naptha")
    subparsers = parser.add_subparsers(title="commands", dest="command")

    # Node parser
    nodes_parser = subparsers.add_parser("nodes", help="List available nodes.")
    nodes_parser.add_argument("-s", '--list_servers', action='store_true', help='List servers')

    # Agent parser
    agents_parser = subparsers.add_parser("agents", help="List available agents.")
    agents_parser.add_argument('agent_name', nargs='?', help='Optional agent name')
    agents_parser.add_argument("-p", '--metadata', type=str, help='Metadata in "key=value" format')
    agents_parser.add_argument('-d', '--delete', action='store_true', help='Delete an agent')

    # Orchestrator parser
    orchestrators_parser = subparsers.add_parser("orchestrators", help="List available orchestrators.")
    orchestrators_parser.add_argument('orchestrator_name', nargs='?', help='Optional orchestrator name')
    orchestrators_parser.add_argument("-p", '--metadata', type=str, help='Metadata in "key=value" format')
    orchestrators_parser.add_argument('-d', '--delete', action='store_true', help='Delete an orchestrator')

    # Environment parser
    environments_parser = subparsers.add_parser("environments", help="List available environments.")
    environments_parser.add_argument('environment_name', nargs='?', help='Optional environment name')
    environments_parser.add_argument("-p", '--metadata', type=str, help='Metadata in "key=value" format')
    environments_parser.add_argument('-d', '--delete', action='store_true', help='Delete an environment')

    # Persona parser
    personas_parser = subparsers.add_parser("personas", help="List available personas.")
    personas_parser.add_argument('persona_name', nargs='?', help='Optional persona name')
    personas_parser.add_argument("-p", '--metadata', type=str, help='Metadata in "key=value" format')
    personas_parser.add_argument('-d', '--delete', action='store_true', help='Delete a persona')

    # Tool parser
    tools_parser = subparsers.add_parser("tools", help="List available tools.")
    tools_parser.add_argument('tool_name', nargs='?', help='Optional tool name')
    tools_parser.add_argument("-p", '--metadata', type=str, help='Metadata in "key=value" format')
    tools_parser.add_argument('-d', '--delete', action='store_true', help='Delete a tool')

    # Memory parser
    memories_parser = subparsers.add_parser("memories", help="List available memories.")
    memories_parser.add_argument('memory_name', nargs='?', help='Optional memory name')
    memories_parser.add_argument('-p', '--metadata', type=str, help='Metadata for memory registration in "key=value" format')
    memories_parser.add_argument('-d', '--delete', action='store_true', help='Delete a memory')
    memories_parser.add_argument('-m', '--memory_nodes', type=str, help='Memory nodes', default=["http://localhost:7001"])
    # The memories handler below also expects --list, --add and --content
    memories_parser.add_argument('--list', action='store_true', help='List the content of a memory')
    memories_parser.add_argument('--add', action='store_true', help='Add data to a memory')
    memories_parser.add_argument('--content', type=str, help='Data to add to memory in "key=value" format')

    # Knowledge base parser
    kbs_parser = subparsers.add_parser("kbs", help="List available knowledge bases.")
    kbs_parser.add_argument('kb_name', nargs='?', help='Optional knowledge base name')
    kbs_parser.add_argument('-p', '--metadata', type=str, help='Metadata for knowledge base registration in "key=value" format')
    kbs_parser.add_argument('-d', '--delete', action='store_true', help='Delete a knowledge base')
    kbs_parser.add_argument('-k', '--kb_nodes', type=str, help='Knowledge base nodes')

    # Create parser
    create_parser = subparsers.add_parser("create", help="Execute create command.")
    create_parser.add_argument("module", help="Select the module to create")
    create_parser.add_argument("-a", "--agent_modules", help="Agent modules to create")
    create_parser.add_argument("-n", "--agent_nodes", help="Agent nodes to take part in orchestrator runs.")
    create_parser.add_argument("-e", "--environment_modules", help="Environment module to create")
    create_parser.add_argument("-m", "--environment_nodes", help="Environment nodes to store data during agent runs.")

    # Run parser
    run_parser = subparsers.add_parser("run", help="Execute run command.")
    run_parser.add_argument("agent", help="Select the agent to run")
    run_parser.add_argument("-p", '--parameters', type=str, help='Parameters in "key=value" format')
    run_parser.add_argument("-n", "--agent_nodes", help="Agent nodes to take part in module runs.")
    run_parser.add_argument("-t", "--tool_nodes", help="Tool nodes to take part in module runs.")
    run_parser.add_argument("-e", "--environment_nodes", help="Environment nodes to store data during module runs.")
    run_parser.add_argument('-k', '--kb_nodes', type=str, help='Knowledge base nodes to take part in module runs.')
    run_parser.add_argument('-m', '--memory_nodes', type=str, help='Memory nodes')
    run_parser.add_argument("-pm", "--persona_modules", help="Personas URLs to install before running the agent")
    run_parser.add_argument("-f", "--file", help="YAML file with module run parameters")

    # Inference parser
    inference_parser = subparsers.add_parser("inference", help="Run model inference.")
    inference_parser.add_argument("prompt", help="Input prompt for the model")
    inference_parser.add_argument("-m", "--model", help="Model to use for inference", default="phi3:mini")
    inference_parser.add_argument("-p", "--parameters", type=str, help='Additional model parameters in "key=value" format')

    # Storage parser
    storage_parser = subparsers.add_parser("storage", help="Interact with Node storage.")
    storage_parser.add_argument("storage_type", help="The type of storage", choices=["db", "fs", "ipfs"])
    storage_parser.add_argument("operation", help="The operation to run", choices=["create", "read", "update", "delete", "list", "search"])
    storage_parser.add_argument("path", help="The path to store the object")
    storage_parser.add_argument("-d", "--data", help="Data to write to storage")
    storage_parser.add_argument("-s", "--schema", help="Schema to write to storage")
    storage_parser.add_argument("-o", "--options", help="Options to use with storage")
    storage_parser.add_argument("-f", "--file", help="File path for fs/ipfs operations")
    storage_parser.add_argument("--output", help="Output path for downloaded files", default="./downloads")

    # Signup command
    signup_parser = subparsers.add_parser("signup", help="Sign up a new user.")

    # Publish command
    publish_parser = subparsers.add_parser("publish", help="Publish agents.")
    publish_parser.add_argument("-d", "--decorator", help="Publish module via decorator", action="store_true")
    publish_parser.add_argument("-r", "--register", 
                              help="Register modules with hub. Optionally provide a GitHub URL to skip IPFS storage", 
                              nargs='?', 
                              const=True,
                              metavar="URL")
    publish_parser.add_argument("-s", "--subdeployments", help="Publish subdeployments", action="store_true")
        
    async with naptha as naptha:
        args = parser.parse_args()
        args = _parse_str_args(args)
        print(args)
        if args.command == "signup":
            _, _ = await user_setup_flow(hub_url, public_key)
        elif args.command in [
            "nodes", "agents", "orchestrators", "environments", 
            "personas", "kbs", "memories", "tools", "run", "inference", 
            "publish", "create", "storage"
        ]:
            if not naptha.hub.is_authenticated:
                if not hub_username or not hub_password:
                    print(
                        "Please set HUB_USERNAME and HUB_PASSWORD environment variables or sign up first (run naptha signup).")
                    return
                success, _, _ = await naptha.hub.signin(hub_username, hub_password)
                if not success:
                    print("Authentication failed. Please check your username and password.")
                    return

            if args.command == "nodes":
                if not args.list_servers:
                    await list_nodes(naptha)   
                else:
                    await list_servers(naptha)
            elif args.command == "agents":
                if not args.agent_name:
                    await list_agents(naptha)
                elif args.delete and len(args.agent_name.split()) == 1:
                    await naptha.hub.delete_agent(args.agent_name)
                elif len(args.agent_name.split()) == 1:
                    if hasattr(args, 'metadata') and args.metadata is not None:
                        params = shlex.split(args.metadata)
                        parsed_params = {}
                        for param in params:
                            key, value = param.split('=')
                            parsed_params[key] = value

                        required_metadata = ['description', 'parameters', 'module_url']
                        missing_metadata = [param for param in required_metadata if param not in parsed_params]
                        if missing_metadata:
                            print(f"Missing required metadata: {', '.join(missing_metadata)}")
                            return
                        agent_config = {
                            "id": f"agent:{args.agent_name}",
                            "name": args.agent_name,
                            "description": parsed_params['description'],
                            "parameters": parsed_params['parameters'],
                            "author": f"user:{naptha.hub.public_key}",
                            "module_url": parsed_params['module_url'],
                            "module_type": parsed_params.get('module_type', 'agent'),
                            "module_version": parsed_params.get('module_version', '0.1'),
                            "module_entrypoint": parsed_params.get('module_entrypoint', 'run.py'),
                            "execution_type": parsed_params.get('execution_type', 'package')
                        }
                        await create_agent(naptha, agent_config)
                else:
                    print("Invalid command.")
            elif args.command == "orchestrators":
                if not args.orchestrator_name:
                    await list_orchestrators(naptha)
                elif args.delete and len(args.orchestrator_name.split()) == 1:
                    await naptha.hub.delete_orchestrator(args.orchestrator_name)
                elif len(args.orchestrator_name.split()) == 1:
                    if hasattr(args, 'metadata') and args.metadata is not None:
                        params = shlex.split(args.metadata)
                        parsed_params = {}
                        for param in params:
                            key, value = param.split('=')
                            parsed_params[key] = value

                        required_metadata = ['description', 'parameters', 'module_url']
                        if not all(param in parsed_params for param in required_metadata):
                            print(f"Missing one or more of the following required metadata: {required_metadata}")
                            return
                            
                        orchestrator_config = {
                            "id": f"orchestrator:{args.orchestrator_name}",
                            "name": args.orchestrator_name,
                            "description": parsed_params['description'],
                            "parameters": parsed_params['parameters'],
                            "author": f"user:{naptha.hub.public_key}",
                            "module_url": parsed_params['module_url'],
                            "module_type": parsed_params.get('module_type', 'orchestrator'),
                            "module_version": parsed_params.get('module_version', '0.1'),
                            "module_entrypoint": parsed_params.get('module_entrypoint', 'run.py'),
                            "execution_type": parsed_params.get('execution_type', 'package')
                        }
                        await create_orchestrator(naptha, orchestrator_config)
                else:
                    print("Invalid command.")
            elif args.command == "environments":
                if not args.environment_name:
                    await list_environments(naptha)
                elif args.delete and len(args.environment_name.split()) == 1:
                    await naptha.hub.delete_environment(args.environment_name)
                elif len(args.environment_name.split()) == 1:
                    if hasattr(args, 'metadata') and args.metadata is not None:
                        params = shlex.split(args.metadata)
                        parsed_params = {}
                        for param in params:
                            key, value = param.split('=')
                            parsed_params[key] = value

                        required_metadata = ['description', 'parameters', 'module_url']
                        if not all(param in parsed_params for param in required_metadata):
                            print(f"Missing one or more of the following required metadata: {required_metadata}")
                            return
                            
                        environment_config = {
                            "id": f"environment:{args.environment_name}",
                            "name": args.environment_name,
                            "description": parsed_params['description'],
                            "parameters": parsed_params['parameters'],
                            "author": f"user:{naptha.hub.public_key}",
                            "module_url": parsed_params['module_url'],
                            "module_type": parsed_params.get('module_type', 'environment'),
                            "module_version": parsed_params.get('module_version', '0.1'),
                            "module_entrypoint": parsed_params.get('module_entrypoint', 'run.py'),
                            "execution_type": parsed_params.get('execution_type', 'package')
                        }
                        await create_environment(naptha, environment_config)
                else:
                    print("Invalid command.")
            elif args.command == "tools":
                if not args.tool_name:
                    await list_tools(naptha)
                elif args.delete and len(args.tool_name.split()) == 1:
                    await naptha.hub.delete_tool(args.tool_name)
                elif len(args.tool_name.split()) == 1:
                    if hasattr(args, 'metadata') and args.metadata is not None:
                        params = shlex.split(args.metadata)
                        parsed_params = {}
                        for param in params:
                            key, value = param.split('=')
                            parsed_params[key] = value

                        required_metadata = ['description', 'parameters', 'module_url']
                        if not all(param in parsed_params for param in required_metadata):
                            print(f"Missing one or more of the following required metadata: {required_metadata}")
                            return
                            
                        tool_config = {
                            "id": f"tool:{args.tool_name}",
                            "name": args.tool_name,
                            "description": parsed_params['description'],
                            "parameters": parsed_params['parameters'],
                            "author": f"user:{naptha.hub.public_key}",
                            "module_url": parsed_params['module_url'],
                            "module_type": parsed_params.get('module_type', 'tool'),
                            "module_version": parsed_params.get('module_version', '0.1'),
                            "module_entrypoint": parsed_params.get('module_entrypoint', 'run.py'),
                            "execution_type": parsed_params.get('execution_type', 'package')
                        }
                        await naptha.hub.create_tool(tool_config)
                else:
                    print("Invalid command.")
            elif args.command == "personas":
                if not args.persona_name:
                    await list_personas(naptha)
                elif args.delete and len(args.persona_name.split()) == 1:
                    await naptha.hub.delete_persona(args.persona_name)
                elif len(args.persona_name.split()) == 1:
                    if hasattr(args, 'metadata') and args.metadata is not None:
                        params = shlex.split(args.metadata)
                        parsed_params = {}
                        for param in params:
                            key, value = param.split('=')
                            parsed_params[key] = value

                        required_metadata = ['description', 'parameters', 'module_url']
                        if not all(param in parsed_params for param in required_metadata):
                            print(f"Missing one or more of the following required metadata: {required_metadata}")
                            return
                            
                        persona_config = {
                            "id": f"persona:{args.persona_name}",
                            "name": args.persona_name,
                            "description": parsed_params['description'],
                            "parameters": parsed_params['parameters'],
                            "author": f"user:{naptha.hub.public_key}",
                            "module_url": parsed_params['module_url'],
                            "module_type": parsed_params.get('module_type', 'persona'),
                            "module_version": parsed_params.get('module_version', '0.1'),
                            "module_entrypoint": parsed_params.get('module_entrypoint', 'run.py'),
                            "execution_type": parsed_params.get('execution_type', 'package')
                        }
                        await create_persona(naptha, persona_config)
                else:
                    print("Invalid command.")
            elif args.command == "memories":
                if not args.memory_name:
                    # List all memories
                    await list_memories(naptha)
                elif args.list:
                    # List content of specific memory
                    await list_memory_content(naptha, args.memory_name)
                elif args.add:
                    # Add data to memory
                    if not args.content:
                        console = Console()
                        console.print("[red]Data is required for add command.[/red]")
                        return
                    await add_data_to_memory(naptha, args.memory_name, args.content, user_id=f"user:{naptha.hub.public_key}", memory_node_url=args.memory_nodes[0])
                elif args.delete and len(args.memory_name.split()) == 1:
                    await naptha.hub.delete_memory(args.memory_name)
                elif len(args.memory_name.split()) == 1:
                    if hasattr(args, 'metadata') and args.metadata is not None:
                        params = shlex.split(args.metadata)
                        parsed_params = {}
                        for param in params:
                            key, value = param.split('=')
                            parsed_params[key] = value

                        required_metadata = ['description', 'parameters', 'module_url']
                        if not all(param in parsed_params for param in required_metadata):
                            print(f"Missing one or more of the following required metadata: {required_metadata}")
                            return
                            
                        memory_config = {
                            "id": f"memory:{args.memory_name}",
                            "name": args.memory_name,
                            "description": parsed_params['description'],
                            "parameters": parsed_params['parameters'],
                            "author": f"user:{naptha.hub.public_key}",
                            "module_url": parsed_params['module_url'],
                            "module_type": parsed_params.get('module_type', 'memory'),
                            "module_version": parsed_params.get('module_version', '0.1'),
                            "module_entrypoint": parsed_params.get('module_entrypoint', 'run.py'),
                            "execution_type": parsed_params.get('execution_type', 'package')
                        }
                        await naptha.hub.create_memory(memory_config)
                else:
                    # Show specific memory info
                    await list_memories(naptha, args.memory_name)
            elif args.command == "kbs":
                if not args.kb_name:
                    # List all knowledge bases
                    await list_kbs(naptha)
                elif args.delete and len(args.kb_name.split()) == 1:
                    await naptha.hub.delete_kb(args.kb_name)
                elif len(args.kb_name.split()) == 1:
                    if hasattr(args, 'metadata') and args.metadata is not None:
                        params = shlex.split(args.metadata)
                        parsed_params = {}
                        for param in params:
                            key, value = param.split('=')
                            parsed_params[key] = value

                        required_metadata = ['description', 'parameters', 'module_url']
                        if not all(param in parsed_params for param in required_metadata):
                            print(f"Missing one or more of the following required metadata: {required_metadata}")
                            return
                            
                        kb_config = {
                            "id": f"kb:{args.kb_name}",
                            "name": args.kb_name,
                            "description": parsed_params['description'],
                            "parameters": parsed_params['parameters'],
                            "author": f"user:{naptha.hub.public_key}",
                            "module_url": parsed_params['module_url'],
                            "module_type": parsed_params.get('module_type', 'kb'),
                            "module_version": parsed_params.get('module_version', '0.1'),
                            "module_entrypoint": parsed_params.get('module_entrypoint', 'run.py'),
                            "execution_type": parsed_params.get('execution_type', 'package')
                        }
                        await naptha.hub.create_kb(kb_config)
                else:
                    # Show specific knowledge base info
                    await list_kbs(naptha, args.kb_name)
            elif args.command == "create":
                await create(naptha, args.module, args.agent_modules, args.agent_nodes, args.environment_modules, args.environment_nodes)
            elif args.command == "run":                    
                await run(naptha, args.agent, args.parameters, args.agent_nodes, args.tool_nodes, args.environment_nodes, args.kb_nodes, args.memory_nodes, args.file, args.persona_modules)
            elif args.command == "inference":
                request = ChatCompletionRequest(
                    messages=[{"role": "user", "content": args.prompt}],
                    model=args.model,
                )
                await naptha.inference_client.run_inference(request)
            elif args.command == "storage":
                await storage_interaction(
                    naptha, 
                    args.storage_type, 
                    args.operation, 
                    args.path, 
                    data=args.data, 
                    schema=args.schema, 
                    options=args.options, 
                    file=args.file
                )
            elif args.command == "publish":
                await naptha.publish_modules(args.decorator, args.register, args.subdeployments)
        else:
            parser.print_help()
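Taken together, the subcommands defined above are reached through the naptha console command referenced in the help text, for example naptha signup, naptha agents, naptha run agent:<module_name> -p "key=value", or naptha storage db read <table>; the module and table names here are placeholders.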
async def run(naptha, module_name, parameters=None, agent_nodes=None, tool_nodes=None, environment_nodes=None, kb_nodes=None, memory_nodes=None, yaml_file=None, persona_modules=None)
async def run(
    naptha,
    module_name,
    parameters=None, 
    agent_nodes=None,
    tool_nodes=None,
    environment_nodes=None,
    kb_nodes=None,
    memory_nodes=None,
    yaml_file=None, 
    persona_modules=None
):   
    if yaml_file and parameters:
        raise ValueError("Cannot pass both yaml_file and parameters")
    
    if yaml_file:
        parameters = load_yaml_to_dict(yaml_file)

    module_type = module_name.split(":")[0] if ":" in module_name else "agent" # Default to agent for backwards compatibility

    user = await naptha.node.check_user(user_input={"public_key": naptha.hub.public_key})

    if user['is_registered']:
        print("Found user...", user)
    else:
        print("No user found. Registering user...")
        user = await naptha.node.register_user(user_input=user)
        print(f"User registered: {user}.")

    # Handle sub-deployments
    agent_deployments = []
    if agent_nodes:
        for agent_node in agent_nodes:
            agent_deployments.append(AgentDeployment(node=NodeConfigUser(ip=agent_node.strip())))
    tool_deployments = []
    if tool_nodes:
        for tool_node in tool_nodes:
            tool_deployments.append(ToolDeployment(node=NodeConfigUser(ip=tool_node.strip())))
    environment_deployments = []
    if environment_nodes:
        for environment_node in environment_nodes:
            environment_deployments.append(EnvironmentDeployment(node=NodeConfigUser(ip=environment_node.strip())))
    kb_deployments = []
    if kb_nodes:
        for kb_node in kb_nodes:
            kb_deployments.append(KBDeployment(node=NodeConfigUser(ip=kb_node.strip())))
    memory_deployments = []
    if memory_nodes:
        for memory_node in memory_nodes:
            memory_deployments.append(MemoryDeployment(node=NodeConfigUser(ip=memory_node.strip())))


    if module_type == "agent":
        print("Running Agent...")

        agent_deployment = AgentDeployment(
            module={"id": module_name, "name": module_name.split(":")[-1], "module_type": module_type}, 
            node=url_to_node(os.getenv("NODE_URL")), 
            config={"persona_module": {"name": persona_modules[0]}} if persona_modules else None,
            tool_deployments=tool_deployments,
            kb_deployments=kb_deployments,
            memory_deployments=memory_deployments,
            environment_deployments=environment_deployments
        )

        agent_run_input = {
            'consumer_id': user['id'],
            "inputs": parameters,
            "deployment": agent_deployment.model_dump(),
            "signature": sign_consumer_id(user['id'], os.getenv("PRIVATE_KEY"))
        }
        print(f"Agent run input: {agent_run_input}")

        agent_run = await naptha.node.run_agent_and_poll(agent_run_input)

    elif module_type == "tool":
        print("Running Tool...")
        tool_deployment = ToolDeployment(
            module={"id": module_name, "name": module_name.split(":")[-1], "module_type": module_type},
            node=url_to_node(os.getenv("NODE_URL")))

        tool_run_input = ToolRunInput(
            consumer_id=user['id'],
            inputs=parameters,
            deployment=tool_deployment,
            signature=sign_consumer_id(user['id'], os.getenv("PRIVATE_KEY"))
        )
        tool_run = await naptha.node.run_tool_and_poll(tool_run_input)

    elif module_type == "orchestrator":
        print("Running Orchestrator...")

        orchestrator_deployment = OrchestratorDeployment(
            module={"id": module_name, "name": module_name.split(":")[-1], "module_type": module_type}, 
            node=url_to_node(os.getenv("NODE_URL")),
            agent_deployments=agent_deployments,
            environment_deployments=environment_deployments,
            kb_deployments=kb_deployments,
            memory_deployments=memory_deployments,
        )

        orchestrator_run_input = OrchestratorRunInput(
            consumer_id=user['id'],
            inputs=parameters,
            deployment=orchestrator_deployment,
            signature=sign_consumer_id(user['id'], os.getenv("PRIVATE_KEY"))
        )
        orchestrator_run = await naptha.node.run_orchestrator_and_poll(orchestrator_run_input)

    elif module_type == "environment":
        print("Running Environment...")

        environment_deployment = EnvironmentDeployment(
            module={"id": module_name, "name": module_name.split(":")[-1], "module_type": module_type}, 
            node=url_to_node(os.getenv("NODE_URL"))
        )

        environment_run_input = EnvironmentRunInput(
            inputs=parameters,
            deployment=environment_deployment,
            consumer_id=user['id'],
            signature=sign_consumer_id(user['id'], os.getenv("PRIVATE_KEY"))
        )
        environment_run = await naptha.node.run_environment_and_poll(environment_run_input)

    elif module_type == "kb":
        print("Running Knowledge Base...")

        kb_deployment = KBDeployment(
            module={"id": module_name, "name": module_name.split(":")[-1], "module_type": module_type}, 
            node=url_to_node(os.getenv("NODE_URL"))
        )

        kb_run_input = KBRunInput(
            consumer_id=user['id'],
            inputs=parameters,
            deployment=kb_deployment,
            signature=sign_consumer_id(user['id'], os.getenv("PRIVATE_KEY"))
        )
        kb_run = await naptha.node.run_kb_and_poll(kb_run_input)
    elif module_type == "memory":
        print("Running Memory Module...")

        memory_deployment = MemoryDeployment(
            module={"id": module_name, "name": module_name.split(":")[-1], "module_type": module_type}, 
            node=url_to_node(os.getenv("NODE_URL"))
        )

        memory_run_input = MemoryRunInput(
            consumer_id=user['id'],
            inputs=parameters,
            deployment=memory_deployment,
            signature=sign_consumer_id(user['id'], os.getenv("PRIVATE_KEY"))
        )
        memory_run = await naptha.node.run_memory_and_poll(memory_run_input)     
    else:
        print(f"Module type {module_type} not supported.")
async def storage_interaction(naptha, storage_type, operation, path, data=None, schema=None, options=None, file=None)
async def storage_interaction(naptha, storage_type, operation, path, data=None, schema=None, options=None, file=None):
    """Handle storage interactions using StorageProvider"""
    storage_provider = StorageProvider(naptha.node.node)
    print(f"Storage interaction: {storage_type}, {operation}, {path}, {data}, {schema}, {options}, {file}")

    try:
        # Convert string storage type to enum
        storage_type = StorageType(storage_type)

        # Special handling for filesystem/IPFS file operations
        if storage_type in [StorageType.FILESYSTEM, StorageType.IPFS]:
            if operation == "create" and file:
                with open(file, 'rb') as f:
                    request = CreateStorageRequest(
                        storage_type=storage_type,
                        path=path,
                        file=f,
                        options=json.loads(options) if options else {}
                    )
                    result = await storage_provider.execute(request)
                    return result
                    
            elif operation == "read":
                request = ReadStorageRequest(
                    storage_type=storage_type,
                    path=path,
                    options=json.loads(options) if options else {}
                )
                result = await storage_provider.execute(request)
                
                # Handle downloaded file
                if isinstance(result.data, bytes):
                    output_dir = "./downloads"
                    os.makedirs(output_dir, exist_ok=True)
                    output_path = os.path.join(output_dir, os.path.basename(path))
                    with open(output_path, 'wb') as f:
                        f.write(result.data)
                    print(f"File downloaded to: {output_path}")
                return result

        # Handle database and other operations
        match operation:
            case "create":
                if schema:
                    request = CreateStorageRequest(
                        storage_type=storage_type,
                        path=path,
                        data=json.loads(schema)
                    )
                elif data:
                    request = CreateStorageRequest(
                        storage_type=storage_type,
                        path=path,
                        data=json.loads(data)
                    )
                else:
                    raise ValueError("Either schema or data must be provided for create command")
                    
            case "read":
                request = ReadStorageRequest(
                    storage_type=storage_type,
                    path=path,
                    options=json.loads(options) if options else {}
                )
                
            case "update":
                if not data:
                    raise ValueError("Data must be provided for update command")
                request = UpdateStorageRequest(
                    storage_type=storage_type,
                    path=path,
                    data=json.loads(data),
                    options=json.loads(options) if options else {}
                )
                
            case "delete":
                request = DeleteStorageRequest(
                    storage_type=storage_type,
                    path=path,
                    options=json.loads(options) if options else {}
                )
                
            case "list":
                request = ListStorageRequest(
                    storage_type=storage_type,
                    path=path,
                    options=json.loads(options) if options else {}
                )
                
            case "search":
                if not data:
                    raise ValueError("Query data must be provided for search command")
                request = SearchStorageRequest(
                    storage_type=storage_type,
                    path=path,
                    query=json.loads(data),
                    options=json.loads(options) if options else {}
                )

        result = await storage_provider.execute(request)
        print(result)
        return result

    except Exception as e:
        print(f"Storage operation failed: {str(e)}")
        raise

Handle storage interactions using StorageProvider
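A hedged sketch of two storage calls, assuming a naptha client; the table name, file paths, and option keys are placeholders, and data, schema, and options are JSON strings just as the storage subcommand passes them:

async def example_storage(naptha):
    # Placeholders only: table name, file paths, and option keys are illustrative.
    await storage_interaction(naptha, "db", "read", "my_table", options='{"limit": 10}')
    await storage_interaction(naptha, "fs", "create", "uploads/report.pdf", file="./report.pdf")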