From 6af317f16b87921707fc59561cc6792226b63198 Mon Sep 17 00:00:00 2001
From: gwen
Date: Mon, 4 Sep 2023 17:38:54 +0200
Subject: [PATCH] populate nosql database pipeline

---
Review notes: populate_mongo now closes the MongoClient and upserts on _id,
so nodes.py no longer matches the blob id recorded on its index line below.

 actes-princiers/.gitignore               |  2 +-
 actes-princiers/conf/README.md           |  2 +-
 actes-princiers/conf/base/parameters.yml |  2 +
 .../pipelines/populate_mongo/__init__.py |  3 +
 .../pipelines/populate_mongo/nodes.py    | 55 +++++++++++++++++++
 .../pipelines/populate_mongo/pipeline.py | 19 +++++++
 .../pipelines/xml_processing/pipeline.py |  3 +
 .../src/actes_princiers/settings.py      |  4 +-
 8 files changed, 86 insertions(+), 4 deletions(-)
 create mode 100755 actes-princiers/src/actes_princiers/pipelines/populate_mongo/__init__.py
 create mode 100755 actes-princiers/src/actes_princiers/pipelines/populate_mongo/nodes.py
 create mode 100755 actes-princiers/src/actes_princiers/pipelines/populate_mongo/pipeline.py

diff --git a/actes-princiers/.gitignore b/actes-princiers/.gitignore
index 4c4506f..329c811 100644
--- a/actes-princiers/.gitignore
+++ b/actes-princiers/.gitignore
@@ -2,7 +2,7 @@
 # KEDRO PROJECT
 
 # ignore all local configuration
-# conf/local/**
+conf/local/**
 !conf/local/.gitkeep
 
 .telemetry
diff --git a/actes-princiers/conf/README.md b/actes-princiers/conf/README.md
index cee6f56..5f4f2a9 100644
--- a/actes-princiers/conf/README.md
+++ b/actes-princiers/conf/README.md
@@ -20,4 +20,4 @@ WARNING: Please do not put access credentials in the base configuration folder.
 
 ## Need help?
 
-[Find out more about configuration from the Kedro documentation](https://docs.kedro.org/en/stable/kedro_project_setup/configuration.html).
+[Find out more about configuration from the Kedro documentation](https://docs.kedro.org/en/stable/configuration/index.html).
diff --git a/actes-princiers/conf/base/parameters.yml b/actes-princiers/conf/base/parameters.yml
index a5ef50f..131ab24 100644
--- a/actes-princiers/conf/base/parameters.yml
+++ b/actes-princiers/conf/base/parameters.yml
@@ -1 +1,3 @@
 version: 1.0
+db_name: actesdb
+db_collection_name: actes
diff --git a/actes-princiers/src/actes_princiers/pipelines/populate_mongo/__init__.py b/actes-princiers/src/actes_princiers/pipelines/populate_mongo/__init__.py
new file mode 100755
index 0000000..e08ea5f
--- /dev/null
+++ b/actes-princiers/src/actes_princiers/pipelines/populate_mongo/__init__.py
@@ -0,0 +1,3 @@
+"Data Processing pipeline"
+
+from .pipeline import create_pipeline  # NOQA
diff --git a/actes-princiers/src/actes_princiers/pipelines/populate_mongo/nodes.py b/actes-princiers/src/actes_princiers/pipelines/populate_mongo/nodes.py
new file mode 100755
index 0000000..2713d54
--- /dev/null
+++ b/actes-princiers/src/actes_princiers/pipelines/populate_mongo/nodes.py
@@ -0,0 +1,55 @@
+import logging
+from pathlib import Path
+from typing import Dict
+
+from kedro.framework.session import KedroSession
+
+from actesdataset import JSONDataSetCollection
+
+import pymongo
+
+
+
+logger = logging.getLogger(__name__)
+
+
+def populate_mongo(
+    jsondoc: JSONDataSetCollection,
+    storage_ip: str,
+    db_name: str,
+    db_collection_name: str,
+) -> None:
+    """Store every JSON document of ``jsondoc`` in a MongoDB collection.
+
+    Each document is loaded from its dataset, gets its ``filename`` value
+    as ``_id`` and is upserted, so running the node again replaces the
+    stored documents instead of failing on a duplicate key.
+
+    Args:
+        jsondoc: collection whose ``datasets`` mapping holds the datasets
+            to load and store.
+        storage_ip: host of the MongoDB server (port 27017 is used).
+        db_name: name of the target database.
+        db_collection_name: name of the target collection.
+    """
+    mongodb_url = "mongodb://{}:27017/".format(storage_ip)
+    logger.info("connection to the mongodb server: %s", mongodb_url)
+
+    # the context manager closes the client on exit, even when a load
+    # or a write fails
+    with pymongo.MongoClient(mongodb_url) as client:
+        actes_collection = client[db_name][db_collection_name]
+
+        # TODO: use a single bulk write instead of one call per document?
+        for _, dataset in jsondoc.datasets.items():
+            # a manual load is required here, because
+            # the dataset **is not** registered in kedro's catalog
+            document = dataset._load()
+            # FIXME: what should be used as id? the filename?
+            document["_id"] = document["filename"]
+            # upsert keyed on _id: a plain insert_one raises
+            # DuplicateKeyError when the pipeline is run a second time
+            actes_collection.replace_one(
+                {"_id": document["_id"]}, document, upsert=True
+            )
+            logger.info(document["_id"])
diff --git a/actes-princiers/src/actes_princiers/pipelines/populate_mongo/pipeline.py b/actes-princiers/src/actes_princiers/pipelines/populate_mongo/pipeline.py
new file mode 100755
index 0000000..d8fde8d
--- /dev/null
+++ b/actes-princiers/src/actes_princiers/pipelines/populate_mongo/pipeline.py
@@ -0,0 +1,19 @@
+from kedro.pipeline import Pipeline, node, pipeline
+
+from .nodes import populate_mongo
+
+
+def create_pipeline(**kwargs) -> Pipeline:
+    return pipeline(
+        [
+            node(
+                func=populate_mongo,
+                inputs=["bourbon_fulljsonoutput", "params:storage_ip", "params:db_name",
+                    "params:db_collection_name"],
+                outputs=None,
+                name="populate_mongo",
+                tags="populate_database",
+            )
+        ]
+    )
+
diff --git a/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py b/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py
index 879539f..a43fe65 100755
--- a/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py
+++ b/actes-princiers/src/actes_princiers/pipelines/xml_processing/pipeline.py
@@ -13,18 +13,21 @@ def create_pipeline(**kwargs) -> Pipeline:
                 inputs=["bourbon"],
                 outputs="bourbon_xmlcontent",
                 name="bourbon_ds_collection",
+                tags="etl_transform"
             ),
             node(
                 func=make_json_collection,
                 inputs="bourbon_json",
                 outputs="bourbon_jsonoutput",
                 name="bourbon_json_ds_collection",
+                tags="etl_transform"
             ),
             node(
                 func=add_xmlcontent_tojson,
                 inputs=["bourbon_jsonoutput", "bourbon_pseudoxmlcontent"],
                 outputs="bourbon_fulljsonoutput",
                 name="bourbon_fulljson_ds_collection",
+                tags="etl_transform"
             ),
 
             # node(
diff --git a/actes-princiers/src/actes_princiers/settings.py b/actes-princiers/src/actes_princiers/settings.py
index d8982b8..2ad0cfe 100644
--- a/actes-princiers/src/actes_princiers/settings.py
+++ b/actes-princiers/src/actes_princiers/settings.py
@@ -22,8 +22,8 @@ https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""
 # CONF_SOURCE = "conf"
 
 # Class that manages how configuration is loaded.
-# from kedro.config import OmegaConfigLoader
-# CONFIG_LOADER_CLASS = OmegaConfigLoader
+from kedro.config import OmegaConfigLoader
+CONFIG_LOADER_CLASS = OmegaConfigLoader
 # Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
 # CONFIG_LOADER_ARGS = {
 #       "config_patterns": {