Source code for src.internal.data_connector.clearml_dataset

"""ClearML Dataset Connector
A interface to interact with ClearML datasets.
"""
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union

from clearml import Dataset, Task

from ...models.common import Artifact
from .connector import DatasetConnector


[docs]class ClearMLDataset(DatasetConnector): """ClearML Dataset Connector.""" def __init__(self): """Initialize a ClearML dataset connector. Note: Do not directly instantiate, instead, you should use the `create` or `get` class method to create the connector. """ super().__init__() self.dataset: Optional[Dataset] = None self.output_path: Optional[Union[Path, str]] = None self._task: Optional[Task] = None @property def file_entries(self) -> Dict: if self.dataset is None: raise AttributeError("Dataset has not been initialized") return self.dataset.file_entries_dict # type: ignore @property def artifacts(self) -> List[Artifact]: """Get a list of artifacts in the dataset. Raises: NotImplementedError: If dataset connector does not implement this method. Returns: List[Artifact]: List of artifacts in the dataset """ if self.dataset is None: raise AttributeError("Dataset has not been initialized") data = [] if not self._task: self._task = Task.get_task(task_id=self.dataset.id) for name, entry in self._task.artifacts.items(): data.append( Artifact(artifactType="dataset", name=name, url=entry.url) ) return data @property def name(self) -> str: if self.dataset is None: raise AttributeError("Dataset has not been initialized") return self.dataset.name @property def project(self) -> str: if self.dataset is None: raise AttributeError("Dataset has not been initialized") return self.dataset.project @property def tags(self) -> List[str]: """Get the tags associated with the dataset. Raises: AttributeError: If dataset has not been initialized. Returns: List[str]: List of tags associated with the dataset. """ if self.dataset is None: raise AttributeError("Dataset has not been initialized") return self.dataset.tags
[docs] @classmethod def get( cls, id: Optional[str] = None, project: Optional[str] = None, name: Optional[str] = None, version: Optional[str] = None, ) -> "ClearMLDataset": dataset = cls() dataset.dataset = Dataset.get( dataset_id=id, dataset_name=name, dataset_project=project, dataset_version=version, ) dataset.id = dataset.dataset.id dataset.default_remote = dataset.dataset.get_default_storage() try: dataset._task = Task.get_task(task_id=dataset.id) except Exception: pass return dataset
[docs] @classmethod def create( cls, name: Optional[str] = None, version: Optional[str] = None, project: Optional[str] = None, tags: Optional[Sequence[str]] = None, default_remote: Optional[str] = None, description: Optional[str] = None, ) -> "ClearMLDataset": dataset = cls() dataset.dataset = Dataset.create( dataset_name=name, dataset_project=project, dataset_tags=tags, dataset_version=version, output_uri=default_remote, description=description, ) dataset.id = dataset.dataset.id dataset.default_remote = default_remote return dataset
[docs] @staticmethod def list_datasets( project: Optional[str] = None, partial_name: Optional[str] = None, tags: Optional[Sequence[str]] = None, ids: Optional[Sequence[str]] = None, ) -> List[Dict]: return Dataset.list_datasets( partial_name=partial_name, dataset_project=project, ids=ids, tags=tags, )
[docs] def add_files( self, path: Union[str, Path], recursive: bool = True ) -> None: if self.dataset is None: raise AttributeError("Dataset has not been created.") self.dataset.add_files(path=path, recursive=recursive)
[docs] def remove_files( self, path: Union[str, Path], recursive: bool = True ) -> None: if self.dataset is None: raise AttributeError("Dataset has not been created.") self.dataset.remove_files(dataset_path=str(path), recursive=recursive)
[docs] def upload(self, remote: Optional[str] = None) -> None: if self.dataset is None: raise AttributeError("Dataset has not been created.") if remote is None: if self.default_remote is None: warning_msg = "No remote url specified, using default file server specified in clearml_conf" else: warning_msg = f"No remote url specified, using default remote of {self.default_remote}" self.logger.warning(warning_msg) remote = self.default_remote self.dataset.upload(output_url=remote) # NOTE: no idea if the below one is necessary self.dataset.finalize()
[docs] def download(self, path: Union[str, Path], overwrite: bool = True) -> str: if self.dataset is None: raise AttributeError("Dataset has not been created.") self.output_path = self.dataset.get_mutable_local_copy( target_folder=path, overwrite=overwrite ) if self.output_path is None: raise RuntimeError("Failed to download dataset") return self.output_path
[docs] def delete(self) -> None: # If dataset does not exist, then # no need to do anything if self.dataset is not None: self.dataset.delete(dataset_id=self.id)