Module naptha_sdk.module_manager
Functions
def add_dependencies_to_pyproject(package_name, packages)
-
Expand source code
def add_dependencies_to_pyproject(package_name, packages): # Adds dependencies with wildcard versioning with open(f"{AGENT_DIR}/{package_name}/pyproject.toml", 'r', encoding='utf-8') as file: data = tomlkit.parse(file.read()) dependencies = data['tool']['poetry']['dependencies'] dependencies["python"] = ">=3.10,<3.13" dependencies["naptha-sdk"] = { "git": "https://github.com/NapthaAI/naptha-sdk.git", "branch": "feat/run-agent-tools" } packages_to_add = [] for package in packages: curr_package = package['module'].split('.')[0] if curr_package not in packages_to_add and not is_std_lib(curr_package): dependencies[curr_package] = PACKAGE_VERSIONS.get(curr_package, "*") dependencies["python-dotenv"] = "*" # Serialize the TOML data and write it back to the file with open(f"{AGENT_DIR}/{package_name}/pyproject.toml", 'w', encoding='utf-8') as file: file.write(tomlkit.dumps(data))
def add_files_to_package(agent_name, params, user_id)
-
Expand source code
def add_files_to_package(agent_name, params, user_id): package_path = f'{AGENT_DIR}/{agent_name}' # Generate schema and component yaml generate_schema(agent_name, params) generate_component_yaml(agent_name, user_id) # Create .env.example file env_example_path = os.path.join(package_path, '.env.example') with open(env_example_path, 'w') as env_file: env_file.write('OPENAI_API_KEY=\n')
def extract_dependencies(module, modules)
-
Expand source code
def extract_dependencies(module, modules): dependencies = [] for mod in modules: if mod['name'] != module['name']: # Use a negative lookahead to exclude matches within quotes pattern = r'\b' + re.escape(mod['name']) + r'\b(?=([^"\']*["\'][^"\']*["\'])*[^"\']*$)' if re.search(pattern, module['source']): dependencies.append(mod['name']) return dependencies
def generate_component_yaml(agent_name, user_id)
-
Expand source code
def generate_component_yaml(agent_name, user_id): component = { 'name': agent_name, 'type': agent_name, 'author': user_id, 'version': '0.1.0', 'description': agent_name, 'license': 'MIT', 'models': { 'default_model_provider': 'ollama', 'ollama': { 'model': 'ollama/llama3.1:70b', 'max_tokens': 1000, 'temperature': 0, 'api_base': 'http://localhost:11434' } }, 'inputs': { 'system_message': 'You are a helpful AI assistant.', 'save': False, 'location': 'node' }, 'outputs': { 'filename': 'output.txt', 'save': False, 'location': 'node' }, 'implementation': { 'package': { 'entrypoint': 'run.py' } } } with open(f'{AGENT_DIR}/{agent_name}/{agent_name}/component.yaml', 'w') as file: yaml.dump(component, file, default_flow_style=False)
def generate_schema(agent_name, params)
-
Expand source code
def generate_schema(agent_name, params): schema_code = '''from pydantic import BaseModel from typing import Any class InputSchema(BaseModel): tool_name: str tool_input_type: str tool_input_value: dict ''' for name, info in params.items(): print("INFO", name, info) if info['value'] is None: if 'List' in str(info['type']): schema_code += f' {name}: list\n' elif info['type'] is None: schema_code += f' {name}: Any\n' elif issubclass(info["type"], BaseModel): schema_code += f' {name}: dict\n' else: schema_code += f' {name}: {info["type"].__name__}\n' else: if 'List' in str(info['type']): schema_code += f' {name}: list = {info["value"]}\n' elif info['type'] is None: schema_code += f' {name}: Any = {info["value"]}\n' elif issubclass(info["type"], BaseModel): schema_code += f' {name}: dict = {info["value"]}\n' else: schema_code += f' {name}: {info["type"].__name__} = {info["value"]}\n' with open(f'{AGENT_DIR}/{agent_name}/{agent_name}/schemas.py', 'w') as file: file.write(schema_code)
def git_add_commit(agent_name)
-
Expand source code
def git_add_commit(agent_name): subprocess.run(["git", "-C", f"{AGENT_DIR}/{agent_name}", "add", "-A"]) subprocess.run(["git", "-C", f"{AGENT_DIR}/{agent_name}", "commit", "-m", "Initial commit"]) subprocess.run(["git", "-C", f"{AGENT_DIR}/{agent_name}", "tag", "-f", "v0.1"])
def init_agent_package(package_name)
-
Expand source code
def init_agent_package(package_name): subprocess.run(["poetry", "new", f"{AGENT_DIR}/{package_name}"]) subprocess.run(["git", "init", f"{AGENT_DIR}/{package_name}"])
def is_std_lib(module_name)
-
Expand source code
def is_std_lib(module_name): try: module_spec = importlib.util.find_spec(module_name) return module_spec is not None and 'site-packages' not in module_spec.origin except ImportError: return False
def load_input_schema(repo_name)
-
Expand source code
def load_input_schema(repo_name): """Loads the input schema""" schemas_module = importlib.import_module(f"{repo_name}.schemas") input_schema = getattr(schemas_module, "Persona") return input_schema
Loads the input schema
async def load_persona(persona_module)
-
Expand source code
async def load_persona(persona_module): """Load persona from a JSON or YAML file in a git repository.""" hub_username = os.getenv("HUB_USERNAME") hub_password = os.getenv("HUB_PASSWORD") hub_url = os.getenv("HUB_URL") if not hub_username or not hub_password or not hub_url: raise ValueError("HUB_USERNAME, HUB_PASSWORD, and HUB_URL environment variables must be set") async with Hub(hub_url) as hub: success, _, _ = await hub.signin(hub_username, hub_password) if not success: raise ConnectionError(f"Failed to authenticate with Hub.") personas = await hub.list_personas(persona_module['name']) persona = personas[0] persona_url = persona['module_url'] # Clone the repo repo_name = persona_url.split('/')[-1] repo_path = Path(f"{AGENT_DIR}/{repo_name}") # Remove existing repo if it exists if repo_path.exists(): import shutil shutil.rmtree(repo_path) _ = Repo.clone_from(persona_url, to_path=str(repo_path)) persona_file = repo_path / persona['module_entrypoint'] if not persona_file.exists(): logger.error(f"Persona file not found in repository {repo_name}") return None # Load based on file extension with persona_file.open('r') as f: if persona_file.suffix == '.json': persona_data = json.load(f) elif persona_file.suffix in ['.yml', '.yaml']: persona_data = yaml.safe_load(f) else: logger.error(f"Unsupported file type {persona_file.suffix} in {repo_name}") return None # input_schema = load_input_schema(repo_name) return persona_data
Load persona from a JSON or YAML file in a git repository.
async def publish_ipfs_package(agent_name, decorator=False)
-
Expand source code
async def publish_ipfs_package(agent_name, decorator = False): package_path = f"{AGENT_DIR}/{agent_name}" if not decorator: output_zip_file = zip_dir_with_gitignore(Path.cwd()) else: output_zip_file = zip_dir(package_path) success, response = await write_to_ipfs(output_zip_file) logger.info(f"Response: {response}") return success, response
def read_gitignore(directory)
-
Expand source code
def read_gitignore(directory): gitignore_path = os.path.join(directory, '.gitignore') if not os.path.exists(gitignore_path): logger.info(f"No .gitignore file found in {directory}") return [] with open(gitignore_path, 'r') as file: lines = file.readlines() ignored_files = [line.strip() for line in lines if line.strip() and not line.startswith('#')] return ignored_files
def render_agent_code(agent_name,
agent_code,
obj_name,
local_modules,
selective_import_modules,
standard_import_modules,
variable_modules,
union_modules,
params)-
Expand source code
def render_agent_code(agent_name, agent_code, obj_name, local_modules, selective_import_modules, standard_import_modules, variable_modules, union_modules, params): # Add the imports for installed modules (e.g. crewai) content = '' for module in standard_import_modules: line = f'import {module["name"]} \n' content += line for module in selective_import_modules: line = f'from {module["module"]} import {module["name"]} \n' content += line for module in variable_modules: if module["module"] and module["import_needed"]: content += f'from {module["module"]} import {module["name"]} \n' if any('crewai' in module['module'] for module in selective_import_modules): content += "from crewai import Task\n" for module in union_modules: content += module['source'] # Add the naptha imports and logger setup naptha_imports = f'''from dotenv import load_dotenv from {agent_name}.schemas import InputSchema from naptha_sdk.utils import get_logger logger = get_logger(__name__) load_dotenv() ''' content += naptha_imports for module in selective_import_modules: if 'source' in module and module['source']: content += module['source'] + "\n" # Add the source code for the local modules for module in local_modules: content += module['source'] + "\n" for module in variable_modules: content += module['source'] + "\n" # Convert class method to function agent_code = agent_code.replace('self.', '') agent_code = agent_code.replace('self', '') content += textwrap.dedent(agent_code) + "\n\n" param_str = ", ".join(f"inputs.{name}" for name, info in params.items()) # Define the new function signature content += f"""def run(inputs: InputSchema, *args, **kwargs): {agent_name}_0 = {obj_name}({param_str}) tool_input_class = globals().get(inputs.tool_input_type) tool_input = tool_input_class(**inputs.tool_input_value) method = getattr({agent_name}_0, inputs.tool_name, None) return method(tool_input) if __name__ == "__main__": from naptha_sdk.utils import load_yaml from {agent_name}.schemas import InputSchema cfg_path = "{agent_name}/component.yaml" cfg = load_yaml(cfg_path) # You will likely need to change the inputs dict inputs = {{"tool_name": "execute_task", "tool_input_type": "Task", "tool_input_value": {{"description": "What is the market cap of AMZN?", "expected_output": "The market cap of AMZN"}}}} inputs = InputSchema(**inputs) response = run(inputs) print(response) """ return content
def sort_modules(modules, dependencies)
-
Expand source code
def sort_modules(modules, dependencies): sorted_modules = [] unsorted_modules = modules.copy() while unsorted_modules: for mod in unsorted_modules: mod_deps = dependencies[mod['name']] if all(dep in [m['name'] for m in sorted_modules] for dep in mod_deps): sorted_modules.append(mod) unsorted_modules.remove(mod) break return sorted_modules
def write_code_to_package(agent_name, code)
-
Expand source code
def write_code_to_package(agent_name, code): package_path = f'{AGENT_DIR}/{agent_name}' code_path = os.path.join(package_path, agent_name, 'run.py') os.makedirs(os.path.dirname(code_path), exist_ok=True) with open(code_path, 'w') as file: file.write(code)
async def write_to_ipfs(file_path)
-
Expand source code
async def write_to_ipfs(file_path): """Write a file to IPFS, optionally publish to IPNS or update an existing IPNS record.""" try: logger.info(f"Writing file to IPFS: {file_path}") if not IPFS_GATEWAY_URL: return (500, {"message": "IPFS_GATEWAY_URL not found"}) client = ipfshttpclient.connect(IPFS_GATEWAY_URL) with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmpfile: with open(file_path, "rb") as f: content = f.read() tmpfile.write(content) tmpfile_name = tmpfile.name result = client.add(tmpfile_name) client.pin.add(result["Hash"]) os.unlink(tmpfile_name) ipfs_hash = result["Hash"] response = { "message": "File written and pinned to IPFS", "ipfs_hash": ipfs_hash, } return (201, response) except Exception as e: logger.error(f"Error writing file to IPFS: {e}") import traceback logger.error(f"Error writing file to IPFS: {e}") logger.error(f"Traceback: {traceback.format_exc()}") return (500, {"message": f"Error writing file to IPFS: {e}"})
Write a file to IPFS, optionally publish to IPNS or update an existing IPNS record.
def zip_dir(directory_path: str) ‑> None
-
Expand source code
def zip_dir(directory_path: str) -> None: """ Zip the specified directory and write it to a file on disk. """ output_zip_file = f"{directory_path}.zip" with zipfile.ZipFile(output_zip_file, "w", zipfile.ZIP_DEFLATED) as zip_file: for root, dirs, files in os.walk(directory_path): for file in files: file_path = os.path.join(root, file) zip_file.write(file_path, os.path.relpath(file_path, directory_path)) print(f"Zipped directory '{directory_path}' to '{output_zip_file}'") return output_zip_file
Zip the specified directory and write it to a file on disk.
def zip_dir_with_gitignore(directory_path)
-
Expand source code
def zip_dir_with_gitignore(directory_path): ignored_files = read_gitignore(directory_path) output_zip_file = f"./{os.path.basename(directory_path)}.zip" # Convert patterns in .gitignore to absolute paths for comparison ignored_patterns = [os.path.join(directory_path, pattern) for pattern in ignored_files] with zipfile.ZipFile(output_zip_file, "w", zipfile.ZIP_DEFLATED) as zip_file: for root, dirs, files in os.walk(directory_path): dirs = [d for d in dirs if not any(fnmatch.fnmatch(os.path.join(root, d), pattern) for pattern in ignored_patterns)] for file in files: file_path = os.path.join(root, file) if any(fnmatch.fnmatch(file_path, pattern) for pattern in ignored_patterns): continue if file == output_zip_file.split('/')[1]: continue zip_file.write(file_path, os.path.relpath(file_path, directory_path)) logger.info(f"Zipped directory '{directory_path}' to '{output_zip_file}'") return output_zip_file