Efficient Data Ingestion: Using Python to Stream Large Datasets into PostgreSQL
Streamline Your Data Workflow with Python and PostgreSQL
Handling large datasets can be a challenging task, especially when importing data into a database without overwhelming your system’s memory. In this article, I’ll walk you through a Python script that efficiently handles data ingestion by reading files in chunks and inserting them into a PostgreSQL database.
Introduction
Data ingestion is a critical step in data analytics and machine learning workflows. Whether you’re working with CSVs, Parquet files, or other formats, getting your data into a database quickly and efficiently is essential. This tutorial demonstrates how to:
- Read large datasets without overloading memory.
- Handle both CSV and Parquet file formats.
- Insert data into a PostgreSQL database in chunks.
Let’s dive into the solution!
What This Script Does
The Python script automates the ingestion process with the following features:
- File Format Support: Handles both CSV and Parquet files.
- Chunk-Based Processing: Reads data in chunks to avoid memory overload.
- PostgreSQL Integration: Leverages SQLAlchemy for database connections and insertions.
- Command-Line Arguments: Offers customization for database credentials, file path, chunk size, and more.
Code Walkthrough
Below is the complete script with explanations for each key section.
1. Parameter Parsing with argparse
Using the argparse library, the script accepts user input via command-line arguments:
import argparse
parser = argparse.ArgumentParser(description="Ingest data into PostgreSQL")
parser.add_argument("--user", help="Postgres username", default="root")
parser.add_argument("--host", help="Database host", default="localhost")
parser.add_argument("--port", help="Database port", default="5432")
parser.add_argument("--db", help="Database name", default="ny_taxi")
parser.add_argument("--table", help="Target table name")
parser.add_argument("--url", help="Path to the data file (CSV or Parquet)")
parser.add_argument("--chunk", help="Chunk size for processing", default=10000, type=int)
args = parser.parse_args()
This approach allows flexibility, enabling users to adapt the script to various datasets and database configurations.
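If you want argparse to reject a missing table name or file path up front, you can mark those two arguments as required when defining them. This is an optional tweak, not part of the original script; it would replace the two add_argument calls shown above:

# Optional: require the two arguments that have no sensible default
parser.add_argument("--table", help="Target table name", required=True)
parser.add_argument("--url", help="Path to the data file (CSV or Parquet)", required=True)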
2. Reading the Data File
The script identifies the file format (CSV or Parquet) and reads the data accordingly:
import pandas as pd

def read_data(url, chunksize):
    if url.endswith(".csv"):
        print("File type: CSV")
        # Return an iterator that yields DataFrames of `chunksize` rows
        return pd.read_csv(url, iterator=True, chunksize=chunksize)
    elif url.endswith(".parquet"):
        print("File type: Parquet")
        # Parquet is read in full, then split into chunk-sized slices
        df = pd.read_parquet(url)
        return [df[i:i + chunksize] for i in range(0, df.shape[0], chunksize)]
    else:
        raise ValueError("Unsupported file type: " + url)
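Both branches return something you can iterate over to get DataFrames, so the rest of the script can treat CSV and Parquet sources the same way. A quick usage sketch (the file name here is only a placeholder):

# Hypothetical local file; works the same for .csv and .parquet inputs
chunks = read_data("yellow_tripdata_2021-01.csv", chunksize=10000)
for chunk in chunks:
    print(f"Read a chunk with {len(chunk)} rows")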
3. Inserting Data into PostgreSQL
Using SQLAlchemy, the script connects to PostgreSQL and inserts data in chunks:
from sqlalchemy import create_engine

def insert_data(chunks, table, engine):
    for i, chunk in enumerate(chunks, start=1):
        print(f"Inserting batch {i}...")
        # Parse the NYC taxi timestamp columns before writing
        chunk["tpep_pickup_datetime"] = pd.to_datetime(chunk["tpep_pickup_datetime"])
        chunk["tpep_dropoff_datetime"] = pd.to_datetime(chunk["tpep_dropoff_datetime"])
        # Append each chunk to the target table
        chunk.to_sql(name=table, con=engine, if_exists="append", index=False)
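The walkthrough above doesn't show how these pieces are wired together, or where the password prompt mentioned in the usage steps happens. Below is a minimal sketch of a possible main block, assuming getpass is used for the prompt and the standard postgresql+psycopg2 connection URL; treat it as a starting point rather than the article's exact code:

from getpass import getpass

if __name__ == "__main__":
    # Ask for the password interactively instead of passing it as an argument
    password = getpass("Postgres password: ")

    # Build the SQLAlchemy engine from the parsed command-line arguments
    engine = create_engine(
        f"postgresql+psycopg2://{args.user}:{password}@{args.host}:{args.port}/{args.db}"
    )

    chunks = read_data(args.url, args.chunk)
    insert_data(chunks, args.table, engine)
    print("Ingestion complete.")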
How to Use the Script
Follow these steps to use the script:
1. Install Dependencies
Ensure you have the required libraries installed:
pip install pandas sqlalchemy psycopg2 pyarrow
2. Save the Script
Save the script as data_ingestion.py.
3. Run the Script
Run the script with appropriate arguments. For example:
python data_ingestion.py \
    --user root \
    --host localhost \
    --db ny_taxi \
    --table yellow_taxi_data \
    --url path/to/your/file.csv \
    --chunk 10000
When prompted, enter the password for your PostgreSQL user.
Pipeline Overview
At a high level, the pipeline reads the source file (CSV or Parquet) in chunks with pandas, applies lightweight transformations such as timestamp parsing, and appends each chunk to the target PostgreSQL table through SQLAlchemy.
Example Scenarios
Here are some scenarios where this script shines:
1. Handling Large CSV Files
When working with CSV files larger than your system’s memory, the script processes and inserts data in manageable chunks.
2. Integrating Parquet Files
Parquet files, known for their efficient columnar storage, are supported as well. Note that the script reads the entire Parquet file into memory before splitting it into chunks, so chunking here limits the size of each insert rather than peak memory usage.
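If memory is also a concern for Parquet, pyarrow (already in the install step above) can stream the file in row batches. A minimal sketch of a generator that could replace the Parquet branch of read_data, assuming the file is readable by pyarrow:

import pyarrow.parquet as pq

def read_parquet_in_batches(url, chunksize):
    # Stream row batches instead of loading the whole file into memory
    parquet_file = pq.ParquetFile(url)
    for batch in parquet_file.iter_batches(batch_size=chunksize):
        yield batch.to_pandas()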
3. Preprocessing During Ingestion
You can modify the script to preprocess data (e.g., handling missing values or normalizing columns) during ingestion.
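For example, a small helper could clean each chunk before insert_data writes it; the function below is hypothetical and uses only the NYC taxi timestamp columns shown earlier:

def preprocess(chunk):
    # Drop rows that are missing either timestamp
    chunk = chunk.dropna(subset=["tpep_pickup_datetime", "tpep_dropoff_datetime"])
    # Normalize column names to lowercase so they match the table schema
    chunk.columns = [c.lower() for c in chunk.columns]
    return chunk

Calling preprocess(chunk) at the top of the loop in insert_data keeps the cleaning logic in one place.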
Challenges and Solutions
Challenge: Ingestion is slow for very large datasets.
Solution: Optimize PostgreSQL performance by:
- Disabling indexes temporarily during insertion.
- Using bulk inserts.
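For the bulk-insert option, the to_sql method in pandas accepts a method parameter; passing "multi" sends many rows per INSERT statement instead of one at a time. A sketch of the adjusted call inside insert_data (worth benchmarking on your own data, since the gain varies by setup):

chunk.to_sql(
    name=table,
    con=engine,
    if_exists="append",
    index=False,
    method="multi",  # batch many rows into each INSERT statement
)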
Challenge: Unsupported file formats.
Solution: Extend the script to handle formats like JSON or XML.
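For newline-delimited JSON, for instance, pandas can already stream records in chunks. A sketch of an extra branch that could be added to read_data, assuming JSON Lines input:

    elif url.endswith(".json"):
        print("File type: JSON Lines")
        # Like the CSV branch, this returns an iterator of DataFrames
        return pd.read_json(url, lines=True, chunksize=chunksize)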
Conclusion
This script provides a robust and scalable starting point for ingesting large datasets into PostgreSQL. By leveraging Python’s data processing capabilities and SQLAlchemy’s database integration, you can load very large files without exhausting your system’s memory.
Feel free to adapt this script to your specific workflows. If you have questions or suggestions, share them in the comments below — I’d love to hear your thoughts!
#DataEngineer #Database #Pipeline #Python #dezoomcamp #LearningInPublic