You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

3.0 KiB

Concaténation

Voici un exemple simple de pipeline Kedro qui charge plusieurs fichiers CSV, les concatène et écrit le résultat dans un nouveau fichier CSV.

Structure du projet

Voici une structure de projet Kedro typique pour cet exemple :

my_kedro_project/
│
├── conf/
│   ├── base/
│   │   ├── catalog.yml
│   │   └── parameters.yml
│   └── local/
│       ├── catalog.yml
│       └── parameters.yml
│
├── src/
│   └── my_kedro_project/
│       ├── __init__.py
│       ├── pipeline_registry.py
│       ├── nodes.py
│       └── pipeline.py
│
├── data/
│   ├── 01_raw/
│   │   ├── file1.csv
│   │   ├── file2.csv
│   │   └── file3.csv
│   └── 02_intermediate/
│       └── concatenated.csv
│
└── pyproject.toml

Configuration du catalogue de données

Dans conf/base/catalog.yml, configurez les datasets :

file1:
  type: pandas.CSVDataSet
  filepath: data/01_raw/file1.csv

file2:
  type: pandas.CSVDataSet
  filepath: data/01_raw/file2.csv

file3:
  type: pandas.CSVDataSet
  filepath: data/01_raw/file3.csv

concatenated:
  type: pandas.CSVDataSet
  filepath: data/02_intermediate/concatenated.csv

Définition des nœuds

Dans src/my_kedro_project/nodes.py, définissez les nœuds pour concaténer les fichiers CSV :

import pandas as pd
from kedro.pipeline import node

def concatenate_csvs(file1: pd.DataFrame, file2: pd.DataFrame, file3: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([file1, file2, file3])

node_concatenate = node(
    func=concatenate_csvs,
    inputs=["file1", "file2", "file3"],
    outputs="concatenated",
    name="concatenate_csvs_node"
)

Définition du pipeline

Dans src/my_kedro_project/pipeline.py, créez le pipeline :

from kedro.pipeline import Pipeline
from my_kedro_project.nodes import node_concatenate

def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline([node_concatenate])

Registre de pipeline

Dans src/my_kedro_project/pipeline_registry.py, enregistrez le pipeline :

from kedro.pipeline import Pipeline
from my_kedro_project.pipeline import create_pipeline

def register_pipelines() -> Dict[str, Pipeline]:
    return {
        "__default__": create_pipeline(),
    }

Exécution du pipeline

Pour exécuter le pipeline, utilisez la commande suivante dans le répertoire racine de votre projet :

kedro run

Cela chargera les fichiers CSV file1.csv, file2.csv, et file3.csv depuis le répertoire data/01_raw/, les concaténera et écrira le résultat dans data/02_intermediate/concatenated.csv.

Conclusion

Cet exemple montre comment configurer un pipeline Kedro pour charger plusieurs fichiers CSV, les concaténer et écrire le résultat dans un nouveau fichier CSV. Vous pouvez adapter cette structure pour des workflows plus complexes en ajoutant d'autres nœuds et pipelines selon vos besoins.