How to Connect to Snowflake SQL API Using Python

In the fast-evolving landscape of data management and analysis, Snowflake has emerged as a leading cloud-based data warehousing solution. Connecting to Snowflake’s SQL API using Python provides a powerful way to interact with your data and integrate it seamlessly into your applications. In this comprehensive guide, we’ll walk you through a Python script that utilizes Snowflake’s API, fetching data and uploading it to another table. Our focus is on establishing a secure connection and leveraging the flexibility of Python for data processing.

Understanding Snowflake SQL API

The Snowflake SQL API is a RESTful interface that gives developers versatile access to Snowflake databases. With capabilities ranging from executing queries to managing deployments, users can submit SQL statements, monitor execution status, cancel operations, and retrieve query results, which the API returns in partitions. The API is secured by network policies and has some limitations: the PUT and GET commands are not supported, and certain transactional and session-altering operations are restricted. Despite these constraints, the Snowflake SQL API remains a potent tool for crafting custom applications and integrations tailored to Snowflake databases.
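As a concrete illustration, a statement is submitted by POSTing a JSON body to the API's statements endpoint. The sketch below shows the shape of that request body; the statement and context values are placeholders, and the same field names appear in the script later in this guide.

```python
import json

# Hypothetical request body for POST /api/v2/statements.
# All values are placeholders; the field names match the payload
# the script in this guide sends.
payload = {
    "statement": "SELECT * FROM your_table",
    "timeout": 1000,              # seconds before the statement is canceled
    "database": "your_database",
    "schema": "your_schema",
    "warehouse": "your_warehouse",
    "role": "your_role",
}

# The body is sent as JSON, together with the Authorization and
# X-Snowflake-Authorization-Token-Type headers described below.
print(json.dumps(payload, indent=2))
```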

Python Script for Snowflake SQL API Interaction

The Python script provided in this guide acts as a bridge between your Python environment and Snowflake’s SQL API. Its primary functions include generating a JWT token for secure authentication, fetching data from Snowflake using the API, and uploading the retrieved data to another table. This script streamlines the process of data integration, allowing you to seamlessly incorporate Snowflake’s capabilities into your Python applications. The script’s modular structure and reliance on the requests library make it easy to understand and customize according to your specific use case.

By executing the script, you initiate a series of steps: establishing a secure connection to Snowflake, retrieving data based on your specified query, processing the fetched data, and finally, uploading it to another table. The use of a configuration file (config.ini) ensures flexibility, allowing you to adapt the script to different Snowflake configurations and destination tables effortlessly. Whether you are building data pipelines, automating analytics workflows, or enhancing your applications with real-time data, this Python script provides a foundation for efficient and secure Snowflake integration.

Setting the Stage: Configuration Setup

Before diving into the script, let’s ensure you have the right setup. The script utilizes a configuration file (config.ini) to manage Snowflake and destination table configurations. Ensure you have this file in place and populated with the required details.

[Snowflake]
SnowflakeAccount = your_account
SnowflakeUser = your_username
SnowflakePrivateKeyPath = /path/to/private_key.pem
SnowflakePrivateKeyPassword = your_private_key_password
SnowflakeUrl = https://your_account.snowflakecomputing.com/api/v2/statements
SnowflakeDatabase = your_database
SnowflakeSchema = your_schema
SnowflakeWarehouse = your_warehouse
SnowflakeRole = your_role
SnowflakeQuery = SELECT * FROM your_table

[DestinationTable]
destination_table_name = your_destination_table

Replace the placeholders (your_...) with your actual Snowflake account details and configurations.
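To verify that the file parses as expected before running the script, you can load the same layout with Python's ConfigParser. The sketch below uses an in-memory copy of the file for illustration; the real script reads config.ini from disk with the identical section and key names.

```python
from configparser import ConfigParser

# Minimal in-memory check that the config.ini layout above parses;
# only a subset of the keys is shown here.
sample = """
[Snowflake]
SnowflakeAccount = your_account
SnowflakeQuery = SELECT * FROM your_table

[DestinationTable]
destination_table_name = your_destination_table
"""

config = ConfigParser()
config.read_string(sample)

# ConfigParser option names are case-insensitive
account = config['Snowflake'].get('SnowflakeAccount')
table = config['DestinationTable'].get('destination_table_name')
print(account, table)  # your_account your_destination_table
```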

Step 1: Generating JWT Token

The generate_jwt_token function initializes a JWTGenerator with your Snowflake configuration parameters, extracting the account, username, private key path, and password. It then obtains a JWT token using the get_token method. The full source code is included at the end of this guide.

# Import statements and other functions...

def generate_jwt_token(snowflake_config):
    # Extract Snowflake configuration parameters
    snowflake_account = snowflake_config.get('SnowflakeAccount')
    snowflake_username = snowflake_config.get('SnowflakeUser')
    snowflake_private_key_path = snowflake_config.get('SnowflakePrivateKeyPath')
    snowflake_private_key_password = snowflake_config.get('SnowflakePrivateKeyPassword')

    # Initialize JWTGenerator with Snowflake configuration
    jwt_generator = JWTGenerator(account=snowflake_account,
                                 user=snowflake_username,
                                 private_key_file_path=snowflake_private_key_path,
                                 private_key_password=snowflake_private_key_password)

    jwt_token = jwt_generator.get_token()
    # Generate and return the JWT token
    return jwt_token

Step 2: Fetching Data from Snowflake

The get_data_from_snowflake function uses the JWT token and Snowflake configuration to make a request to the Snowflake API. It supports fetching data from multiple partitions if present.

# Import statements and other functions...

def get_data_from_snowflake(jwt_token, snowflake_config):
    # Extract Snowflake configuration parameters
    snowflake_url = snowflake_config.get('SnowflakeUrl')
    snowflake_database = snowflake_config.get('SnowflakeDatabase')
    snowflake_schema = snowflake_config.get('SnowflakeSchema')
    snowflake_warehouse = snowflake_config.get('SnowflakeWarehouse')
    snowflake_role = snowflake_config.get('SnowflakeRole')

    # Statement to fetch data (modify as needed)
    statement = snowflake_config.get('SnowflakeQuery')

    # Prepare headers and data payload...

    # Make the request to Snowflake API
    response = requests.post(snowflake_url, headers=headers, json=data)

    # Process the response...

    return {'data': all_data, 'columns': all_columns, 'column_types': column_types}

Step 3: Data Processing

Data processing is a critical step to extract meaningful insights. The process_data function extracts relevant data from the JSON response, including column names, data, and column types.

# Import statements and other functions...

def process_data(json_data, all_data, columns):
    # Extract the relevant data from the json_data for processing
    data = json_data.get('data', [])

    # Extract column names and types from the metadata...

    # Append the column names to the columns list if not already present...

    # Append the data to the all_data list...

    return {'data': all_data, 'columns': columns, 'column_types': column_types}

Step 4: Uploading Data to Snowflake

Now that we have fetched and processed our data, the next step is to upload it to another table. The upload_data_to_snowflake_table function takes care of this.

# Import statements and other functions...

def upload_data_to_snowflake_table(jwt_token, data_to_load, columns, snowflake_config, destination_table_config):
    # Extract configuration details...

    # Prepare headers and create the SQL statement...

    # Join all data rows into a single string...

    # Concatenate the SQL statement with the values...

    # Prepare data payload...

    # Make the request to Snowflake API...

    # Process the response...

    return None
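The core of this step is assembling a single INSERT statement from the column list and the fetched rows. The sketch below mirrors the naive single-quoting approach the full script uses (the table name and rows are placeholders); for untrusted data, the API's "bindings" field is the safer choice.

```python
# Sketch: assemble the INSERT statement the way the script does.
# Naive single-quoting is used for illustration only; prefer the SQL API's
# "bindings" field to avoid quoting and SQL injection issues.
columns = ["NAME", "ADDRESS"]
rows = [["customer1", "1234 A Avenue"], ["customer2", "987 B Street"]]
target_table = "your_destination_table"  # placeholder

# Each row becomes a parenthesized, quoted tuple in the VALUES clause
values = ', '.join(
    "({})".format(', '.join("'{}'".format(v) for v in row)) for row in rows
)
full_statement = f"INSERT INTO {target_table} ({', '.join(columns)}) VALUES {values}"
print(full_statement)
```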

Bringing It All Together: Main Execution

The main function serves as the entry point of the script. It reads the configuration file, generates a JWT token, fetches data from Snowflake, and uploads it to another table.

# Import statements and other functions...

def main():
    config = ConfigParser()
    config.read('config.ini')

    # Read snowflake configuration section...

    # Generate JWT Token...

    # Get data from Snowflake in JSON format...

    # Upload the retrieved data to another table...

if __name__ == "__main__":
    main()

Snowflake SQL API Partitions

  • The Snowflake API organizes data into partitions, and the number and size of each partition are determined by Snowflake based on the data returned by a SQL query.
  • When a request is submitted, the response body includes a “partitionInfo” field containing an array of objects, each describing a specific partition of data.
  • The first object in the array details the partition of data returned in the current response, while subsequent objects describe additional partitions that can be retrieved by submitting subsequent requests with “partition=partition_number.”
  • Each object in the array specifies the number of rows and the size of a partition, providing metadata that an application can utilize to handle partitions returned in subsequent requests.
  • An example response snippet is provided, including information such as “code,” “statementHandle,” “sqlState,” “message,” “createdOn,” and “statementStatusUrl.”
  • The “resultSetMetaData” section includes details like the total number of rows, format (e.g., “jsonv2”), and the array of partition information with specific details on row count, uncompressed size, and compressed size for each partition.
  • The “data” section illustrates sample data rows returned by the SQL query, with columns like customer information, address, postal code, and timestamp.
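In practice, a subsequent partition is retrieved with a GET request against the statement's URL with a partition query parameter. The sketch below only builds that URL; the host and statement handle are placeholders, and a real request would reuse the same authorization headers shown earlier.

```python
# Sketch: build the URL for retrieving a later partition of a statement's
# results. Host and handle are placeholders taken from the example response.
base_url = "https://your_account.snowflakecomputing.com"
statement_handle = "536fad38-b564-4dc5-9892-a4543504df6c"

def partition_url(base, handle, partition_number):
    # GET /api/v2/statements/<handle>?partition=<n> returns that partition
    return f"{base}/api/v2/statements/{handle}?partition={partition_number}"

url = partition_url(base_url, statement_handle, 1)
print(url)
# A real request would then be:
#   requests.get(url, headers=headers)
```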
{
  "code": "090001",
  "statementHandle": "536fad38-b564-4dc5-9892-a4543504df6c",
  "sqlState": "00000",
  "message": "successfully executed",
  "createdOn": 1597090533987,
  "statementStatusUrl": "/api/v2/statements/536fad38-b564-4dc5-9892-a4543504df6c",
  "resultSetMetaData": {
    "numRows": 50000,
    "format": "jsonv2",
    "partitionInfo": [{
      "rowCount": 12288,
      "uncompressedSize": 124067,
      "compressedSize": 29591
    }, {
      "rowCount": 37712,
      "uncompressedSize": 414841,
      "compressedSize": 84469
    }]
  },
  "data": [
    ["customer1", "1234 A Avenue", "98765", "2021-01-20 12:34:56.03459878"],
    ["customer2", "987 B Street", "98765", "2020-05-31 01:15:43.765432134"],
    ["customer3", "8777 C Blvd", "98765", "2019-07-01 23:12:55.123467865"],
    ["customer4", "64646 D Circle", "98765", "2021-08-03 13:43:23.0"]
  ]
}
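An application can read resultSetMetaData.partitionInfo from the first response to learn how many partitions remain. A small sketch, using the numbers from the example response above:

```python
# Sketch: inspect partition metadata from a response like the example above
response_body = {
    "resultSetMetaData": {
        "numRows": 50000,
        "format": "jsonv2",
        "partitionInfo": [
            {"rowCount": 12288, "uncompressedSize": 124067, "compressedSize": 29591},
            {"rowCount": 37712, "uncompressedSize": 414841, "compressedSize": 84469},
        ],
    }
}

partitions = response_body["resultSetMetaData"]["partitionInfo"]
# Partition 0 is already included in the first response; the rest are
# fetched with ?partition=<n> requests
remaining = len(partitions) - 1
total_rows = sum(p["rowCount"] for p in partitions)
print(remaining, total_rows)  # 1 50000
```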

Full source code: Snowflake SQL API using Python



from configparser import ConfigParser
import json
from urllib.parse import urljoin, urlparse
import requests
from sql_api_generate_jwt import JWTGenerator




def generate_jwt_token(snowflake_config):
    # Extract Snowflake configuration parameters
    snowflake_account = snowflake_config.get('SnowflakeAccount')
    snowflake_username = snowflake_config.get('SnowflakeUser')
    snowflake_private_key_path = snowflake_config.get('SnowflakePrivateKeyPath')
    snowflake_private_key_password = snowflake_config.get('SnowflakePrivateKeyPassword')

    # Initialize JWTGenerator with Snowflake configuration
    jwt_generator = JWTGenerator(account=snowflake_account,
                                 user=snowflake_username,
                                 private_key_file_path=snowflake_private_key_path,
                                 private_key_password=snowflake_private_key_password)
    
    jwt_token = jwt_generator.get_token()
    # Generate and return the JWT token
    return jwt_token

def get_data_from_snowflake(jwt_token, snowflake_config):
    # Extract Snowflake configuration parameters
    snowflake_url = snowflake_config.get('SnowflakeUrl')
    snowflake_database = snowflake_config.get('SnowflakeDatabase')
    snowflake_schema = snowflake_config.get('SnowflakeSchema')
    snowflake_warehouse = snowflake_config.get('SnowflakeWarehouse')
    snowflake_role = snowflake_config.get('SnowflakeRole')

    # Statement to fetch data (modify as needed)
    statement = snowflake_config.get('SnowflakeQuery')
   
    # Prepare headers
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT',
        'Authorization': f'Bearer {jwt_token}'
    }

    # Prepare data payload
    data = {
        "statement": statement,
        "timeout": "1000",
        "database": snowflake_database,
        "schema": snowflake_schema,
        "warehouse": snowflake_warehouse,
        "role": snowflake_role,
        "bindings": {},
        "parameters": {}
    }

    # Make the request to Snowflake API
    response = requests.post(snowflake_url, headers=headers, json=data)

    # Default containers so a failed request still returns a valid shape
    all_data = []
    all_columns = []
    column_types = {}

    if response.status_code == 200:
        json_data = response.json()
        # Process the first partition of data
        data_info = process_data(json_data, all_data, all_columns)
        all_data = data_info['data']
        all_columns = data_info['columns']
        column_types = data_info['column_types']

        # Check if there are additional partitions
        link_header = response.headers.get('Link', '')
        next_partition_url = extract_next_partition_url(link_header, snowflake_url)

        while next_partition_url:
            # Make subsequent requests for additional partitions
            response = requests.get(next_partition_url, headers=headers)
            if response.status_code == 200:
                json_data = response.json()

                # Process the data for each additional partition
                data_info = process_data(json_data, all_data, all_columns)
                all_data = data_info['data']
                all_columns = data_info['columns']
                column_types = data_info['column_types']

                # Check for more partitions in the Link header
                link_header = response.headers.get('Link', '')
                next_partition_url = extract_next_partition_url(link_header, snowflake_url)
            else:
                print(f"Error fetching additional partitions: {response.status_code}")
                print(response.text)
                break
    else:
        print(f"Error fetching data: {response.status_code}")
        print(response.text)

    return {'data': all_data, 'columns': all_columns, 'column_types': column_types}

def extract_next_partition_url(link_header, base_url):
    # Extract the URL for the next partition from the Link header
    links = link_header.split(',')
    for link in links:
        if 'rel="next"' in link:
            url = link.split(';')[0].strip('<>')
            # Add the base_url as the scheme
            parsed_url = urlparse(base_url)
            if "api/v2/statements/" in parsed_url.path:
                base_path = parsed_url.path.rstrip('/')  # Remove trailing slash from path
                return urljoin(f"{parsed_url.scheme}://{parsed_url.netloc}{base_path}", url)
            else:
                return urljoin(f"{parsed_url.scheme}://{parsed_url.netloc}", url)
    return None


def process_data(json_data, all_data, columns):
    # Extract the relevant data from the json_data for processing
    data = json_data.get('data', [])

    # Extract column names and types from the metadata
    result_set_metadata = json_data.get('resultSetMetaData', {})
    columns_info = result_set_metadata.get('rowType', [])
    column_names = [column_info['name'] for column_info in columns_info]
    column_types = {column_info['name'].upper(): column_info['type'].upper() for column_info in columns_info}


    # Append the column names to the columns list if not already present
    if not columns:
        columns.extend(column_names)

    # Append the data to the all_data list
    all_data.extend(data)

    return {'data': all_data, 'columns': columns, 'column_types': column_types}

def upload_data_to_snowflake_table(jwt_token, data_to_load, columns, snowflake_config, destination_table_config):
    target_table = destination_table_config.get('destination_table_name')
    snowflake_url = snowflake_config.get('SnowflakeUrl')
    snowflake_database = snowflake_config.get('SnowflakeDatabase')
    snowflake_schema = snowflake_config.get('SnowflakeSchema')
    snowflake_warehouse = snowflake_config.get('SnowflakeWarehouse')
    snowflake_role = snowflake_config.get('SnowflakeRole')

    # Prepare headers
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT',
        'Authorization': f'Bearer {jwt_token}'
    }

    # Build the INSERT statement from the column list
    statement = f"INSERT INTO {target_table} ({', '.join(columns)})"

    # Join all data rows into a single VALUES clause.
    # Note: values are quoted naively here; for untrusted data, prefer the
    # API's "bindings" field to avoid quoting and SQL injection issues.
    values = ', '.join("({})".format(', '.join("'{}'".format(str(value)) for value in row)) for row in data_to_load)

    # Concatenate the SQL statement with the values
    full_statement = f"{statement} VALUES {values}"
    # Prepare data payload
    data = {
        'statement': full_statement,
        'timeout': '1000',
        'database': snowflake_database,
        'schema': snowflake_schema,
        'warehouse': snowflake_warehouse,
        'role': snowflake_role,
        'bindings': {},  # No explicit bindings
        'parameters': {}
    }

    # Make the request to Snowflake API
    response = requests.post(snowflake_url, headers=headers, json=data)

    if response.status_code == 200:
        print("Data inserted successfully.")
    else:
        print(f"Error inserting data: {response.status_code}")
        print(response.text)


def main():
    config = ConfigParser()
    config.read('config.ini')
    
    # Read Snowflake configuration section
    snowflake_config = config['Snowflake']

    # Generate JWT token
    jwt_token = generate_jwt_token(snowflake_config)

    # Get data from Snowflake in JSON format
    data = get_data_from_snowflake(jwt_token, snowflake_config)
    all_data = data['data']
    columns = data['columns']
    column_types = data['column_types']

    # Upload the retrieved data to another table
    destination_table_config = config['DestinationTable']
    upload_data_to_snowflake_table(jwt_token, all_data, columns, snowflake_config, destination_table_config)

if __name__ == "__main__":
    main()

Conclusion

Connecting to Snowflake’s SQL API using Python offers a robust solution for managing and analyzing your data. This guide has provided you with a step-by-step walkthrough of a Python script that leverages Snowflake’s API to fetch and upload data securely. By following the outlined steps and customizing the script to your specific needs, you can seamlessly integrate Snowflake with your Python applications. Empower your data-driven solutions with the combined capabilities of Snowflake and Python, and take your data integration to new heights.

For more in-depth information and examples, refer to the official Snowflake documentation.

Start connecting to Snowflake with Python today and unlock the full potential of your data-driven applications!
