Add pipeline to populate the NoSQL (MongoDB) database

develop
gwen 3 years ago
parent 1ec817652e
commit 6af317f16b

@ -2,7 +2,7 @@
# KEDRO PROJECT
# ignore all local configuration
# conf/local/**
conf/local/**
!conf/local/.gitkeep
.telemetry

@ -20,4 +20,4 @@ WARNING: Please do not put access credentials in the base configuration folder.
## Need help?
[Find out more about configuration from the Kedro documentation](https://docs.kedro.org/en/stable/kedro_project_setup/configuration.html).
[Find out more about configuration from the Kedro documentation](https://docs.kedro.org/en/stable/configuration/index.html).

@ -1 +1,3 @@
version: 1.0
db_name: actesdb
db_collection_name: actes

@ -0,0 +1,3 @@
"Data Processing pipeline"
from .pipeline import create_pipeline # NOQA

@ -0,0 +1,42 @@
import logging
from pathlib import Path
from typing import Dict
from kedro.framework.session import KedroSession
from actesdataset import JSONDataSetCollection
import pymongo
logger = logging.getLogger(__name__)
def populate_mongo(jsondoc: JSONDataSetCollection, storage_ip: str, db_name: str, db_collection_name: str) -> None:
    """Insert every JSON document of ``jsondoc`` into a MongoDB collection.

    Connects to the MongoDB server at ``storage_ip`` (standard port 27017),
    then inserts each dataset of the collection as one document, using the
    document's ``filename`` field as its MongoDB ``_id``.

    Args:
        jsondoc: collection of JSON datasets produced upstream. Its member
            datasets are **not** registered in Kedro's catalog, so each one
            is loaded manually here.
        storage_ip: hostname or IP address of the MongoDB server.
        db_name: name of the target database.
        db_collection_name: name of the target collection.

    Raises:
        pymongo.errors.PyMongoError: on connection or insertion failure
            (e.g. ``DuplicateKeyError`` if the same filename is inserted
            twice).
    """
    mongodb_url = "mongodb://{}:27017/".format(storage_ip)
    # Lazy %-style args: the message is only formatted when INFO is enabled.
    logger.info("connection to the mongodb server: %s", mongodb_url)

    # pymongo settings
    myclient = pymongo.MongoClient(mongodb_url)
    actes_collection = myclient[db_name][db_collection_name]

    # TODO: a single insert_many() would cut round-trips, but inserting one
    # by one keeps per-document logging and error isolation.
    for dataset_filenamestem, dataset in jsondoc.datasets.items():
        # A manual load is required here, because the dataset **is not**
        # registered in kedro's catalog.
        document = dataset._load()
        # FIXME: is the filename the right choice for the document id?
        document["_id"] = document["filename"]
        res = actes_collection.insert_one(document)
        logger.info(res.inserted_id)

@ -0,0 +1,19 @@
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import populate_mongo
def create_pipeline(**kwargs) -> Pipeline:
    """Assemble the database-population pipeline.

    A single node feeds the full JSON output into MongoDB, using the
    connection parameters declared in the project configuration.
    """
    populate_node = node(
        func=populate_mongo,
        inputs=[
            "bourbon_fulljsonoutput",
            "params:storage_ip",
            "params:db_name",
            "params:db_collection_name",
        ],
        outputs=None,
        name="populate_mongo",
        tags="populate_database",
    )
    return pipeline([populate_node])

@ -13,18 +13,21 @@ def create_pipeline(**kwargs) -> Pipeline:
inputs=["bourbon"],
outputs="bourbon_xmlcontent",
name="bourbon_ds_collection",
tags="etl_transform"
),
node(
func=make_json_collection,
inputs="bourbon_json",
outputs="bourbon_jsonoutput",
name="bourbon_json_ds_collection",
tags="etl_transform"
),
node(
func=add_xmlcontent_tojson,
inputs=["bourbon_jsonoutput", "bourbon_pseudoxmlcontent"],
outputs="bourbon_fulljsonoutput",
name="bourbon_fulljson_ds_collection",
tags="etl_transform"
),
# node(

@ -22,8 +22,8 @@ https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""
# CONF_SOURCE = "conf"
# Class that manages how configuration is loaded.
# from kedro.config import OmegaConfigLoader
# CONFIG_LOADER_CLASS = OmegaConfigLoader
# Override Kedro's default config loader with OmegaConfigLoader
# (OmegaConf-based; supports variable interpolation in conf files).
from kedro.config import OmegaConfigLoader
CONFIG_LOADER_CLASS = OmegaConfigLoader
# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
# CONFIG_LOADER_ARGS = {
# "config_patterns": {

Loading…
Cancel
Save