Module naptha_sdk.storage.storage_provider

Classes

class StorageError (message: str, status_code: int | None = None)
Expand source code
class StorageError(Exception):
    """Custom exception for storage operations"""
    def __init__(self, message: str, status_code: Optional[int] = None):
        self.message = message
        self.status_code = status_code
        super().__init__(self.message)

Custom exception for storage operations

Ancestors

  • builtins.Exception
  • builtins.BaseException
class StorageProvider (node: NodeConfigUser)
Expand source code
class StorageProvider:
    def __init__(self, node: NodeConfigUser):
        self.node = node
        self.node_url = node_to_url(node)
        self.client = httpx.AsyncClient(timeout=HTTP_TIMEOUT)
        logger.info(f"Storage Provider URL: {self.node_url}")


    async def _make_request(
        self,
        request: BaseStorageRequest,
        files: Optional[Dict] = None
    ) -> Any:
        """Make HTTP request to storage endpoint"""
        endpoint = f"{self.node_url}/storage/{request.storage_type.value}/{request.request_type.value}/{request.path}"
        print(f"Request: {request}")
        try:
            response = None
            match request:
                case CreateStorageRequest():
                    form_data = {}
                    if not request.data:
                        request.data = {}
                    if not request.options:
                        request.options = {}
                    form_data['data'] = json.dumps({**request.data, **request.options})
                    print(f"Form data: {form_data}")
                    files = files if files is not None else {}
                    response = await self.client.post(endpoint, data=form_data, files=files)

                case ReadStorageRequest():
                    if request.storage_type in [StorageType.FILESYSTEM, StorageType.IPFS]:
                        response = await self.client.get(endpoint)
                        response.raise_for_status()
                        # Check content type to determine response handling
                        content_type = response.headers.get('content-type', '')
                        if 'json' in content_type:
                            return response.json()
                        return response.content
                    else:
                        params = {"options": json.dumps(request.options)} if request.options else None
                        response = await self.client.get(endpoint, params=params)
                                        
                case UpdateStorageRequest():
                    # Extract condition from options if present
                    condition = request.options.get("condition") if request.options else None
                    form_data = {
                        'data': json.dumps(request.data)
                    }
                    params = {
                        'condition': json.dumps(condition) if condition else None
                    }
                    response = await self.client.put(endpoint, data=form_data, params=params)
                    
                case ListStorageRequest():
                    params = {"options": json.dumps(request.options)} if request.options else None
                    response = await self.client.get(endpoint, params=params)
                    
                case DeleteStorageRequest():
                    params = {}
                    if request.storage_type == StorageType.DATABASE:
                        if request.condition:
                            params["condition"] = json.dumps(request.condition)
                    elif request.storage_type == StorageType.FILESYSTEM:
                        if request.options:
                            params["options"] = json.dumps(request.options)
                    response = await self.client.delete(endpoint, params=params)
                    
                case SearchStorageRequest():
                    search_data = {
                        "query": request.query,
                        "query_type": request.query_type,
                        "limit": request.limit
                    }
                    response = await self.client.post(endpoint, json=search_data)
            
            if response:
                response.raise_for_status()
                content_type = response.headers.get('content-type', '')
                if 'json' in content_type:
                    return response.json()
                return response.content
                
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred: {e.response.text}")
            raise StorageError(f"HTTP error occurred: {str(e)}", status_code=e.response.status_code)
        except Exception as e:
            logger.error(f"Storage operation failed: {str(e)}")
            raise StorageError(f"Storage operation failed: {str(e)}")

    async def execute(self, request: BaseStorageRequest) -> Union[StorageObject, List[StorageObject], bool]:
        """Execute storage request and return appropriate response"""
        files = None
        if isinstance(request, CreateStorageRequest) and request.file:
            files = {"file": request.file}
            
        result = await self._make_request(request, files=files)

        match request:
            case DeleteStorageRequest():
                return True
                
            case ListStorageRequest():
                # Handle list response which might be a simple array
                if isinstance(result, list):
                    return [
                        StorageObject(
                            location=StorageLocation(
                                storage_type=request.storage_type,
                                path=request.path
                            ),
                            data=item
                        )
                        for item in result
                    ]
                return StorageObject(
                    location=StorageLocation(storage_type=request.storage_type, path=request.path),
                    data=result
                )
                
            case SearchStorageRequest():
                # Handle search response which returns objects with path
                return [
                    StorageObject(
                        location=StorageLocation(
                            storage_type=request.storage_type,
                            path=obj.get("path", "")
                        ),
                        data=obj.get("data"),
                        metadata=obj.get("metadata")
                    )
                    for obj in result
                ]
                
            case _:
                return StorageObject(
                    location=StorageLocation(
                        storage_type=request.storage_type,
                        path=request.path
                    ),
                    data=result
                )

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()

Methods

async def execute(self,
request: BaseStorageRequest) ‑> StorageObject | List[StorageObject] | bool
Expand source code
async def execute(self, request: BaseStorageRequest) -> Union[StorageObject, List[StorageObject], bool]:
    """Execute storage request and return appropriate response"""
    files = None
    if isinstance(request, CreateStorageRequest) and request.file:
        files = {"file": request.file}
        
    result = await self._make_request(request, files=files)

    match request:
        case DeleteStorageRequest():
            return True
            
        case ListStorageRequest():
            # Handle list response which might be a simple array
            if isinstance(result, list):
                return [
                    StorageObject(
                        location=StorageLocation(
                            storage_type=request.storage_type,
                            path=request.path
                        ),
                        data=item
                    )
                    for item in result
                ]
            return StorageObject(
                location=StorageLocation(storage_type=request.storage_type, path=request.path),
                data=result
            )
            
        case SearchStorageRequest():
            # Handle search response which returns objects with path
            return [
                StorageObject(
                    location=StorageLocation(
                        storage_type=request.storage_type,
                        path=obj.get("path", "")
                    ),
                    data=obj.get("data"),
                    metadata=obj.get("metadata")
                )
                for obj in result
            ]
            
        case _:
            return StorageObject(
                location=StorageLocation(
                    storage_type=request.storage_type,
                    path=request.path
                ),
                data=result
            )

Execute storage request and return appropriate response