Module naptha_sdk.client.naptha
Functions
def agent(name)
-
Expand source code
def agent(name): def decorator(func): frame = inspect.currentframe() caller_frame = frame.f_back instantiation_file = caller_frame.f_code.co_filename variables = scrape_init(instantiation_file) params = scrape_func_params(func) agent_code, obj_name, local_modules, selective_import_modules, standard_import_modules, variable_modules, union_modules = scrape_func(func, variables) agent_code = render_agent_code(name, agent_code, obj_name, local_modules, selective_import_modules, standard_import_modules, variable_modules, union_modules, params) init_agent_package(name) write_code_to_package(name, agent_code) add_dependencies_to_pyproject(name, selective_import_modules + standard_import_modules) add_files_to_package(name, params, os.getenv("HUB_USERNAME")) loop = asyncio.get_event_loop() if loop.is_running(): asyncio.ensure_future(Naptha().create_agent(name)) else: loop.run_until_complete(Naptha().create_agent(name)) return func return decorator
Classes
class Agent (name, fn, agent_node_url)
-
Expand source code
class Agent: def __init__(self, name, fn, agent_node_url, ): self.name = name self.fn = fn self.agent_node_url = agent_node_url self.repo_id = name
class Naptha
-
Expand source code
class Naptha: """The entry point into Naptha.""" def __init__(self): self.public_key = get_public_key(os.getenv("PRIVATE_KEY")) if os.getenv("PRIVATE_KEY") else None self.user = User(id=f"user:{self.public_key}") self.hub_username = os.getenv("HUB_USERNAME", None) self.hub_url = os.getenv("HUB_URL", None) node_url = os.getenv("NODE_URL") if node_url is None: raise ValueError("NODE_URL is not set. Make sure your project has a .env file with a NODE_URL variable.") self.node = UserClient(url_to_node(node_url)) self.inference_client = InferenceClient(url_to_node(node_url)) self.hub = Hub(self.hub_url, self.public_key) async def __aenter__(self): """Async enter method for context manager""" await self.hub.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async exit method for context manager""" await self.hub.close() async def create_agent(self, name): async with self.hub: _, _, user_id = await self.hub.signin(self.hub_username, os.getenv("HUB_PASSWORD")) agent_config = { "id": f"agent:{name}", "name": name, "description": name, "author": self.hub.user_id, "module_url": "None", "module_type": "agent", "module_version": "0.1", "execution_type": "agent" } logger.info(f"Registering Agent {agent_config}") agent = await self.hub.create_or_update_agent(agent_config) if agent: logger.info(f"Agent {name} created successfully") else: logger.error(f"Failed to create agent {name}") async def publish_modules(self, decorator = False, register = None, subdeployments = False): logger.info(f"Publishing Agent Packages...") start_time = time.time() if not decorator: module_path = Path.cwd() deployment_path = module_path / module_path.name / 'configs/deployment.json' with open(deployment_path, 'r') as f: deployment = json.load(f) module = deployment[0]['module'] modules = [module] if subdeployments: deployment = await setup_module_deployment(module['module_type'], deployment_path) for module_type in ['agent', 'kb', 'tool', 'environment']: subdeployment = module_type + '_deployments' if hasattr(deployment, subdeployment) and getattr(deployment, subdeployment): for submodule in getattr(deployment, subdeployment): modules.append(submodule.module) else: path = Path.cwd() / AGENT_DIR modules = [item.name for item in path.iterdir() if item.is_dir()] for module in modules: git_add_commit(module) module = { "name": module, "description": module, "parameters": "None", "module_type": "agent", "module_url": "None", "module_version": "v0.1", "module_entrypoint": "run.py", "execution_type": "package" } for module in modules: if "module_url" in module and module['module_url'] is not "None": module_url = module['module_url'] # For decorator=False, only the main module should not have a module_url else: # If register is a string, use it as the URL if isinstance(register, str): module_url = register logger.info(f"Using provided URL for {module['module_type']} {module['name']}: {module_url}") # Otherwise, publish to IPFS else: _, response = await publish_ipfs_package(module['name'], decorator) module_url = f'ipfs://{response["ipfs_hash"]}' logger.info(f"Storing {module['module_type']} {module['name']} on IPFS") logger.info(f"IPFS Hash: {response['ipfs_hash']}. You can download it from http://provider.akash.pro:30584/ipfs/{response['ipfs_hash']}") if register: # Register module with hub async with self.hub: _, _, user_id = await self.hub.signin(self.hub_username, os.getenv("HUB_PASSWORD")) module_config = { "id": f"{module['module_type']}:{module['name']}", "name": module['name'], "description": module['description'], "parameters": module['parameters'], "author": self.hub.user_id, "module_url": module_url, "module_type": module['module_type'], "module_version": module['module_version'], "module_entrypoint": module['module_entrypoint'], "execution_type": module['execution_type'], } logger.info(f"Registering {module['module_type']} {module['name']} on Naptha Hub {module_config}") module = await self.hub.create_or_update_module(module['module_type'], module_config) end_time = time.time() total_time = end_time - start_time logger.info(f"Total time taken to publish {len(modules)} modules: {total_time:.2f} seconds") def build(self): asyncio.run(self.build_agents()) async def connect_publish(self): await self.hub.connect() await self.hub.signin(os.getenv("HUB_USERNAME"), os.getenv("HUB_PASSWORD")) await self.publish_agents() await self.hub.close() def publish(self): asyncio.run(self.connect_publish())
The entry point into Naptha.
Methods
def build(self)
-
Expand source code
def build(self): asyncio.run(self.build_agents())
async def connect_publish(self)
-
Expand source code
async def connect_publish(self): await self.hub.connect() await self.hub.signin(os.getenv("HUB_USERNAME"), os.getenv("HUB_PASSWORD")) await self.publish_agents() await self.hub.close()
async def create_agent(self, name)
-
Expand source code
async def create_agent(self, name): async with self.hub: _, _, user_id = await self.hub.signin(self.hub_username, os.getenv("HUB_PASSWORD")) agent_config = { "id": f"agent:{name}", "name": name, "description": name, "author": self.hub.user_id, "module_url": "None", "module_type": "agent", "module_version": "0.1", "execution_type": "agent" } logger.info(f"Registering Agent {agent_config}") agent = await self.hub.create_or_update_agent(agent_config) if agent: logger.info(f"Agent {name} created successfully") else: logger.error(f"Failed to create agent {name}")
def publish(self)
-
Expand source code
def publish(self): asyncio.run(self.connect_publish())
async def publish_modules(self, decorator=False, register=None, subdeployments=False)
-
Expand source code
async def publish_modules(self, decorator = False, register = None, subdeployments = False): logger.info(f"Publishing Agent Packages...") start_time = time.time() if not decorator: module_path = Path.cwd() deployment_path = module_path / module_path.name / 'configs/deployment.json' with open(deployment_path, 'r') as f: deployment = json.load(f) module = deployment[0]['module'] modules = [module] if subdeployments: deployment = await setup_module_deployment(module['module_type'], deployment_path) for module_type in ['agent', 'kb', 'tool', 'environment']: subdeployment = module_type + '_deployments' if hasattr(deployment, subdeployment) and getattr(deployment, subdeployment): for submodule in getattr(deployment, subdeployment): modules.append(submodule.module) else: path = Path.cwd() / AGENT_DIR modules = [item.name for item in path.iterdir() if item.is_dir()] for module in modules: git_add_commit(module) module = { "name": module, "description": module, "parameters": "None", "module_type": "agent", "module_url": "None", "module_version": "v0.1", "module_entrypoint": "run.py", "execution_type": "package" } for module in modules: if "module_url" in module and module['module_url'] is not "None": module_url = module['module_url'] # For decorator=False, only the main module should not have a module_url else: # If register is a string, use it as the URL if isinstance(register, str): module_url = register logger.info(f"Using provided URL for {module['module_type']} {module['name']}: {module_url}") # Otherwise, publish to IPFS else: _, response = await publish_ipfs_package(module['name'], decorator) module_url = f'ipfs://{response["ipfs_hash"]}' logger.info(f"Storing {module['module_type']} {module['name']} on IPFS") logger.info(f"IPFS Hash: {response['ipfs_hash']}. You can download it from http://provider.akash.pro:30584/ipfs/{response['ipfs_hash']}") if register: # Register module with hub async with self.hub: _, _, user_id = await self.hub.signin(self.hub_username, os.getenv("HUB_PASSWORD")) module_config = { "id": f"{module['module_type']}:{module['name']}", "name": module['name'], "description": module['description'], "parameters": module['parameters'], "author": self.hub.user_id, "module_url": module_url, "module_type": module['module_type'], "module_version": module['module_version'], "module_entrypoint": module['module_entrypoint'], "execution_type": module['execution_type'], } logger.info(f"Registering {module['module_type']} {module['name']} on Naptha Hub {module_config}") module = await self.hub.create_or_update_module(module['module_type'], module_config) end_time = time.time() total_time = end_time - start_time logger.info(f"Total time taken to publish {len(modules)} modules: {total_time:.2f} seconds")