How To Guide#

Currently all of these examples would require custom sql to be writen in Snowpark without the ice pick extension. With the ice pick extension we add the functionality to support new objects to Snowpark, and make the common tasks below easy to implement only using python.

[1]:
# Initializing the extended session

from ice_pick import extend_session
from snowflake.snowpark import Session
import configparser

# Create the connection and extend the session with ice_pick
# assumes credentials are in "snowflake_creds.config"
config = configparser.ConfigParser()
config.read('snowflake_creds.config')
session = extend_session(Session).builder.configs(dict(config['DEFAULT'])).create()
[2]:
import warnings
warnings.filterwarnings("ignore")

Get DDL of objects#

You can easily get ddl of all objects in your database by using the schema object filter. The schema object filter will return all objects matching the regular expresson provided. Wildcards (“.*”) can be used for all fields to get all objects.

[7]:
# Get ddl as a string for a single object

# session.create_schema_object(database, schema, object name, object type)
customer_table = session.create_schema_object('TEST', 'SCHEMA_1', 'CUSTOMER', 'TABLE')

customer_table.get_ddl()
[7]:
'create or replace TABLE CUSTOMER (\n\tC_CUSTKEY NUMBER(38,0),\n\tC_NAME VARCHAR(25),\n\tC_ADDRESS VARCHAR(40),\n\tC_NATIONKEY NUMBER(38,0),\n\tC_PHONE VARCHAR(15),\n\tC_ACCTBAL NUMBER(12,2),\n\tC_MKTSEGMENT VARCHAR(10),\n\tC_COMMENT VARCHAR(117)\n);'
[8]:
# Get many objects using a filter

# session.create_schema_object_filter([database], [schema], [object name], [object type])
all_schema_objects = session.create_schema_object_filter([".*"], [".*"], [".*"], [".*"])

all_schema_object_list = all_schema_objects.return_schema_objects()

print(f"returned_objects: {len(all_schema_object_list)}")
print(f"""first 5 object names and types:
    {
        [f"{obj.database}.{obj.schema}.{obj.object_name}"
        for obj in all_schema_object_list[0:5]]
      }""")
returned_objects: 7
first 5 object names and types:
    ['TEST.SCHEMA_1.SP_PI()', 'TEST.SCHEMA_1.CUSTOMER', 'TEST.SCHEMA_1.LINEITEM', 'TEST.SCHEMA_2.CUSTOMER', 'TEST.SCHEMA_2.LINEITEM']
[15]:
# Saving all the schema objects from the filter

for obj in all_schema_object_list:
    print(f"Saving Object: {obj.database}.{obj.schema}.{obj.object_name}")

    print(f"To Path: DDL/{obj.database}/{obj.schema}/{obj.object_type}/\
{obj.database}.{obj.schema}.{obj.object_name}.sql")

    obj.get_ddl(save=True)
Saving Object: TEST.SCHEMA_1.SP_PI()
To Path: DDL/TEST/SCHEMA_1/PROCEDURE/TEST.SCHEMA_1.SP_PI().sql
Saving Object: TEST.SCHEMA_1.CUSTOMER
To Path: DDL/TEST/SCHEMA_1/TABLE/TEST.SCHEMA_1.CUSTOMER.sql
Saving Object: TEST.SCHEMA_1.LINEITEM
To Path: DDL/TEST/SCHEMA_1/TABLE/TEST.SCHEMA_1.LINEITEM.sql
Saving Object: TEST.SCHEMA_2.CUSTOMER
To Path: DDL/TEST/SCHEMA_2/TABLE/TEST.SCHEMA_2.CUSTOMER.sql
Saving Object: TEST.SCHEMA_2.LINEITEM
To Path: DDL/TEST/SCHEMA_2/TABLE/TEST.SCHEMA_2.LINEITEM.sql
Saving Object: TEST_2.SCHEMA_A.CUSTOMER
To Path: DDL/TEST_2/SCHEMA_A/TABLE/TEST_2.SCHEMA_A.CUSTOMER.sql
Saving Object: TEST.SCHEMA_1.ECHO_VARCHAR(VARCHAR)
To Path: DDL/TEST/SCHEMA_1/USER FUNCTION/TEST.SCHEMA_1.ECHO_VARCHAR(VARCHAR).sql

Example of saved DDL for TEST.SCHEMA_1.SP_PI().sql:

CREATE OR REPLACE PROCEDURE "SP_PI"()
RETURNS FLOAT
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS '
    return 3.1415926;
    ';

Check access to an object (user privileges)#

Access to objects can be found in the Snowflake UI. However, if you are just accessing Snowflake with Snowpark it can be easier to verify privilegs programatically. To view all privileges we need to recusively search through the role heirarchy, but ice pick takes care of this automatically.

[4]:
user = session.User("PRESTONT4")

object_to_check = session.create_schema_object('TEST', 'SCHEMA_1', 'CUSTOMER', 'TABLE')

privileges = user.check_privilege(object_to_check)

print(f"""User {user.name} has privileges: {privileges}
      on object {object_to_check.database}.{object_to_check.schema}.{object_to_check.object_name} """)
User PRESTONT4 has privileges: ['OWNERSHIP']
      on object TEST.SCHEMA_1.CUSTOMER

Manage and optimize account resources like warehouses#

We can use the Warehouse object to view stats like usage on the warehouse and size the warhouse up or down. With the resize_recommendation() method we use some simple rules (like local disk spillage of queries) to make resizing recommendations.

[6]:
warehouse = session.Warehouse("COMPUTE_WH")

# view warehouse query history for the last 5 hours
query_hist_df = warehouse.query_history(5, 0, interval='hour')
print(f"Latest 5 queries: {query_hist_df['QUERY_ID'].values[0:5]} \n")

# get warehouse optimization recommendations based on warehouse usage
recommendation = warehouse.resize_recommendation(auto_apply = False)
print(f"Resize recommendation: {recommendation}")

# resize the warehouse to a "small" size based on the recommendation
resize_status = warehouse.resize("SMALL")
print(f"Resize execution: {resize_status}")

Latest 5 queries: ['01abb8c0-0001-9954-001b-13870005e1e2'
 '01abb8c0-0001-99a0-001b-138700065092'
 '01abb8c0-0001-9965-001b-13870006d022'
 '01abb8c0-0001-9980-001b-13870005c226'
 '01abb8bf-0001-99b7-001b-13870006c022']

Resize recommendation: Size Down
Resize execution: Statement executed successfully.

Handle edge cases that Snowpark API does not support#

  • Additional higher level functions support pandas functions like concat and melt

[9]:
# Concat example
# This is useful becasuse the default Snowpark union cannot handle mismathced column name
from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType, FloatType, NullType

schema_1 = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
schema_2 = StructType([StructField("a", FloatType()), StructField("c", StringType())])
schema_3 = StructType([StructField("a", IntegerType()), StructField("c", StringType())])
schema_4 = StructType([StructField("c", StringType()), StructField("d", StringType())])


df_1 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema_1)
df_2 = session.create_dataframe([[2.0, "ice"], [4.0, "pick"]], schema_2)
df_3 = session.create_dataframe([[6, "test_1"], [7, "test_2"]], schema_3)
df_4 = session.create_dataframe([["testing_d", "testing_f"], ["testing_g", "testing_h"]], schema_4)

union_dfs = [df_1, df_2, df_3, df_4]

unioned_df = session.concat(union_dfs)

unioned_df.show()
----------------------------------------
|"A"   |"B"    |"C"        |"D"        |
----------------------------------------
|1.0   |snow   |NULL       |NULL       |
|3.0   |flake  |NULL       |NULL       |
|2.0   |NULL   |ice        |NULL       |
|4.0   |NULL   |pick       |NULL       |
|6.0   |NULL   |test_1     |NULL       |
|7.0   |NULL   |test_2     |NULL       |
|NULL  |NULL   |testing_d  |testing_f  |
|NULL  |NULL   |testing_g  |testing_h  |
----------------------------------------

[2]:
# Melt example
# Currenlty Snowpark doesn't have function like melt

from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType, FloatType, NullType

schema = StructType([StructField("A", StringType()), StructField("B", IntegerType()), StructField("C", IntegerType())])
df = session.create_dataframe([['a', 1, 2], ['b', 3, 4], ['c', 5, 6]], schema)

melt_df = session.melt(df, ['A'], ['B', 'C'])
melt_df.show()
------------------------------
|"A"  |"VALUE"  |"VARIABLE"  |
------------------------------
|a    |1        |B           |
|b    |3        |B           |
|c    |5        |B           |
|a    |2        |C           |
|b    |4        |C           |
|c    |6        |C           |
------------------------------

Replicate Account Object To Another Account#

(Usefull for RBACK testing purposes)

[ ]:
from ice_pick import extend_session
from snowflake.snowpark import Session
import configparser

# create a source and target session, the target session can be a vanilla snowpark connector
config = configparser.ConfigParser()
config.read('snowflake_creds.config')
source_session = extend_session(Session).builder.configs(dict(config['SOURCE'])).create()
target_session = Session.builder.configs(dict(config['TARGET'])).create()

# grab the objects in source account and get the ddl
# some objects in Snowflake don't have the "GET_DDL" function, so we mock the ddl instead

source_dbs_filter = source_session.create_account_object_filter([".*"], ['database'])
source_dbs = source_dbs_filter.return_account_objects()
source_db_ddls = [ source_db.get_ddl() for source_db in source_dbs ]

source_schemas_filter = source_session.create_account_object_filter([".*"], ['schema'])
source_schemas = source_schemas_filter.return_account_objects()
source_schema_ddls = [ source_schema.get_ddl() for source_schema in source_schemas ]

source_acct_objs_filter = source_session.create_account_object_filter([".*"], ['user', 'role', 'warehouse'])
source_acct_objs = source_acct_objs_filter.return_account_objects()
source_acct_obj_ddls = [ source_acct_obj.mock_ddl() for source_acct_obj in source_acct_objs ]

schema_objects_filter = source_session.create_schema_object_filter(['.*'], ['.*'], ['.*'], ["TABLES", "VIEWS"])
schema_objects = schema_objects_filter.return_schema_objects()
schema_objects_ddls = [ schema_object.get_ddl() for schema_object in schema_objects ]

# Executed the ddl collected from the source account in the target account
def execute_ddl_in_target(session: Session, ddls:list) -> None:
    statuses = [ target_session.sql(ddl).collect() for ddl in ddls ]
    return statuses

execute_ddl_in_target(target_session, source_db_ddls)
execute_ddl_in_target(target_session, source_schema_ddls)
execute_ddl_in_target(target_session, source_acct_obj_ddls)
execute_ddl_in_target(target_session, schema_objects_ddls)