traitement par lots ok

develop
gwen 3 years ago
parent 57ba65e606
commit 91beaf2772

@ -1,8 +1,17 @@
xmlreflector: # ________________________________________________________________________
type: actesdataset.XMLHousesReflector
# reading raw bourbon dataset
bourbon:
type: actesdataset.XMLDataSetCollection
housename: bourbon housename: bourbon
folderpath: data/01_raw/houses/bourbon folderpath: data/01_raw/houses/bourbon
# writing bourbon xmlcontent document attribute
bourbon_content:
type: actesdataset.XMLDataSetCollection
housename: bourbon
folderpath: data/02_intermediate/houses/bourbon
actors: actors:
type: pandas.CSVDataSet type: pandas.CSVDataSet
filepath: data/01_raw/csv/actors.csv filepath: data/01_raw/csv/actors.csv

@ -1 +1 @@
xlststylesheet: templates/xsl/actes_princiers.xsl xsltstylesheet: templates/xsl/actes_princiers.xsl

@ -1,21 +1,7 @@
from pathlib import Path #from typing import Dict
from typing import Dict
from kedro.framework.context import KedroContext from kedro.framework.context import KedroContext
from kedro.pipeline import Pipeline, node, pipeline
from actesdataset import XMLDataSet
# FIXME : move in an utils.py module
def tree(directory, relative_to=None):
    """Map the stem of every ``*.xml`` file under *directory* to its path.

    The search is recursive (``rglob``) and files are visited in sorted
    order. Because the mapping is keyed by file stem, two files sharing
    a stem in different sub-folders collapse into one entry; the one that
    sorts last wins.

    Args:
        directory: ``pathlib.Path`` of the folder to scan.
        relative_to: optional base path. When given, every stored path is
            made relative to it; when ``None`` the path is stored exactly
            as ``rglob`` produced it.

    Returns:
        dict mapping file stem -> path rendered as ``str``.

    Raises:
        ValueError: if a found file is not located under *relative_to*.
    """
    trees = {}
    for path in sorted(directory.rglob("*.xml")):
        # ``Path.relative_to(None)`` raises TypeError, so the default must
        # be handled explicitly instead of being forwarded as-is.
        shown = path if relative_to is None else path.relative_to(relative_to)
        trees[path.stem] = str(shown)
    return trees
class ProjectContext(KedroContext): class ProjectContext(KedroContext):
project_name = "actes princiers" project_name = "actes princiers"
project_version = "0.1" project_version = "0.1"
@ -32,42 +18,11 @@ class ProjectContext(KedroContext):
houses = self.config_loader.get("houses*") houses = self.config_loader.get("houses*")
return houses['raw_datapath'] return houses['raw_datapath']
def _get_catalog(self, *args, **kwargs): # def _get_catalog(self, *args, **kwargs):
"catalog loader entry point" # "catalog loader entry point"
# loading yaml defined catalogs # # loading yaml defined catalogs
catalog = super()._get_catalog(*args, **kwargs) # catalog = super()._get_catalog(*args, **kwargs)
# kedro.io.data_catalog.DataCatalog # # kedro.io.data_catalog.DataCatalog
# adding data sets # # adding data sets
self.nodes_description = self._house_dataset_loader(catalog) # self.nodes_description = self._house_dataset_loader(catalog)
return catalog # return catalog
def _house_dataset_loader(self, catalog):
    """Register one input and one output ``XMLDataSet`` per raw XML file.

    Every XML file found under the houses data path is added to *catalog*
    under its file stem. A sibling entry named ``<stem>_output`` is added
    for the same path with ``01_raw`` swapped for ``02_intermediate``, and
    the folder that will hold that output is created on disk.

    Args:
        catalog: catalog object exposing
            ``add(data_set_name=..., data_set=..., replace=...)``.

    Returns:
        list of dicts with keys ``inputs``, ``outputs`` and ``name``, one
        per discovered file, for later use when building pipeline nodes.
    """
    houses_datapath = self.get_houses_datapath()
    cwd = Path.cwd()
    # paths are kept relative to the working directory
    discovered = tree(cwd / houses_datapath, relative_to=cwd)

    descriptions = []
    for raw_name, raw_path in discovered.items():
        catalog.add(
            data_set_name=raw_name,
            data_set=XMLDataSet(filepath=raw_path),
            replace=True,
        )

        # matching output entry; XXX the plain string substitution on the
        # path is a known shortcut that deserves something more robust
        out_name = f"{raw_name}_output"
        out_path = Path(raw_path.replace("01_raw", "02_intermediate"))
        # make sure the destination folder exists before anything is saved
        out_path.parent.mkdir(parents=True, exist_ok=True)
        catalog.add(
            data_set_name=out_name,
            data_set=XMLDataSet(filepath=out_path),
            replace=True,
        )

        # what the pipeline stage needs in order to wire a node
        descriptions.append(
            {"inputs": raw_name, "outputs": out_name, "name": raw_name}
        )
    return descriptions
def prepare_pipeline_creation(self):
    """Hand back the node descriptions stored in ``self.nodes_description``."""
    descriptions = self.nodes_description
    return descriptions

@ -1,8 +1,39 @@
import logging
from pathlib import Path
from lxml import etree from lxml import etree
def parse_xsl(source_doc, xlststylesheet): from actesdataset import XMLDataSet
logger = logging.getLogger(__name__)
def transform(source_doc, xlststylesheet):
#<class 'lxml.etree._XSLTResultTree'> #<class 'lxml.etree._XSLTResultTree'>
xslt_doc = etree.parse(xlststylesheet) xslt_doc = etree.parse(xlststylesheet)
xslt_transformer = etree.XSLT(xslt_doc) xslt_transformer = etree.XSLT(xslt_doc)
return str(xslt_transformer(source_doc)) return str(xslt_transformer(source_doc))
def parse_xml_collection(datasets, param):
    """Apply the XSLT stylesheet to every dataset of a collection.

    Each input dataset is loaded, transformed, and wrapped in a fresh
    ``XMLDataSet`` whose path is the input path with ``01_raw`` replaced
    by ``02_intermediate``. The destination folder is created if needed.

    Args:
        datasets: dict mapping a file stem to a dataset object exposing
            ``_load``, ``get_source_doc`` and ``get_filepath``.
        param: stylesheet location, forwarded unchanged to ``transform``.

    Returns:
        dict mapping the same file stems to the new output datasets.
    """
    # FIXME: give this function a typed signature
    results = {}
    for stem, source in datasets.items():
        # the dataset is loaded by hand here
        source._load()
        rendered = transform(source.get_source_doc(), param)

        target = source.get_filepath().replace("01_raw", "02_intermediate")
        wrapped = XMLDataSet(target)
        wrapped.set_source_doc(rendered)
        results[stem] = wrapped

        # create the destination sub-folders when they are missing
        Path(target).parent.mkdir(parents=True, exist_ok=True)
    return results

@ -1,37 +1,18 @@
import logging
from kedro.pipeline import Pipeline, node, pipeline from kedro.pipeline import Pipeline, node, pipeline
from kedro.framework.session import KedroSession from kedro.framework.session import KedroSession
from .nodes import parse_xsl from .nodes import parse_xml_collection
logger = logging.getLogger(__name__)
# we need the context here in order to have access to prepare_pipeline_creation()
with KedroSession.create() as session:
context = session.load_context()
# important: **we have to call** the catalog as an attribute,
# because it makes a call to the _get_catalog() of the context method
catalog = context.catalog
# logger.info("loading houses")
# logger.info("------------------ houses ---------------------")
# logger.info(str(context.get_houses()))
def nodes_factory(nodes_description): def create_pipeline(**kwargs) -> Pipeline:
"nodes creation" return pipeline(
nodes = [] [
for node_description in nodes_description: node(
node_name = node_description['name'] func=parse_xml_collection,
# logger.info(f"building node: {node_name}...") inputs=["bourbon", "params:xsltstylesheet"],
nodes.append(node( outputs="bourbon_content",
func=parse_xsl, name="bourbon_ds_collection",
inputs=[node_description['inputs'], "params:xlststylesheet"], ),
outputs=node_description['outputs'], ]
name=node_name, )
tags="xsl",
))
return nodes
def create_pipeline(**kwargs):
"pipeline entry point needed by the global pipeline registry"
return pipeline(nodes_factory(context.prepare_pipeline_creation()))

@ -1,28 +1,54 @@
import json import json
from typing import Dict, Any from typing import Dict, Any
from pathlib import Path
import logging
from lxml import etree from lxml import etree
from kedro.io import AbstractDataSet, DataSetError from kedro.io import AbstractDataSet, DataSetError
from kedro.framework.session import KedroSession
# FIXME: supprimer l'héritage logger = logging.getLogger(__name__)
class XMLDataSet(AbstractDataSet):
# FIXME : ça n'est plus utile
# we need the context here to have access to the config
with KedroSession.create() as session:
context = session.load_context()
class XMLDataSet:
"lxml.etree._ElementTree loader" "lxml.etree._ElementTree loader"
# FIXME set the typing signature !!!!
# FIXME set the typing signature
def __init__(self, filepath: str): def __init__(self, filepath: str):
self._filepath = filepath self._filepath = filepath
def _load(self): def get_filepath(self):
source_doc = etree.parse(self._filepath) return self._filepath
def get_source_doc(self):
    """Return the document stored in the ``source_doc`` attribute.

    The attribute is not created by ``__init__``; it only exists after
    ``_load`` or ``set_source_doc`` has assigned it.

    Returns:
        Whatever object is currently stored in ``self.source_doc``.

    Raises:
        AttributeError: if ``source_doc`` has not been set yet. The
            message embeds the output of ``self._describe()``.
    """
    if hasattr(self, "source_doc"):
        return self.source_doc
    described = str(self._describe())
    raise AttributeError(
        f"XMLDataSet object {described} has no attribute named : 'source_doc'"
    )
def set_source_doc(self, source_doc):
    """Store *source_doc* on the instance as the ``source_doc`` attribute."""
    setattr(self, "source_doc", source_doc)
def _transform_source_doc(self):
# remove namespace : # remove namespace :
query = "descendant-or-self::*[namespace-uri()!='']" query = "descendant-or-self::*[namespace-uri()!='']"
for element in source_doc.xpath(query): for element in self.source_doc.xpath(query):
#replace element name with its local name #replace element name with its local name
element.tag = etree.QName(element).localname element.tag = etree.QName(element).localname
etree.cleanup_namespaces(source_doc) etree.cleanup_namespaces(self.source_doc)
return source_doc return self.source_doc
def _load(self):
self.source_doc = etree.parse(self._filepath)
self._transform_source_doc()
return self.source_doc
def _save(self, data:str) -> None: def _save(self, data:str) -> None:
with open(self._filepath, 'w') as fhandle: with open(self._filepath, 'w') as fhandle:
@ -31,8 +57,9 @@ class XMLDataSet(AbstractDataSet):
def _describe(self) -> Dict[str, Any]: def _describe(self) -> Dict[str, Any]:
return dict(filepath=self._filepath) return dict(filepath=self._filepath)
class XMLHousesReflector(AbstractDataSet):
"""``XMLHousesReflector`` stores instances of ``XMLDataSet`` class XMLDataSetCollection(AbstractDataSet):
"""Stores instances of ``XMLDataSet``
implementations to provide ``load`` and ``save`` capabilities. implementations to provide ``load`` and ``save`` capabilities.
anywhere in the program. To use a ``DataCatalog``, you need to anywhere in the program. To use a ``DataCatalog``, you need to
instantiate it with a file system folder path, it "reflects" instantiate it with a file system folder path, it "reflects"
@ -50,26 +77,40 @@ class XMLHousesReflector(AbstractDataSet):
>>> io = XMLCatalogReflector(housename='bourbon', folderpath='/tmp/mydir', data_sets={'cars': cars}) >>> io = XMLCatalogReflector(housename='bourbon', folderpath='/tmp/mydir', data_sets={'cars': cars})
# filepath, load_args=None, save_args=None): # filepath, load_args=None, save_args=None):
""" """
# FIXME set the typing signature
def __init__(self, def __init__(self,
housename: str, housename: str,
folderpath: str, folderpath: str):
data_sets: dict[str, XMLDataSet] = None):
self._housename = housename self._housename = housename
self._folderpath = folderpath self._folderpath = Path(folderpath)
self._datasets = data_sets # self.house_path = Path(context.get_houses()[self._housename]['path'])
# self.filepath = filepath # print(self.house_path)
def get_datasets(self):
    """Return the mapping stored in the ``datasets`` attribute.

    Raises:
        AttributeError: if ``datasets`` has not been set on the instance;
            the message embeds the output of ``self._describe()``.
    """
    if not hasattr(self, "datasets"):
        described = str(self._describe())
        raise AttributeError(
            f"Object {described} has no attribute named : 'datasets'"
        )
    return self.datasets
# FIXME : set the signature
def _load(self): def _load(self):
return "C'est chargé!" ":return: dict[str, XMLDataSet]"
self.datasets = dict()
def _save(self): for filepath in sorted(self._folderpath.glob("*.xml")):
raise NotImplementedError("Attention : dataset en lecture seule !") self.datasets[filepath.stem] = XMLDataSet(
filepath=str(filepath))
def _exists(self) -> bool: return self.datasets
return True
# FIXME : set the signature
def _save(self, datasets):
    """Persist every dataset of the collection.

    Args:
        datasets: dict mapping a file stem to a dataset object exposing
            ``get_source_doc`` and ``_save``; only the values are used.
    """
    # TODO: the member datasets should offer a public ``save`` instead of
    # having their private ``_save`` called from here
    for member in datasets.values():
        # FIXME: fetching the document just to hand it straight back to
        # the same object should not be necessary
        document = member.get_source_doc()
        member._save(document)
def _describe(self): def _describe(self):
return dict(name="my own dataset") return dict(name=self._housename, folderpath=self._folderpath)
# def load(self, name: str) -> Any: # def load(self, name: str) -> Any:
# """Loads a registered data set. # """Loads a registered data set.
@ -114,7 +155,7 @@ class XMLHousesReflector(AbstractDataSet):
## >>> io.save("cars", df) ## >>> io.save("cars", df)
## """ ## """
## dataset = self._get_dataset(name) ## dataset = self._get_dataset(name)
### self._logger.info("Saving data to '%s' (%s)...", name, type(dataset).__name__) ### self._print("Saving data to '%s' (%s)...", name, type(dataset).__name__)
## dataset.save(data) ## dataset.save(data)
# def _describe(self) -> Dict[str, Any]: # def _describe(self) -> Dict[str, Any]:

Loading…
Cancel
Save