Introduction:
In today’s data-driven world, managing and analyzing vast amounts of data efficiently is crucial. Snowflake, a cloud-based data warehousing platform, offers a powerful solution for storing and querying data. This blog post will explore a generic Python script that facilitates the process of uploading CSV files to Snowflake, making it easy for data professionals to integrate their data seamlessly.
If you’re exploring Python for Snowflake integration using key-pair authentication or seeking a comprehensive guide, check out our resources at Beginner’s Guide to Snowflake Key-Pair Authentication and Python and Snowflake Integration: A Step-by-Step Guide. These tutorials cater to both beginners and those looking for practical insights into connecting Python with Snowflake, ensuring a smooth journey into data management and analysis.
Prerequisites:
Before delving into the details of the Python script, ensure that you have the necessary prerequisites in place:
- A Snowflake account: Obtain the account details, including the account name, user credentials, and database information.
- Snowflake stage: Set up a Snowflake stage where the CSV files will be staged before being loaded into the database.
- Python environment: Make sure you have Python installed on your system.
Understanding the Python Script:
Let’s break down the provided Python script into key components and understand how it works.
- Snowflake Connection Details:
- The script begins by defining Snowflake connection details such as the account name, user credentials, database, warehouse, schema, role, and stage.
- These details are essential for establishing a connection to Snowflake.
snowflake_env = 'environment_prefix_'
snowflake_account = 'your_account'
snowflake_user = 'your_registered_email_id'
snowflake_database = snowflake_env + 'your_database'
snowflake_warehouse = snowflake_env + 'your_warehouse'  # no trailing comma here, or this becomes a tuple
snowflake_schema = 'your_schema'
snowflake_role = snowflake_env + 'your_role'
snowflake_stage = 'your_stage'
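Hardcoding credentials in the script works for quick experiments, but a common alternative is to read them from environment variables so nothing sensitive lives in source control. A minimal sketch of that approach (the environment-variable names below are assumptions, not part of the original script):

```python
import os

# Hypothetical environment-variable names -- pick whatever matches your own setup.
snowflake_env = os.environ.get("SNOWFLAKE_ENV_PREFIX", "DEV_")
snowflake_account = os.environ.get("SNOWFLAKE_ACCOUNT", "your_account")
snowflake_user = os.environ.get("SNOWFLAKE_USER", "your_registered_email_id")
snowflake_database = snowflake_env + os.environ.get("SNOWFLAKE_DATABASE", "your_database")
snowflake_warehouse = snowflake_env + os.environ.get("SNOWFLAKE_WAREHOUSE", "your_warehouse")
snowflake_schema = os.environ.get("SNOWFLAKE_SCHEMA", "your_schema")
snowflake_role = snowflake_env + os.environ.get("SNOWFLAKE_ROLE", "your_role")
snowflake_stage = os.environ.get("SNOWFLAKE_STAGE", "your_stage")
```

The rest of the script is unchanged; only where the values come from differs.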
- Connection to Snowflake:
- The connect_to_snowflake function establishes a connection to Snowflake using the provided connection details.
- It also includes error handling to manage potential connection issues.
def connect_to_snowflake():
    try:
        conn = snowflake.connector.connect(
            user=snowflake_user,
            account=snowflake_account,
            database=snowflake_database,
            schema=snowflake_schema,
            warehouse=snowflake_warehouse,
            role=snowflake_role,
            authenticator='externalbrowser'
        )
        print("Snowflake connection established successfully.")
        cursor = conn.cursor()
        cursor.execute(f"USE WAREHOUSE {snowflake_warehouse}")
        cursor.execute(f"USE DATABASE {snowflake_database}")
        cursor.execute(f"USE SCHEMA {snowflake_schema}")
        return conn
    except snowflake.connector.errors.DatabaseError as e:
        print("Error connecting to Snowflake: ", str(e))
        return None
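Because the function returns None when the connection fails, callers should check the return value before using it. One way to fail fast at the call site (require_connection is a hypothetical helper, not part of the script):

```python
def require_connection(conn):
    # connect_to_snowflake() yields None on failure, so raise here
    # instead of hitting an AttributeError later in the pipeline.
    if conn is None:
        raise RuntimeError("Could not connect to Snowflake; check your account details.")
    return conn
```

Usage would then be `conn = require_connection(connect_to_snowflake())`.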
- Creating a Table in Snowflake:
- The create_table function creates a table in Snowflake with the specified name and columns.
def create_table(conn, table_name, columns):
    query = f"CREATE OR REPLACE TABLE {table_name} ({columns})"
    print(query)
    conn.cursor().execute(query)
    print(f"Table {table_name} created successfully!")
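To see exactly what create_table sends to Snowflake, you can build the statement without a live connection. Here is the same string construction applied to two hypothetical columns, id and first name:

```python
def build_create_table_sql(table_name, columns):
    # columns is the comma-joined definition string, exactly as create_table expects it
    return f"CREATE OR REPLACE TABLE {table_name} ({columns})"

# Spaces in column names become underscores; every column is typed as string.
column_definitions = [f"{column.replace(' ', '_')} string" for column in ["id", "first name"]]
sql = build_create_table_sql("EMPLOYEES", ", ".join(column_definitions))
print(sql)  # CREATE OR REPLACE TABLE EMPLOYEES (id string, first_name string)
```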
- Uploading CSV Files to Snowflake:
- The upload_csv_to_snowflake function uploads a CSV file to Snowflake.
- It uses a Snowflake stage to handle the file upload process.
def upload_csv_to_snowflake(conn, file_path, table_name):
    cursor = conn.cursor()
    cursor.execute(f"PUT 'file://{file_path}' @{snowflake_stage} AUTO_COMPRESS = FALSE OVERWRITE = TRUE;")
    print("PUT command executed!")
    # Replace FILE_FORMAT with the named file format defined in your own Snowflake account.
    cursor.execute(f"COPY INTO {table_name} FROM @{snowflake_stage}/{os.path.basename(file_path)} FILE_FORMAT = 'PNR_EDA.SAS_IH_AD_CSV_DATA' ON_ERROR = 'CONTINUE' FORCE = TRUE PURGE = FALSE ENFORCE_LENGTH = FALSE;")
    print("COPY INTO command executed!")
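The PUT statement stages the local file, and COPY INTO then loads it from the stage by its base name rather than its full local path. That string handling can be sketched without a live connection (the stage name, table, and file-format name below are placeholders, not values from the original script):

```python
import os

def build_put_sql(file_path, stage):
    # Stage the local file; AUTO_COMPRESS = FALSE keeps the original .csv name in the stage.
    return f"PUT 'file://{file_path}' @{stage} AUTO_COMPRESS = FALSE OVERWRITE = TRUE;"

def build_copy_sql(table_name, file_path, stage, file_format):
    # COPY INTO references the staged file by its base name, not the full local path.
    return (f"COPY INTO {table_name} FROM @{stage}/{os.path.basename(file_path)} "
            f"FILE_FORMAT = '{file_format}' ON_ERROR = 'CONTINUE';")

put_sql = build_put_sql("C:/data/sales.csv", "my_stage")
copy_sql = build_copy_sql("SALES", "C:/data/sales.csv", "my_stage", "MY_CSV_FORMAT")
print(copy_sql)
```

This makes it easy to log or unit-test the generated SQL before running it against a real warehouse.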
- Processing CSV Files and Directories:
- The process_csv_file function reads a CSV file, extracts column names, and creates a corresponding Snowflake table.
- The process_csv_directory function processes all CSV files in a specified directory.
def process_csv_file(conn, file_path, table_name):
    df = pd.read_csv(file_path)
    column_definitions = [f"{column.replace(' ', '_')} string" for column in df.columns]
    create_table(conn, table_name, ', '.join(column_definitions))
    upload_csv_to_snowflake(conn, file_path, table_name)

def process_csv_directory(conn, directory_path):
    csv_files = glob.glob(os.path.join(directory_path, '*.csv'))
    csv_files = [file.replace("\\", "/") for file in csv_files]
    for file in csv_files:
        table_name = os.path.splitext(os.path.basename(file))[0]
        process_csv_file(conn, file, table_name)
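Note that process_csv_file only needs the header row to build the column list, so pandas is convenient here but not strictly required. A stdlib-only sketch of the same column-name handling, using an in-memory sample CSV:

```python
import csv
import io

def column_definitions_from_header(header):
    # Mirror the script's rule: spaces become underscores, every column is typed string.
    return [f"{column.replace(' ', '_')} string" for column in header]

sample = io.StringIO("order id,customer name,amount\n1,Alice,10\n")
header = next(csv.reader(sample))  # read only the header row
defs = column_definitions_from_header(header)
print(", ".join(defs))  # order_id string, customer_name string, amount string
```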
- Main Function:
- The main function is the entry point of the script.
- It establishes a connection to Snowflake and processes either a single CSV file or all CSV files in a specified directory.
def main():
    conn = connect_to_snowflake()
    # Uncomment and use when targeting a single file
    # csv_file_path = 'your/path/to/csv/xyz.csv'
    # table_name = os.path.splitext(os.path.basename(csv_file_path))[0]
    # process_csv_file(conn, csv_file_path, table_name)
    # Uncomment and use when uploading all CSVs in a directory
    # directory_path = 'path/to/directory/'
    # process_csv_directory(conn, directory_path)

if __name__ == '__main__':
    main()
Complete Python Script for Uploading CSV Files to Snowflake:
## pip install snowflake-connector-python pandas
import os
import glob
import pandas as pd
import snowflake.connector
def get_snowflake_connection(env_prefix, account, user, database, warehouse, schema, role, stage):
    try:
        conn = snowflake.connector.connect(
            user=user,
            account=account,
            database=database,
            schema=schema,
            warehouse=warehouse,
            role=role,
            authenticator='externalbrowser'
        )
        print("Snowflake connection established successfully.")
        cursor = conn.cursor()
        cursor.execute(f"USE WAREHOUSE {warehouse}")
        cursor.execute(f"USE DATABASE {database}")
        cursor.execute(f"USE SCHEMA {schema}")
        return conn
    except snowflake.connector.errors.DatabaseError as e:
        print("Error connecting to Snowflake: ", str(e))
        return None
def create_snowflake_table(conn, table_name, columns):
    # Create a table in Snowflake
    query = f"CREATE OR REPLACE TABLE {table_name} ({columns})"
    print(query)
    conn.cursor().execute(query)
    print(f"Table {table_name} created successfully!")
def upload_csv_to_snowflake(conn, file_path, table_name, stage):
    # Upload the data to Snowflake
    cursor = conn.cursor()
    # cursor.execute(f"TRUNCATE TABLE {table_name}")  # Optional: truncate the table before inserting data
    cursor.execute(f"PUT 'file://{file_path}' @{stage} AUTO_COMPRESS = FALSE OVERWRITE = TRUE;")  # Stage the file
    print("PUT command executed!")
    # Replace FILE_FORMAT with the named file format defined in your own Snowflake account.
    cursor.execute(f"COPY INTO {table_name} FROM @{stage}/{os.path.basename(file_path)} FILE_FORMAT = 'PNR_EDA.SAS_IH_AD_CSV_DATA' ON_ERROR = 'CONTINUE' FORCE = TRUE PURGE = FALSE ENFORCE_LENGTH = FALSE;")  # Copy the data into the table
    print("COPY INTO command executed!")
def process_csv_file(conn, file_path, table_name, stage):
    # Read the CSV file
    df = pd.read_csv(file_path)
    # Create the table in Snowflake
    column_definitions = [f"{column.replace(' ', '_')} string" for column in df.columns]
    create_snowflake_table(conn, table_name, ', '.join(column_definitions))
    # Upload CSV data to Snowflake
    upload_csv_to_snowflake(conn, file_path, table_name, stage)

def process_csv_directory(conn, directory_path, stage):
    # Get all CSV files in the directory
    csv_files = glob.glob(os.path.join(directory_path, '*.csv'))
    csv_files = [file.replace("\\", "/") for file in csv_files]  # Replace backslashes with forward slashes
    for file in csv_files:
        table_name = os.path.splitext(os.path.basename(file))[0]  # Use the filename as the table name
        process_csv_file(conn, file, table_name, stage)
def main():
    # Snowflake connection details
    env_prefix = 'environment_prefix_'
    account = 'your_account'
    user = 'your_registered_email_id'
    database = env_prefix + 'your_database'
    warehouse = env_prefix + 'your_warehouse'
    schema = 'your_schema'
    role = env_prefix + 'your_role'
    stage = 'your_stage'
    # Establish connection to Snowflake
    conn = get_snowflake_connection(env_prefix, account, user, database, warehouse, schema, role, stage)
    # Use when targeting a single file
    # csv_file_path = 'C:/Projects/Python/Snowflake/Upload_2_SNF/DE_TEAM.csv'
    # table_name = os.path.splitext(os.path.basename(csv_file_path))[0]  # table name from the file name, or your own
    # process_csv_file(conn, csv_file_path, table_name, stage)
    # Use when uploading all CSVs in a directory
    directory_path = 'path/to/directory/'
    process_csv_directory(conn, directory_path, stage)

if __name__ == '__main__':
    main()
Conclusion:
This generic Python script provides a streamlined approach to uploading CSV files to Snowflake. By handling both individual files and entire directories, it lets data professionals integrate their data into Snowflake's cloud-based data warehousing platform with minimal effort. The script is also easy to customize, so users can tailor it to their specific needs.