Efficient Data Ingestion: Using Python to Stream Large Datasets into PostgreSQL

Fauzan Ghaza
3 min read · Jan 23, 2025


Streamline Your Data Workflow with Python and PostgreSQL

Handling large datasets can be challenging, especially when you need to import data into a database without overwhelming your system’s memory. In this article, I’ll walk you through a Python script that handles data ingestion efficiently by reading files in chunks and inserting them into a PostgreSQL database.

Introduction

Data ingestion is a critical step in data analytics and machine learning workflows. Whether you’re working with CSVs, Parquet files, or other formats, getting your data into a database quickly and efficiently is essential. This tutorial demonstrates how to:

  • Read large datasets without overloading memory.
  • Handle both CSV and Parquet file formats.
  • Insert data into a PostgreSQL database in chunks.

Let’s dive into the solution!

What This Script Does

The Python script automates the ingestion process with the following features:

  1. File Format Support: Handles both CSV and Parquet files.
  2. Chunk-Based Processing: Reads data in chunks to avoid memory overload.
  3. PostgreSQL Integration: Leverages SQLAlchemy for database connections and insertions.
  4. Command-Line Arguments: Offers customization for database credentials, file path, chunk size, and more.

Code Walkthrough

Below is the complete script with explanations for each key section.

1. Parameter Parsing with argparse

Using the argparse library, the script accepts user input via command-line arguments:

import argparse

parser = argparse.ArgumentParser(description="Ingest data into PostgreSQL")

parser.add_argument("--user", help="Postgres username", default="root")
parser.add_argument("--host", help="Database host", default="localhost")
parser.add_argument("--port", help="Database port", default="5432")
parser.add_argument("--db", help="Database name", default="ny_taxi")
parser.add_argument("--table", help="Target table name")
parser.add_argument("--url", help="Path to the data file (CSV or Parquet)")
parser.add_argument("--chunk", help="Chunk size for processing", default=10000, type=int)

args = parser.parse_args()

This approach allows flexibility, enabling users to adapt the script to various datasets and database configurations.
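
One detail the arguments above don’t cover is the database password. As described in the usage section below, the script prompts for it at runtime; here is a minimal sketch of how that prompt might look using the standard-library getpass module (the original script’s prompt isn’t shown in this post, so treat this as an assumption):

from getpass import getpass

# Prompt for the password at runtime so it never appears in shell
# history or process listings (assumed approach, not shown above)
password = getpass(f"Password for user {args.user}: ")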

2. Reading the Data File

The script identifies the file format (CSV or Parquet) and reads the data accordingly:

import pandas as pd

def read_data(url, chunksize):
    if url.endswith(".csv"):
        print("File type: CSV")
        # Return an iterator that yields DataFrames of `chunksize` rows
        return pd.read_csv(url, iterator=True, chunksize=chunksize)
    elif url.endswith(".parquet"):
        print("File type: Parquet")
        # Parquet is read in full, then sliced into chunk-sized DataFrames
        df = pd.read_parquet(url)
        return [df[i:i + chunksize] for i in range(0, df.shape[0], chunksize)]
    else:
        raise ValueError("Unsupported file type: " + url)

3. Inserting Data into PostgreSQL

Using SQLAlchemy, the script connects to PostgreSQL and inserts data in chunks:

from sqlalchemy import create_engine

def insert_data(chunks, table, engine):
    for i, chunk in enumerate(chunks, start=1):
        print(f"Inserting batch {i}...")
        # Parse the timestamp columns (specific to the NY taxi dataset)
        chunk["tpep_pickup_datetime"] = pd.to_datetime(chunk["tpep_pickup_datetime"])
        chunk["tpep_dropoff_datetime"] = pd.to_datetime(chunk["tpep_dropoff_datetime"])
        chunk.to_sql(name=table, con=engine, if_exists="append", index=False)
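
The post doesn’t show the block that ties these functions together, so here is a minimal sketch of how they might be wired up, assuming the password is prompted for with getpass and the connection string follows SQLAlchemy’s standard postgresql:// format:

from getpass import getpass
from sqlalchemy import create_engine

if __name__ == "__main__":
    # Prompt for the password instead of taking it as an argument
    password = getpass(f"Password for user {args.user}: ")

    # SQLAlchemy connection URL: postgresql://user:password@host:port/db
    engine = create_engine(
        f"postgresql://{args.user}:{password}@{args.host}:{args.port}/{args.db}"
    )

    chunks = read_data(args.url, args.chunk)
    insert_data(chunks, args.table, engine)
    print("Ingestion complete.")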

How to Use the Script

Follow these steps to use the script:

1. Install Dependencies

Ensure you have the required libraries installed:

pip install pandas sqlalchemy psycopg2 pyarrow

2. Save the Script

Save the script as data_ingestion.py.

3. Run the Script

Run the script with appropriate arguments. For example:

python data_ingestion.py \
--user root \
--host localhost \
--db ny_taxi \
--table yellow_taxi_data \
--url path/to/your/file.csv \
--chunk 10000

When prompted, enter the password for your PostgreSQL user.

Pipeline Overview

At a high level, the pipeline looks like this: parse command-line arguments → read the file in chunks (CSV or Parquet) → convert the datetime columns → append each chunk to the target PostgreSQL table.

Example Scenarios

Here are some scenarios where this script shines:

1. Handling Large CSV Files

When working with CSV files larger than your system’s memory, the script processes and inserts data in manageable chunks.

2. Integrating Parquet Files

Parquet files, known for their compact columnar format, are fully supported. The script reads the entire file into memory and then splits it into chunk-sized DataFrames, so the file itself still needs to fit in RAM.
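
If that becomes a problem, the chunking can be pushed down into the reader itself. Below is a minimal sketch using pyarrow’s ParquetFile.iter_batches, which streams record batches instead of loading the whole file — an alternative to the read-then-split approach above, not what the original script does:

import pyarrow.parquet as pq

def read_parquet_batches(url, chunksize):
    # Stream the file batch by batch instead of loading it all at once
    parquet_file = pq.ParquetFile(url)
    for batch in parquet_file.iter_batches(batch_size=chunksize):
        yield batch.to_pandas()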

3. Preprocessing During Ingestion

You can modify the script to preprocess data (e.g., handling missing values or normalizing columns) during ingestion.
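
As a sketch of what that could look like, here is a hypothetical cleaning step slotted in before to_sql (the column names are from the NY taxi dataset used above; adapt them to your data):

def preprocess(chunk):
    # Hypothetical cleaning: drop rows with no pickup time and
    # fill missing passenger counts (adjust to your dataset)
    chunk = chunk.dropna(subset=["tpep_pickup_datetime"])
    chunk["passenger_count"] = chunk["passenger_count"].fillna(0)
    return chunk

# Inside insert_data, call it before writing each chunk:
#     chunk = preprocess(chunk)
#     chunk.to_sql(name=table, con=engine, if_exists="append", index=False)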

Challenges and Solutions

Challenge: Ingestion is slow for very large datasets.

Solution: Optimize PostgreSQL performance by:

  • Disabling indexes temporarily during insertion.
  • Using bulk inserts (see the sketch below).
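
pandas’ to_sql accepts a custom insertion method, which makes it possible to switch from row-by-row INSERTs to PostgreSQL’s COPY. The sketch below follows the pattern shown in the pandas documentation and relies on psycopg2’s copy_expert, so treat the details as an assumption to verify against your driver:

import csv
from io import StringIO

def psql_copy_insert(table, conn, keys, data_iter):
    # Custom to_sql insertion method: stream the rows through COPY
    dbapi_conn = conn.connection  # raw psycopg2 connection
    with dbapi_conn.cursor() as cur:
        buf = StringIO()
        csv.writer(buf).writerows(data_iter)
        buf.seek(0)
        columns = ", ".join(f'"{k}"' for k in keys)
        if table.schema:
            table_name = f'"{table.schema}"."{table.name}"'
        else:
            table_name = f'"{table.name}"'
        cur.copy_expert(f"COPY {table_name} ({columns}) FROM STDIN WITH CSV", buf)

# Usage inside insert_data:
#     chunk.to_sql(name=table, con=engine, if_exists="append",
#                  index=False, method=psql_copy_insert)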

Challenge: Unsupported file formats.

Solution: Extend the script to handle formats like JSON or XML.
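
For example, newline-delimited JSON fits the same chunked pattern, because pandas’ read_json returns an iterator when given lines=True and a chunksize. A sketch of a hypothetical helper (not part of the original script):

import pandas as pd

def read_json_lines(url, chunksize):
    # Hypothetical reader for newline-delimited JSON; returns an
    # iterator of DataFrames, matching the CSV branch's behavior
    return pd.read_json(url, lines=True, chunksize=chunksize)

You could then add an elif url.endswith(".json") branch in read_data that calls this helper.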

Conclusion

This script provides a robust, adaptable solution for ingesting large datasets into PostgreSQL. By leveraging pandas’ chunked reading and SQLAlchemy’s database integration, you can load files that would never fit in memory all at once.

Feel free to adapt this script to your specific workflows. If you have questions or suggestions, share them in the comments below — I’d love to hear your thoughts!

#DataEngineer #Database #Pipeline #Python #dezoomcamp #LearningInPublic
