Module naptha_sdk.scrape

Functions

def get_obj_dependencies(context_globals, fn_code, processed=None)
Expand source code
def _already_processed(obj, processed):
    """Return True when *obj* is in *processed*; unhashable objects never are."""
    try:
        return obj in processed
    except TypeError:
        # Unhashable objects (lists, dicts, ...) cannot be members of a set.
        return False


def _mark_processed(obj, processed):
    """Record *obj* in *processed* when it is hashable."""
    try:
        processed.add(obj)
    except TypeError:
        # Deliberate: unhashable objects cannot be tracked and are simply
        # re-examined if they are encountered again.
        pass


def _collect_obj_dependencies(context_globals, fn_code, processed, seen_variables):
    """Recursive worker behind ``get_obj_dependencies``.

    ``seen_variables`` holds ``(id(namespace), name)`` keys so that constants
    and union aliases are de-duplicated per name, not per value.
    """
    import re

    # Match whole identifiers only, so a global called ``n`` is not picked up
    # just because the code mentions ``os_name``.
    identifiers = set(re.findall(r"[^\W\d]\w*", fn_code))

    modules = []
    variables = []
    for name, obj in context_globals.items():
        if name not in identifiers or name.startswith("__"):
            continue

        is_union = "Union" in type(obj).__name__ and hasattr(obj, "__args__")
        if is_union or isinstance(obj, (int, str, float, bool)):
            # Equal values (1 == True, two aliases of the same Union, ...) must
            # not hide each other, so these are tracked by name.
            variable_key = (id(context_globals), name)
            if variable_key in seen_variables:
                continue
            seen_variables.add(variable_key)

            if is_union:
                args_str = ', '.join(arg.__name__ if hasattr(arg, '__name__') else str(arg) for arg in obj.__args__)
                variables.append({"type": "union", "target": name, "value": f"Union[{args_str}]"})
                for arg in obj.__args__:
                    if inspect.isclass(arg):
                        modules.append({
                            'name': arg.__name__,
                            'module': arg.__module__,
                            'import_type': "selective",
                            'is_local': False
                        })
            else:
                variables.append({"type": "constant", "target": name, "value": f"{obj}"})
            continue

        if _already_processed(obj, processed):
            continue
        _mark_processed(obj, processed)

        if inspect.ismodule(obj):
            modules.append({
                'name': name,
                'module': obj.__name__,
                'import_type': "standard",
                'is_local': is_local_module(obj)
            })
            continue

        # Plain data (tuples, lists, None, ...) has no ``__module__`` and
        # therefore nothing to import.
        module = sys.modules.get(getattr(obj, '__module__', None))
        if module is None or module in processed:
            continue

        is_local = is_local_module(module)
        obj_info = {
            'name': name,
            'module': obj.__module__,
            'import_type': "selective",
            'is_local': is_local
        }
        if is_local:
            if isinstance(obj, TypeVar):
                obj_info['source'] = ""
            else:
                obj_info['source'] = inspect.getsource(obj)
                # Dependencies of the inlined source are emitted before the
                # object itself.
                new_modules, new_variables = _collect_obj_dependencies(
                    module.__dict__, obj_info['source'], processed, seen_variables
                )
                modules.extend(new_modules)
                variables.extend(new_variables)
        modules.append(obj_info)

    return modules, variables


def get_obj_dependencies(context_globals, fn_code, processed=None):
    """Find the globals referenced by ``fn_code`` and describe how to recreate them.

    Args:
        context_globals: Mapping of global names to objects (for example a
            function's ``__globals__`` or a module's ``__dict__``).
        fn_code: Source text that is searched for whole-identifier uses of
            those names.
        processed: Optional set of objects that were already handled; it is
            updated in place and shared with recursive calls.

    Returns:
        A ``(modules, variables)`` tuple. ``modules`` is a list of dicts with
        the keys ``name``, ``module``, ``import_type`` and ``is_local`` (plus
        ``source`` for objects from local modules). ``variables`` is a list of
        dicts with the keys ``type`` (``"constant"`` or ``"union"``),
        ``target`` and ``value``.
    """
    if processed is None:
        processed = set()
    return _collect_obj_dependencies(context_globals, fn_code, processed, set())
def is_local_module(module)
Expand source code
def _path_is_inside(path, directory):
    """Return True when *path* is *directory* itself or lies beneath it."""
    try:
        common = os.path.commonpath([path, directory])
    except ValueError:
        # Raised e.g. for paths on different drives: they share no ancestor.
        return False
    return os.path.normcase(common) == os.path.normcase(directory)


def is_local_module(module):
    """Tell whether *module* is project code under the current working directory.

    Args:
        module: A module object.

    Returns:
        True when the module's file lives under ``os.getcwd()`` but not under
        its ``.venv`` sub-directory; False otherwise, including for modules
        without a usable ``__file__``.
    """
    # Built-in modules have no __file__; namespace packages may have it set
    # to None.
    module_file = getattr(module, '__file__', None)
    if not module_file:
        return False

    module_path = os.path.abspath(module_file)
    current_dir = os.path.abspath(os.getcwd())

    # Compare whole path components, so "/proj_sibling" is not mistaken for a
    # child of "/proj".
    if not _path_is_inside(module_path, current_dir):
        return False  # It's outside the project directory

    # Anything under .venv is an installed package, not project code.
    venv_dir = os.path.join(current_dir, '.venv')
    return not _path_is_inside(module_path, venv_dir)
def scrape_func(func, variables)
Expand source code
def scrape_func(func, variables):
    """Collect the source of ``func`` together with everything needed to run it.

    Args:
        func: Function or class whose source is scraped.
        variables: Variable descriptions (dicts with at least ``type`` and
            ``target``), such as those returned by ``scrape_init``.

    Returns:
        A 7-tuple ``(fn_code, fn_name, local_modules, selective_import_modules,
        standard_import_modules, variable_modules, union_modules)``.
        ``fn_code`` is the source with lines starting with ``@`` removed; the
        module lists hold dicts with ``name``, ``module``, ``import_type`` and
        ``is_local`` keys (and ``source`` where applicable).
    """
    import re

    fn_code = inspect.getsource(func)
    fn_name = func.__name__
    fn_code = "\n".join(line for line in fn_code.splitlines() if not line.strip().startswith("@"))

    # Keep only the variables that occur as whole identifiers in the code.
    identifiers = set(re.findall(r"[^\W\d]\w*", fn_code))
    used_variables = [variable for variable in variables if variable['target'] in identifiers]

    if inspect.isclass(func):
        context_globals = sys.modules[func.__module__].__dict__
    else:
        # Plain functions expose __globals__ directly and bound methods forward
        # it; anything else falls back to its defining module's namespace.
        context_globals = getattr(func, '__globals__', None)
        if context_globals is None:
            context_globals = sys.modules[func.__module__].__dict__

    modules, new_variables = get_obj_dependencies(context_globals, fn_code)
    used_variables.extend(new_variables)

    # Deal with variables from the main file
    for used_variable in used_variables:
        variable_type = used_variable['type']
        target = used_variable['target']

        if variable_type == 'constant':
            value = used_variable['value']
            # NOTE(review): the value is interpolated as-is, so string
            # constants are emitted unquoted - confirm consumers expect that.
            source = f"{target} = {value}\n"
            if isinstance(value, str) and value.endswith('.yaml'):
                full_path = Path(f"src/{Path.cwd().name}/{value}")
                # Inline the YAML content when the file exists; otherwise keep
                # the plain assignment instead of re-appending a stale entry.
                if full_path.exists():
                    with open(full_path, 'r') as file:
                        yaml_data = yaml.safe_load(file)
                    source = f"{target} = {yaml_data}\n"
            modules.append({
                'name': target,
                'module': None,
                'import_type': "variable",
                'is_local': False,
                'source': source
            })
        elif variable_type == 'union':
            modules.append({
                'name': target,
                'module': None,
                'import_type': "union",
                'is_local': False,
                'source': f"{target} = {used_variable['value']}\n"
            })
        elif variable_type == 'call':
            # If the variable's class is not in modules, add it
            cls_name = used_variable['cls_name']
            var_class = context_globals.get(cls_name)
            if var_class and inspect.isclass(var_class):
                class_info = {
                    'name': cls_name,
                    'module': var_class.__module__,
                    'import_type': "variable",
                    'is_local': False
                }
                arguments = ""
                if 'keywords' in used_variable:
                    for kw, value in zip(used_variable['keywords'], used_variable['values']):
                        if isinstance(value, str):
                            arguments += f'{kw}="{value}", '
                        else:
                            arguments += f'{kw}={value}, '
                class_info['source'] = f"{target} = {cls_name}({arguments})\n"
                # The class is already imported when an earlier entry has its name.
                if any(entry['name'] == cls_name for entry in modules):
                    class_info['import_needed'] = False
                modules.append(class_info)

    modules = [module for module in modules if module['name'] != 'logger']
    local_modules = [module for module in modules if module['is_local']]
    module_dependencies = {mod['name']: extract_dependencies(mod, local_modules) for mod in local_modules}
    local_modules = sort_modules(local_modules, module_dependencies)  # Sort local modules based on dependencies
    selective_import_modules = [module for module in modules if not module['is_local'] and module['import_type'] == 'selective']
    standard_import_modules = [module for module in modules if module['import_type'] == 'standard']
    variable_modules = [module for module in modules if module['import_type'] == 'variable']
    union_modules = [module for module in modules if module['import_type'] == 'union']

    return fn_code, fn_name, local_modules, selective_import_modules, standard_import_modules, variable_modules, union_modules
def scrape_func_params(func)
Expand source code
def scrape_func_params(func):
    """Describe the parameters of ``func``.

    Returns a dict keyed by parameter name (``self`` is skipped). Each entry
    is a dict with ``value`` (the default, or None when there is none) and
    ``type`` (the annotation, or None when there is none).
    """
    missing = inspect.Parameter.empty
    signature = inspect.signature(func)
    return {
        name: {
            'value': None if spec.default is missing else spec.default,
            'type': None if spec.annotation is missing else spec.annotation,
        }
        for name, spec in signature.parameters.items()
        if name != "self"
    }
def scrape_init(file_path)
Expand source code
def scrape_init(file_path):
    """Collect simple variable assignments from a Python source file.

    Every ``ast.Assign`` node in the file is visited (``ast.walk`` covers
    nested scopes as well). For each plain-name target it records either:

    * ``{"type": "call", "target", "cls_name"}`` when the value is a call to
      a bare name, plus ``keywords``/``values`` lists when keyword arguments
      are present; or
    * ``{"type": "constant", "target", "value"}`` when the value is a literal.

    A later assignment to the same target replaces the earlier record.

    Args:
        file_path: Path of the Python file to parse.

    Returns:
        List of the variable dicts described above.
    """
    def extract_value(value):
        """Turn a keyword-argument AST node into a constant or source text."""
        if isinstance(value, ast.Constant):
            return value.value
        if isinstance(value, ast.Name):
            return value.id
        if isinstance(value, ast.Attribute):
            return f"{extract_value(value.value)}.{value.attr}"
        if isinstance(value, ast.Call):
            # Calls through a name or attribute are rendered without arguments.
            if isinstance(value.func, ast.Attribute):
                return f"{extract_value(value.func.value)}.{value.func.attr}()"
            if isinstance(value.func, ast.Name):
                return f"{value.func.id}()"
        # Everything else, including calls with any other kind of callee, is
        # rendered as source text instead of silently becoming None.
        return ast.unparse(value)

    # Read bytes so the parser applies Python's own source-encoding rules
    # (UTF-8 by default, or a declared coding cookie), not the locale encoding.
    with open(file_path, 'rb') as file:
        tree = ast.parse(file.read(), filename=file_path)

    unique_variables = {}

    for node in ast.walk(tree):
        if not isinstance(node, ast.Assign):
            continue
        for target in node.targets:
            if not isinstance(target, ast.Name):
                continue
            if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name):
                data = {"type": "call", "target": target.id, "cls_name": node.value.func.id}
                if node.value.keywords:
                    data['keywords'] = [kw.arg for kw in node.value.keywords]
                    data['values'] = [extract_value(kw.value) for kw in node.value.keywords]
                unique_variables[data['target']] = data
            elif isinstance(node.value, ast.Constant):
                data = {"type": "constant", "target": target.id, "value": node.value.value}
                unique_variables[data['target']] = data

    # Convert the dictionary values back to a list
    return list(unique_variables.values())