Module naptha_sdk.modules.kb

Classes

class KnowledgeBase (kb_deployment: KBDeployment)
Expand source code
class KnowledgeBase:
    def __init__(self, kb_deployment: KBDeployment):
        self.kb_deployment = kb_deployment
        self.kb_node = NodeClient(self.kb_deployment.node)
        self.table_name = kb_deployment.config.path
        self.schema = kb_deployment.config.schema
        if "id_column" in kb_deployment.config:
            self.id_column = kb_deployment.config.id_column
        else:
            self.id_column = "id"
        if self.table_name is None:
            self.table_name = kb_deployment.module["name"]

    @classmethod
    async def create(cls, module_run):
        instance = cls(module_run)
        await instance._initialize()
        return instance

    async def _initialize(self):
        try:
            tables = await self.kb_node.list_tables()
            if self.table_name not in tables:
                await self.kb_node.create_table(self.table_name, self.schema)
        except Exception as e:
            logger.error(f"Error initializing knowledge base: {str(e)}")
            raise
        
    async def call_kb_func(self, module_run_input: KBRunInput, *args, **kwargs):
        logger.info(f"Running knowledge base on knowledge base node {self.kb_node}")
        kb_run = await self.kb_node.run_module(module_type="kb", run_input=module_run_input.model_dict())
        return kb_run

Static methods

async def create(module_run)

Methods

async def call_kb_func(self,
module_run_input: KBRunInput,
*args,
**kwargs)
Expand source code
async def call_kb_func(self, module_run_input: KBRunInput, *args, **kwargs):
    logger.info(f"Running knowledge base on knowledge base node {self.kb_node}")
    kb_run = await self.kb_node.run_module(module_type="kb", run_input=module_run_input.model_dict())
    return kb_run