diff --git a/actes-princiers/conf/base/catalog.yml b/actes-princiers/conf/base/catalog.yml index 8006348..c21bd45 100644 --- a/actes-princiers/conf/base/catalog.yml +++ b/actes-princiers/conf/base/catalog.yml @@ -1,8 +1,17 @@ -xmlreflector: - type: actesdataset.XMLHousesReflector +# ________________________________________________________________________ + +# reading raw bourbon dataset +bourbon: + type: actesdataset.XMLDataSetCollection housename: bourbon folderpath: data/01_raw/houses/bourbon +# writing bourbon xmlcontent document attribute +bourbon_content: + type: actesdataset.XMLDataSetCollection + housename: bourbon + folderpath: data/02_intermediate/houses/bourbon + actors: type: pandas.CSVDataSet filepath: data/01_raw/csv/actors.csv diff --git a/actes-princiers/conf/base/parameters.yml b/actes-princiers/conf/base/parameters.yml index 18d8ea3..6f83708 100644 --- a/actes-princiers/conf/base/parameters.yml +++ b/actes-princiers/conf/base/parameters.yml @@ -1 +1 @@ -xlststylesheet: templates/xsl/actes_princiers.xsl +xsltstylesheet: templates/xsl/actes_princiers.xsl diff --git a/actes-princiers/src/actes_princiers/customcontext.py b/actes-princiers/src/actes_princiers/customcontext.py index c75b341..202a92b 100644 --- a/actes-princiers/src/actes_princiers/customcontext.py +++ b/actes-princiers/src/actes_princiers/customcontext.py @@ -1,21 +1,7 @@ -from pathlib import Path -from typing import Dict +#from typing import Dict from kedro.framework.context import KedroContext -from kedro.pipeline import Pipeline, node, pipeline -from actesdataset import XMLDataSet - - -# FIXME : move in an utils.py module -def tree(directory, relative_to=None): - "helper that returns a directory tree structure" - trees = dict() - for path in sorted(directory.rglob("*.xml")): - trees[path.stem] = str(path.relative_to(relative_to)) - return trees - - class ProjectContext(KedroContext): project_name = "actes princiers" project_version = "0.1" @@ -32,42 +18,11 @@ class ProjectContext(KedroContext): houses = self.config_loader.get("houses*") return houses['raw_datapath'] - def _get_catalog(self, *args, **kwargs): - "catalog loader entry point" - # loading yaml defined catalogs - catalog = super()._get_catalog(*args, **kwargs) - # kedro.io.data_catalog.DataCatalog - # adding data sets - self.nodes_description = self._house_dataset_loader(catalog) - return catalog - - def _house_dataset_loader(self, catalog): - nodes_description = [] - houses_datapath = self.get_houses_datapath() - data_root_path = Path.cwd() / houses_datapath - relative_to = Path.cwd() - for dataset_name, dataset_path in tree(data_root_path, relative_to=relative_to).items(): - catalog.add(data_set_name=dataset_name, - data_set=XMLDataSet(filepath=dataset_path), - replace=True) - # adding an output catalog entry - output_dataset_name = dataset_name + "_output" - # XXX : make better - output_dataset_path = Path(dataset_path.replace("01_raw", "02_intermediate")) - # let's create subfolders if they don't exist - output_dataset_dir = output_dataset_path.parent - output_dataset_dir.mkdir(parents=True, exist_ok=True) - catalog.add(data_set_name=output_dataset_name, - data_set=XMLDataSet(filepath=output_dataset_path), - replace=True) - # prepare information for the next stage (the pipeline stage) - node_description = dict( - inputs=dataset_name, - outputs=output_dataset_name, - name=dataset_name) - nodes_description.append(node_description) - return nodes_description - - def prepare_pipeline_creation(self): - return self.nodes_description - +# def _get_catalog(self, *args, **kwargs): +# "catalog loader entry point" +# # loading yaml defined catalogs +# catalog = super()._get_catalog(*args, **kwargs) +# # kedro.io.data_catalog.DataCatalog +# # adding data sets +# self.nodes_description = self._house_dataset_loader(catalog) +# return catalog diff --git a/actes-princiers/src/actes_princiers/pipelines/xml_processing/nodes.py b/actes-princiers/src/actes_princiers/pipelines/xml_processing/nodes.py index 7aca6bf..c62426f 100755 --- a/actes-princiers/src/actes_princiers/pipelines/xml_processing/nodes.py +++ b/actes-princiers/src/actes_princiers/pipelines/xml_processing/nodes.py @@ -1,8 +1,39 @@ +import logging +from pathlib import Path + from lxml import etree -def parse_xsl(source_doc, xlststylesheet): +from actesdataset import XMLDataSet + +logger = logging.getLogger(__name__) + + +def transform(source_doc, xlststylesheet): # xslt_doc = etree.parse(xlststylesheet) xslt_transformer = etree.XSLT(xslt_doc) return str(xslt_transformer(source_doc)) + +def parse_xml_collection(datasets, param): + # FIXME set signature + # datasets -> dict + # param -> str +# logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") +# logger.info(str(bourbon)) + output_datasets = dict() +# datasets = bourbon.get_datasets() + for dataset_filenamestem, dataset in datasets.items(): + # manually loading the dataset + dataset._load() + output_source_doc = transform(dataset.get_source_doc(), param) + output_filepath = dataset.get_filepath().replace("01_raw", "02_intermediate") + output_xmldataset = XMLDataSet(output_filepath) + output_xmldataset.set_source_doc(output_source_doc) + output_datasets[dataset_filenamestem] = output_xmldataset + # let's create subfolders if they don't exist + output_filepath = Path(output_filepath) + output_xmldataset_dir = output_filepath.parent + output_xmldataset_dir.mkdir(parents=True, exist_ok=True) + return output_datasets + diff --git a/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py b/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py index 8ca6f28..d23c6e9 100755 --- a/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py +++ b/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py @@ -1,37 +1,18 @@ -import logging - from kedro.pipeline import Pipeline, node, pipeline from kedro.framework.session import KedroSession -from .nodes import parse_xsl - -logger = logging.getLogger(__name__) +from .nodes import parse_xml_collection -# we need the context here in order to have access to prepare_pipeline_creation() -with KedroSession.create() as session: - context = session.load_context() - # important: **we have to call** the catalog as an attribute, - # because it makes a call to the _get_catalog() of the context method - catalog = context.catalog -# logger.info("loading houses") -# logger.info("------------------ houses ---------------------") -# logger.info(str(context.get_houses())) -def nodes_factory(nodes_description): - "nodes creation" - nodes = [] - for node_description in nodes_description: - node_name = node_description['name'] - # logger.info(f"building node: {node_name}...") - nodes.append(node( - func=parse_xsl, - inputs=[node_description['inputs'], "params:xlststylesheet"], - outputs=node_description['outputs'], - name=node_name, - tags="xsl", - )) - return nodes +def create_pipeline(**kwargs) -> Pipeline: + return pipeline( + [ + node( + func=parse_xml_collection, + inputs=["bourbon", "params:xsltstylesheet"], + outputs="bourbon_content", + name="bourbon_ds_collection", + ), + ] + ) -def create_pipeline(**kwargs): - "pipeline entry point needed by the global pipeline registry" - return pipeline(nodes_factory(context.prepare_pipeline_creation())) diff --git a/actes-princiers/src/actesdataset.py b/actes-princiers/src/actesdataset.py index 9448287..5ffc29f 100644 --- a/actes-princiers/src/actesdataset.py +++ b/actes-princiers/src/actesdataset.py @@ -1,28 +1,54 @@ import json from typing import Dict, Any +from pathlib import Path +import logging from lxml import etree from kedro.io import AbstractDataSet, DataSetError +from kedro.framework.session import KedroSession -# FIXME: supprimer l'héritage -class XMLDataSet(AbstractDataSet): +logger = logging.getLogger(__name__) + +# FIXME : ça n'est plus utile +# we need the context here to have access to the config +with KedroSession.create() as session: + context = session.load_context() + +class XMLDataSet: "lxml.etree._ElementTree loader" - # FIXME set the typing signature !!!! + # FIXME set the typing signature def __init__(self, filepath: str): self._filepath = filepath - def _load(self): - source_doc = etree.parse(self._filepath) + def get_filepath(self): + return self._filepath + + def get_source_doc(self): + if hasattr(self, 'source_doc'): + return self.source_doc + else: + attr_error_msg = str(self._describe()) + raise AttributeError(f"XMLDataSet bject {attr_error_msg} has no attribute named : 'source_doc'") + + def set_source_doc(self, source_doc): + self.source_doc = source_doc + + def _transform_source_doc(self): # remove namespace : query = "descendant-or-self::*[namespace-uri()!='']" - for element in source_doc.xpath(query): + for element in self.source_doc.xpath(query): #replace element name with its local name element.tag = etree.QName(element).localname - etree.cleanup_namespaces(source_doc) - return source_doc + etree.cleanup_namespaces(self.source_doc) + return self.source_doc + + def _load(self): + self.source_doc = etree.parse(self._filepath) + self._transform_source_doc() + return self.source_doc def _save(self, data:str) -> None: with open(self._filepath, 'w') as fhandle: @@ -31,8 +57,9 @@ class XMLDataSet(AbstractDataSet): def _describe(self) -> Dict[str, Any]: return dict(filepath=self._filepath) -class XMLHousesReflector(AbstractDataSet): - """``XMLHousesReflector`` stores instances of ``XMLDataSet`` + +class XMLDataSetCollection(AbstractDataSet): + """Stores instances of ``XMLDataSet`` implementations to provide ``load`` and ``save`` capabilities. anywhere in the program. To use a ``DataCatalog``, you need to instantiate it with a file system folder path, it "reflects" @@ -50,26 +77,40 @@ class XMLHousesReflector(AbstractDataSet): >>> io = XMLCatalogReflector(housename='bourbon', folderpath='/tmp/mydir', data_sets={'cars': cars}) # filepath, load_args=None, save_args=None): """ + # FIXME set the typing signature def __init__(self, housename: str, - folderpath: str, - data_sets: dict[str, XMLDataSet] = None): + folderpath: str): self._housename = housename - self._folderpath = folderpath - self._datasets = data_sets -# self.filepath = filepath - + self._folderpath = Path(folderpath) +# self.house_path = Path(context.get_houses()[self._housename]['path']) +# print(self.house_path) + + def get_datasets(self): + if hasattr(self, 'datasets'): + return self.datasets + else: + attr_error_msg = str(self._describe()) + raise AttributeError(f"Object {attr_error_msg} has no attribute named : 'datasets'") + + # FIXME : set the signature def _load(self): - return "C'est chargé!" - - def _save(self): - raise NotImplementedError("Attention : dataset en lecture seule !") - - def _exists(self) -> bool: - return True + ":return: dict[str, XMLDataSet]" + self.datasets = dict() + for filepath in sorted(self._folderpath.glob("*.xml")): + self.datasets[filepath.stem] = XMLDataSet( + filepath=str(filepath)) + return self.datasets + + # FIXME : set the signature + def _save(self, datasets): + # faire une méthode save et pas _save + for stemfilename, dataset in datasets.items(): + # FIXME XXX -> pas besoin refaire un get_source_doc !!!!!! + dataset._save(dataset.get_source_doc()) def _describe(self): - return dict(name="my own dataset") + return dict(name=self._housename, folderpath=self._folderpath) # def load(self, name: str) -> Any: # """Loads a registered data set. @@ -114,7 +155,7 @@ class XMLHousesReflector(AbstractDataSet): ## >>> io.save("cars", df) ## """ ## dataset = self._get_dataset(name) -### self._logger.info("Saving data to '%s' (%s)...", name, type(dataset).__name__) +### self._print("Saving data to '%s' (%s)...", name, type(dataset).__name__) ## dataset.save(data) # def _describe(self) -> Dict[str, Any]: